Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/pkey.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19"""
20Common API for all public keys.
21"""
23import base64
24from base64 import encodebytes, decodebytes
25from binascii import unhexlify
26import os
27from pathlib import Path
28from hashlib import md5, sha256
29import re
30import struct
32import bcrypt
34from cryptography.hazmat.backends import default_backend
35from cryptography.hazmat.primitives import padding, serialization
36from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher
37from cryptography.hazmat.primitives import asymmetric
39from paramiko import util
40from paramiko.util import u, b
41from paramiko.common import o600
42from paramiko.ssh_exception import SSHException, PasswordRequiredException
43from paramiko.message import Message
46# TripleDES is moving from `cryptography.hazmat.primitives.ciphers.algorithms`
47# in cryptography>=43.0.0 to `cryptography.hazmat.decrepit.ciphers.algorithms`
48# It will be removed from `cryptography.hazmat.primitives.ciphers.algorithms`
49# in cryptography==48.0.0.
50#
51# Source References:
52# - https://github.com/pyca/cryptography/commit/722a6393e61b3ac
53# - https://github.com/pyca/cryptography/pull/11407/files
54try:
55 from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES
56except ImportError:
57 from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES
# Magic header expected at the start of a decoded OpenSSH-format private key
# body: the ASCII text "openssh-key-v1" followed by a NUL byte (15 bytes).
OPENSSH_AUTH_MAGIC = b"openssh-key-v1\x00"
63def _unpad_openssh(data):
64 # At the moment, this is only used for unpadding private keys on disk. This
65 # really ought to be made constant time (possibly by upstreaming this logic
66 # into pyca/cryptography).
67 padding_length = data[-1]
68 if 0x20 <= padding_length < 0x7F:
69 return data # no padding, last byte part comment (printable ascii)
70 if padding_length > 15:
71 raise SSHException("Invalid key")
72 for i in range(padding_length):
73 if data[i - padding_length] != i + 1:
74 raise SSHException("Invalid key")
75 return data[:-padding_length]
class UnknownKeyType(Exception):
    """
    An unknown public/private key algorithm was attempted to be read.
    """

    def __init__(self, key_type=None, key_bytes=None):
        """
        :param key_type: the unrecognized type indicator, if known.
        :param bytes key_bytes: the raw key data involved, if known.
        """
        self.key_type = key_type
        self.key_bytes = key_bytes

    def __str__(self):
        # key_bytes defaults to None, so guard len() to keep str() usable on
        # an exception raised without any byte data.
        size = 0 if self.key_bytes is None else len(self.key_bytes)
        return f"UnknownKeyType(type={self.key_type!r}, bytes=<{size}>)"
91class PKey:
92 """
93 Base class for public keys.
95 Also includes some "meta" level convenience constructors such as
96 `.from_type_string`.
97 """
99 # known encryption types for private key files:
100 _CIPHER_TABLE = {
101 "AES-128-CBC": {
102 "cipher": algorithms.AES,
103 "keysize": 16,
104 "blocksize": 16,
105 "mode": modes.CBC,
106 },
107 "AES-256-CBC": {
108 "cipher": algorithms.AES,
109 "keysize": 32,
110 "blocksize": 16,
111 "mode": modes.CBC,
112 },
113 "DES-EDE3-CBC": {
114 "cipher": TripleDES,
115 "keysize": 24,
116 "blocksize": 8,
117 "mode": modes.CBC,
118 },
119 }
120 _PRIVATE_KEY_FORMAT_ORIGINAL = 1
121 _PRIVATE_KEY_FORMAT_OPENSSH = 2
122 BEGIN_TAG = re.compile(r"^-{5}BEGIN (RSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")
123 END_TAG = re.compile(r"^-{5}END (RSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")
125 @staticmethod
126 def from_path(path, passphrase=None):
127 """
128 Attempt to instantiate appropriate key subclass from given file path.
130 :param Path path: The path to load (may also be a `str`).
132 :returns:
133 A `PKey` subclass instance.
135 :raises:
136 `UnknownKeyType`, if our crypto backend doesn't know this key type.
138 .. versionadded:: 3.2
139 """
140 # TODO: make sure sphinx is reading Path right in param list...
142 # Lazy import to avoid circular import issues
143 from paramiko import RSAKey, Ed25519Key, ECDSAKey
145 # Normalize to string, as cert suffix isn't quite an extension, so
146 # pathlib isn't useful for this.
147 path = str(path)
149 # Sort out cert vs key, i.e. it is 'legal' to hand this kind of API
150 # /either/ the key /or/ the cert, when there is a key/cert pair.
151 cert_suffix = "-cert.pub"
152 if str(path).endswith(cert_suffix):
153 key_path = path[: -len(cert_suffix)]
154 cert_path = path
155 else:
156 key_path = path
157 cert_path = path + cert_suffix
159 key_path = Path(key_path).expanduser()
160 cert_path = Path(cert_path).expanduser()
162 data = key_path.read_bytes()
163 # Like OpenSSH, try modern/OpenSSH-specific key load first
164 try:
165 loaded = serialization.load_ssh_private_key(
166 data=data, password=passphrase
167 )
168 # Then fall back to assuming legacy PEM type
169 except ValueError:
170 loaded = serialization.load_pem_private_key(
171 data=data, password=passphrase
172 )
173 # TODO Python 3.10: match statement? (NOTE: we cannot use a dict
174 # because the results from the loader are literal backend, eg openssl,
175 # private classes, so isinstance tests work but exact 'x class is y'
176 # tests will not work)
177 # TODO: leverage already-parsed/math'd obj to avoid duplicate cpu
178 # cycles? seemingly requires most of our key subclasses to be rewritten
179 # to be cryptography-object-forward. this is still likely faster than
180 # the old SSHClient code that just tried instantiating every class!
181 key_class = None
182 if isinstance(loaded, asymmetric.rsa.RSAPrivateKey):
183 key_class = RSAKey
184 elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey):
185 key_class = Ed25519Key
186 elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey):
187 key_class = ECDSAKey
188 else:
189 raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__)
190 with key_path.open() as fd:
191 key = key_class.from_private_key(fd, password=passphrase)
192 if cert_path.exists():
193 # load_certificate can take Message, path-str, or value-str
194 key.load_certificate(str(cert_path))
195 return key
197 @staticmethod
198 def from_type_string(key_type, key_bytes):
199 """
200 Given type `str` & raw `bytes`, return a `PKey` subclass instance.
202 For example, ``PKey.from_type_string("ssh-ed25519", <public bytes>)``
203 will (if successful) return a new `.Ed25519Key`.
205 :param str key_type:
206 The key type, eg ``"ssh-ed25519"``.
207 :param bytes key_bytes:
208 The raw byte data forming the key material, as expected by
209 subclasses' ``data`` parameter.
211 :returns:
212 A `PKey` subclass instance.
214 :raises:
215 `UnknownKeyType`, if no registered classes knew about this type.
217 .. versionadded:: 3.2
218 """
219 from paramiko import key_classes
221 for key_class in key_classes:
222 if key_type in key_class.identifiers():
223 # TODO: needs to passthru things like passphrase
224 return key_class(data=key_bytes)
225 raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes)
227 @classmethod
228 def identifiers(cls):
229 """
230 returns an iterable of key format/name strings this class can handle.
232 Most classes only have a single identifier, and thus this default
233 implementation suffices; see `.ECDSAKey` for one example of an
234 override.
235 """
236 return [cls.name]
238 # TODO 4.0: make this and subclasses consistent, some of our own
239 # classmethods even assume kwargs we don't define!
240 # TODO 4.0: prob also raise NotImplementedError instead of pass'ing; the
241 # contract is pretty obviously that you need to handle msg/data/filename
242 # appropriately. (If 'pass' is a concession to testing, see about doing the
243 # work to fix the tests instead)
244 def __init__(self, msg=None, data=None):
245 """
246 Create a new instance of this public key type. If ``msg`` is given,
247 the key's public part(s) will be filled in from the message. If
248 ``data`` is given, the key's public part(s) will be filled in from
249 the string.
251 :param .Message msg:
252 an optional SSH `.Message` containing a public key of this type.
253 :param bytes data:
254 optional, the bytes of a public key of this type
256 :raises: `.SSHException` --
257 if a key cannot be created from the ``data`` or ``msg`` given, or
258 no key was passed in.
259 """
260 pass
262 # TODO: arguably this might want to be __str__ instead? ehh
263 # TODO: ditto the interplay between showing class name (currently we just
264 # say PKey writ large) and algorithm (usually == class name, but not
265 # always, also sometimes shows certificate-ness)
266 # TODO: if we do change it, we also want to tweak eg AgentKey, as it
267 # currently displays agent-ness with a suffix
268 def __repr__(self):
269 comment = ""
270 # Works for AgentKey, may work for others?
271 if hasattr(self, "comment") and self.comment:
272 comment = f", comment={self.comment!r}"
273 return f"PKey(alg={self.algorithm_name}, bits={self.get_bits()}, fp={self.fingerprint}{comment})" # noqa
275 # TODO 4.0: just merge into __bytes__ (everywhere)
276 def asbytes(self):
277 """
278 Return a string of an SSH `.Message` made up of the public part(s) of
279 this key. This string is suitable for passing to `__init__` to
280 re-create the key object later.
281 """
282 return bytes()
284 def __bytes__(self):
285 return self.asbytes()
287 def __eq__(self, other):
288 return isinstance(other, PKey) and self._fields == other._fields
290 def __hash__(self):
291 return hash(self._fields)
293 @property
294 def _fields(self):
295 raise NotImplementedError
297 def get_name(self):
298 """
299 Return the name of this private key implementation.
301 :return:
302 name of this private key type, in SSH terminology, as a `str` (for
303 example, ``"ssh-rsa"``).
304 """
305 return ""
307 @property
308 def algorithm_name(self):
309 """
310 Return the key algorithm identifier for this key.
312 Similar to `get_name`, but aimed at pure algorithm name instead of SSH
313 protocol field value.
314 """
315 # Nuke the leading 'ssh-'
316 # TODO in Python 3.9: use .removeprefix()
317 name = self.get_name().replace("ssh-", "")
318 # Trim any cert suffix (but leave the -cert, as OpenSSH does)
319 cert_tail = "-cert-v01@openssh.com"
320 if cert_tail in name:
321 name = name.replace(cert_tail, "-cert")
322 # Nuke any eg ECDSA suffix, OpenSSH does basically this too.
323 else:
324 name = name.split("-")[0]
325 return name.upper()
327 def get_bits(self):
328 """
329 Return the number of significant bits in this key. This is useful
330 for judging the relative security of a key.
332 :return: bits in the key (as an `int`)
333 """
334 # TODO 4.0: raise NotImplementedError, 0 is unlikely to ever be
335 # _correct_ and nothing in the critical path seems to use this.
336 return 0
338 def can_sign(self):
339 """
340 Return ``True`` if this key has the private part necessary for signing
341 data.
342 """
343 return False
345 def get_fingerprint(self):
346 """
347 Return an MD5 fingerprint of the public part of this key. Nothing
348 secret is revealed.
350 :return:
351 a 16-byte `string <str>` (binary) of the MD5 fingerprint, in SSH
352 format.
353 """
354 return md5(self.asbytes()).digest()
356 @property
357 def fingerprint(self):
358 """
359 Modern fingerprint property designed to be comparable to OpenSSH.
361 Currently only does SHA256 (the OpenSSH default).
363 .. versionadded:: 3.2
364 """
365 hashy = sha256(bytes(self))
366 hash_name = hashy.name.upper()
367 b64ed = encodebytes(hashy.digest())
368 cleaned = u(b64ed).strip().rstrip("=") # yes, OpenSSH does this too!
369 return f"{hash_name}:{cleaned}"
371 def get_base64(self):
372 """
373 Return a base64 string containing the public part of this key. Nothing
374 secret is revealed. This format is compatible with that used to store
375 public key files or recognized host keys.
377 :return: a base64 `string <str>` containing the public part of the key.
378 """
379 return u(encodebytes(self.asbytes())).replace("\n", "")
381 def sign_ssh_data(self, data, algorithm=None):
382 """
383 Sign a blob of data with this private key, and return a `.Message`
384 representing an SSH signature message.
386 :param bytes data:
387 the data to sign.
388 :param str algorithm:
389 the signature algorithm to use, if different from the key's
390 internal name. Default: ``None``.
391 :return: an SSH signature `message <.Message>`.
393 .. versionchanged:: 2.9
394 Added the ``algorithm`` kwarg.
395 """
396 return bytes()
398 def verify_ssh_sig(self, data, msg):
399 """
400 Given a blob of data, and an SSH message representing a signature of
401 that data, verify that it was signed with this key.
403 :param bytes data: the data that was signed.
404 :param .Message msg: an SSH signature message
405 :return:
406 ``True`` if the signature verifies correctly; ``False`` otherwise.
407 """
408 return False
410 @classmethod
411 def from_private_key_file(cls, filename, password=None):
412 """
413 Create a key object by reading a private key file. If the private
414 key is encrypted and ``password`` is not ``None``, the given password
415 will be used to decrypt the key (otherwise `.PasswordRequiredException`
416 is thrown). Through the magic of Python, this factory method will
417 exist in all subclasses of PKey (such as `.RSAKey`), but
418 is useless on the abstract PKey class.
420 :param str filename: name of the file to read
421 :param str password:
422 an optional password to use to decrypt the key file, if it's
423 encrypted
424 :return: a new `.PKey` based on the given private key
426 :raises: ``IOError`` -- if there was an error reading the file
427 :raises: `.PasswordRequiredException` -- if the private key file is
428 encrypted, and ``password`` is ``None``
429 :raises: `.SSHException` -- if the key file is invalid
430 """
431 key = cls(filename=filename, password=password)
432 return key
434 @classmethod
435 def from_private_key(cls, file_obj, password=None):
436 """
437 Create a key object by reading a private key from a file (or file-like)
438 object. If the private key is encrypted and ``password`` is not
439 ``None``, the given password will be used to decrypt the key (otherwise
440 `.PasswordRequiredException` is thrown).
442 :param file_obj: the file-like object to read from
443 :param str password:
444 an optional password to use to decrypt the key, if it's encrypted
445 :return: a new `.PKey` based on the given private key
447 :raises: ``IOError`` -- if there was an error reading the key
448 :raises: `.PasswordRequiredException` --
449 if the private key file is encrypted, and ``password`` is ``None``
450 :raises: `.SSHException` -- if the key file is invalid
451 """
452 key = cls(file_obj=file_obj, password=password)
453 return key
455 def write_private_key_file(self, filename, password=None):
456 """
457 Write private key contents into a file. If the password is not
458 ``None``, the key is encrypted before writing.
460 :param str filename: name of the file to write
461 :param str password:
462 an optional password to use to encrypt the key file
464 :raises: ``IOError`` -- if there was an error writing the file
465 :raises: `.SSHException` -- if the key is invalid
466 """
467 raise Exception("Not implemented in PKey")
469 def write_private_key(self, file_obj, password=None):
470 """
471 Write private key contents into a file (or file-like) object. If the
472 password is not ``None``, the key is encrypted before writing.
474 :param file_obj: the file-like object to write into
475 :param str password: an optional password to use to encrypt the key
477 :raises: ``IOError`` -- if there was an error writing to the file
478 :raises: `.SSHException` -- if the key is invalid
479 """
480 # TODO 4.0: NotImplementedError (plus everywhere else in here)
481 raise Exception("Not implemented in PKey")
483 def _read_private_key_file(self, tag, filename, password=None):
484 """
485 Read an SSH2-format private key file, looking for a string of the type
486 ``"BEGIN xxx PRIVATE KEY"`` for some ``xxx``, base64-decode the text we
487 find, and return it as a string. If the private key is encrypted and
488 ``password`` is not ``None``, the given password will be used to
489 decrypt the key (otherwise `.PasswordRequiredException` is thrown).
491 :param str tag:
492 ``"RSA"`` (or etc), the tag used to mark the data block.
493 :param str filename:
494 name of the file to read.
495 :param str password:
496 an optional password to use to decrypt the key file, if it's
497 encrypted.
498 :return:
499 the `bytes` that make up the private key.
501 :raises: ``IOError`` -- if there was an error reading the file.
502 :raises: `.PasswordRequiredException` -- if the private key file is
503 encrypted, and ``password`` is ``None``.
504 :raises: `.SSHException` -- if the key file is invalid.
505 """
506 with open(filename, "r") as f:
507 data = self._read_private_key(tag, f, password)
508 return data
510 def _read_private_key(self, tag, f, password=None):
511 lines = f.readlines()
512 if not lines:
513 raise SSHException("no lines in {} private key file".format(tag))
515 # find the BEGIN tag
516 start = 0
517 m = self.BEGIN_TAG.match(lines[start])
518 line_range = len(lines) - 1
519 while start < line_range and not m:
520 start += 1
521 m = self.BEGIN_TAG.match(lines[start])
522 start += 1
523 keytype = m.group(1) if m else None
524 if start >= len(lines) or keytype is None:
525 raise SSHException("not a valid {} private key file".format(tag))
527 # find the END tag
528 end = start
529 m = self.END_TAG.match(lines[end])
530 while end < line_range and not m:
531 end += 1
532 m = self.END_TAG.match(lines[end])
534 if keytype == tag:
535 data = self._read_private_key_pem(lines, end, password)
536 pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL
537 elif keytype == "OPENSSH":
538 data = self._read_private_key_openssh(lines[start:end], password)
539 pkformat = self._PRIVATE_KEY_FORMAT_OPENSSH
540 else:
541 raise SSHException(
542 "encountered {} key, expected {} key".format(keytype, tag)
543 )
545 return pkformat, data
547 def _got_bad_key_format_id(self, id_):
548 err = "{}._read_private_key() spat out an unknown key format id '{}'"
549 raise SSHException(err.format(self.__class__.__name__, id_))
551 def _read_private_key_pem(self, lines, end, password):
552 start = 0
553 # parse any headers first
554 headers = {}
555 start += 1
556 while start < len(lines):
557 line = lines[start].split(": ")
558 if len(line) == 1:
559 break
560 headers[line[0].lower()] = line[1].strip()
561 start += 1
562 # if we trudged to the end of the file, just try to cope.
563 try:
564 data = decodebytes(b("".join(lines[start:end])))
565 except base64.binascii.Error as e:
566 raise SSHException("base64 decoding error: {}".format(e))
567 if "proc-type" not in headers:
568 # unencryped: done
569 return data
570 # encrypted keyfile: will need a password
571 proc_type = headers["proc-type"]
572 if proc_type != "4,ENCRYPTED":
573 raise SSHException(
574 'Unknown private key structure "{}"'.format(proc_type)
575 )
576 try:
577 encryption_type, saltstr = headers["dek-info"].split(",")
578 except:
579 raise SSHException("Can't parse DEK-info in private key file")
580 if encryption_type not in self._CIPHER_TABLE:
581 raise SSHException(
582 'Unknown private key cipher "{}"'.format(encryption_type)
583 )
584 # if no password was passed in,
585 # raise an exception pointing out that we need one
586 if password is None:
587 raise PasswordRequiredException("Private key file is encrypted")
588 cipher = self._CIPHER_TABLE[encryption_type]["cipher"]
589 keysize = self._CIPHER_TABLE[encryption_type]["keysize"]
590 mode = self._CIPHER_TABLE[encryption_type]["mode"]
591 salt = unhexlify(b(saltstr))
592 key = util.generate_key_bytes(md5, salt, password, keysize)
593 decryptor = Cipher(
594 cipher(key), mode(salt), backend=default_backend()
595 ).decryptor()
596 decrypted_data = decryptor.update(data) + decryptor.finalize()
597 unpadder = padding.PKCS7(cipher.block_size).unpadder()
598 try:
599 return unpadder.update(decrypted_data) + unpadder.finalize()
600 except ValueError:
601 raise SSHException("Bad password or corrupt private key file")
603 def _read_private_key_openssh(self, lines, password):
604 """
605 Read the new OpenSSH SSH2 private key format available
606 since OpenSSH version 6.5
607 Reference:
608 https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key
609 """
610 try:
611 data = decodebytes(b("".join(lines)))
612 except base64.binascii.Error as e:
613 raise SSHException("base64 decoding error: {}".format(e))
615 # read data struct
616 auth_magic = data[:15]
617 if auth_magic != OPENSSH_AUTH_MAGIC:
618 raise SSHException("unexpected OpenSSH key header encountered")
620 cstruct = self._uint32_cstruct_unpack(data[15:], "sssur")
621 cipher, kdfname, kdf_options, num_pubkeys, remainder = cstruct
622 # For now, just support 1 key.
623 if num_pubkeys > 1:
624 raise SSHException(
625 "unsupported: private keyfile has multiple keys"
626 )
627 pubkey, privkey_blob = self._uint32_cstruct_unpack(remainder, "ss")
629 if kdfname == b("bcrypt"):
630 if cipher == b("aes256-cbc"):
631 mode = modes.CBC
632 elif cipher == b("aes256-ctr"):
633 mode = modes.CTR
634 else:
635 raise SSHException(
636 "unknown cipher `{}` used in private key file".format(
637 cipher.decode("utf-8")
638 )
639 )
640 # Encrypted private key.
641 # If no password was passed in, raise an exception pointing
642 # out that we need one
643 if password is None:
644 raise PasswordRequiredException(
645 "private key file is encrypted"
646 )
648 # Unpack salt and rounds from kdfoptions
649 salt, rounds = self._uint32_cstruct_unpack(kdf_options, "su")
651 # run bcrypt kdf to derive key and iv/nonce (32 + 16 bytes)
652 key_iv = bcrypt.kdf(
653 b(password),
654 b(salt),
655 48,
656 rounds,
657 # We can't control how many rounds are on disk, so no sense
658 # warning about it.
659 ignore_few_rounds=True,
660 )
661 key = key_iv[:32]
662 iv = key_iv[32:]
664 # decrypt private key blob
665 decryptor = Cipher(
666 algorithms.AES(key), mode(iv), default_backend()
667 ).decryptor()
668 decrypted_privkey = decryptor.update(privkey_blob)
669 decrypted_privkey += decryptor.finalize()
670 elif cipher == b("none") and kdfname == b("none"):
671 # Unencrypted private key
672 decrypted_privkey = privkey_blob
673 else:
674 raise SSHException(
675 "unknown cipher or kdf used in private key file"
676 )
678 # Unpack private key and verify checkints
679 cstruct = self._uint32_cstruct_unpack(decrypted_privkey, "uusr")
680 checkint1, checkint2, keytype, keydata = cstruct
682 if checkint1 != checkint2:
683 raise SSHException(
684 "OpenSSH private key file checkints do not match"
685 )
687 return _unpad_openssh(keydata)
689 def _uint32_cstruct_unpack(self, data, strformat):
690 """
691 Used to read new OpenSSH private key format.
692 Unpacks a c data structure containing a mix of 32-bit uints and
693 variable length strings prefixed by 32-bit uint size field,
694 according to the specified format. Returns the unpacked vars
695 in a tuple.
696 Format strings:
697 s - denotes a string
698 i - denotes a long integer, encoded as a byte string
699 u - denotes a 32-bit unsigned integer
700 r - the remainder of the input string, returned as a string
701 """
702 arr = []
703 idx = 0
704 try:
705 for f in strformat:
706 if f == "s":
707 # string
708 s_size = struct.unpack(">L", data[idx : idx + 4])[0]
709 idx += 4
710 s = data[idx : idx + s_size]
711 idx += s_size
712 arr.append(s)
713 if f == "i":
714 # long integer
715 s_size = struct.unpack(">L", data[idx : idx + 4])[0]
716 idx += 4
717 s = data[idx : idx + s_size]
718 idx += s_size
719 i = util.inflate_long(s, True)
720 arr.append(i)
721 elif f == "u":
722 # 32-bit unsigned int
723 u = struct.unpack(">L", data[idx : idx + 4])[0]
724 idx += 4
725 arr.append(u)
726 elif f == "r":
727 # remainder as string
728 s = data[idx:]
729 arr.append(s)
730 break
731 except Exception as e:
732 # PKey-consuming code frequently wants to save-and-skip-over issues
733 # with loading keys, and uses SSHException as the (really friggin
734 # awful) signal for this. So for now...we do this.
735 raise SSHException(str(e))
736 return tuple(arr)
738 def _write_private_key_file(self, filename, key, format, password=None):
739 """
740 Write an SSH2-format private key file in a form that can be read by
741 paramiko or openssh. If no password is given, the key is written in
742 a trivially-encoded format (base64) which is completely insecure. If
743 a password is given, DES-EDE3-CBC is used.
745 :param str tag:
746 ``"RSA"`` or etc, the tag used to mark the data block.
747 :param filename: name of the file to write.
748 :param bytes data: data blob that makes up the private key.
749 :param str password: an optional password to use to encrypt the file.
751 :raises: ``IOError`` -- if there was an error writing the file.
752 """
753 # Ensure that we create new key files directly with a user-only mode,
754 # instead of opening, writing, then chmodding, which leaves us open to
755 # CVE-2022-24302.
756 with os.fdopen(
757 os.open(
758 filename,
759 # NOTE: O_TRUNC is a noop on new files, and O_CREAT is a noop
760 # on existing files, so using all 3 in both cases is fine.
761 flags=os.O_WRONLY | os.O_TRUNC | os.O_CREAT,
762 # Ditto the use of the 'mode' argument; it should be safe to
763 # give even for existing files (though it will not act like a
764 # chmod in that case).
765 mode=o600,
766 ),
767 # Yea, you still gotta inform the FLO that it is in "write" mode.
768 "w",
769 ) as f:
770 self._write_private_key(f, key, format, password=password)
772 def _write_private_key(self, f, key, format, password=None):
773 if password is None:
774 encryption = serialization.NoEncryption()
775 else:
776 encryption = serialization.BestAvailableEncryption(b(password))
778 f.write(
779 key.private_bytes(
780 serialization.Encoding.PEM, format, encryption
781 ).decode()
782 )
784 def _check_type_and_load_cert(self, msg, key_type, cert_type):
785 """
786 Perform message type-checking & optional certificate loading.
788 This includes fast-forwarding cert ``msg`` objects past the nonce, so
789 that the subsequent fields are the key numbers; thus the caller may
790 expect to treat the message as key material afterwards either way.
792 The obtained key type is returned for classes which need to know what
793 it was (e.g. ECDSA.)
794 """
795 # Normalization; most classes have a single key type and give a string,
796 # but eg ECDSA is a 1:N mapping.
797 key_types = key_type
798 cert_types = cert_type
799 if isinstance(key_type, str):
800 key_types = [key_types]
801 if isinstance(cert_types, str):
802 cert_types = [cert_types]
803 # Can't do much with no message, that should've been handled elsewhere
804 if msg is None:
805 raise SSHException("Key object may not be empty")
806 # First field is always key type, in either kind of object. (make sure
807 # we rewind before grabbing it - sometimes caller had to do their own
808 # introspection first!)
809 msg.rewind()
810 type_ = msg.get_text()
811 # Regular public key - nothing special to do besides the implicit
812 # type check.
813 if type_ in key_types:
814 pass
815 # OpenSSH-compatible certificate - store full copy as .public_blob
816 # (so signing works correctly) and then fast-forward past the
817 # nonce.
818 elif type_ in cert_types:
819 # This seems the cleanest way to 'clone' an already-being-read
820 # message; they're *IO objects at heart and their .getvalue()
821 # always returns the full value regardless of pointer position.
822 self.load_certificate(Message(msg.asbytes()))
823 # Read out nonce as it comes before the public numbers - our caller
824 # is likely going to use the (only borrowed by us, not owned)
825 # 'msg' object for loading those numbers right after this.
826 # TODO: usefully interpret it & other non-public-number fields
827 # (requires going back into per-type subclasses.)
828 msg.get_string()
829 else:
830 err = "Invalid key (class: {}, data type: {}"
831 raise SSHException(err.format(self.__class__.__name__, type_))
833 def load_certificate(self, value):
834 """
835 Supplement the private key contents with data loaded from an OpenSSH
836 public key (``.pub``) or certificate (``-cert.pub``) file, a string
837 containing such a file, or a `.Message` object.
839 The .pub contents adds no real value, since the private key
840 file includes sufficient information to derive the public
841 key info. For certificates, however, this can be used on
842 the client side to offer authentication requests to the server
843 based on certificate instead of raw public key.
845 See:
846 https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys
848 Note: very little effort is made to validate the certificate contents,
849 that is for the server to decide if it is good enough to authenticate
850 successfully.
851 """
852 if isinstance(value, Message):
853 constructor = "from_message"
854 elif os.path.isfile(value):
855 constructor = "from_file"
856 else:
857 constructor = "from_string"
858 blob = getattr(PublicBlob, constructor)(value)
859 if not blob.key_type.startswith(self.get_name()):
860 err = "PublicBlob type {} incompatible with key type {}"
861 raise ValueError(err.format(blob.key_type, self.get_name()))
862 self.public_blob = blob
865# General construct for an OpenSSH style Public Key blob
866# readable from a one-line file of the format:
867# <key-name> <base64-blob> [<comment>]
868# Of little value in the case of standard public keys
869# {ssh-rsa, ssh-ecdsa, ssh-ed25519}, but should
870# provide rudimentary support for {*-cert.v01}
class PublicBlob:
    """
    OpenSSH plain public key or OpenSSH signed public key (certificate).

    Tries to be as dumb as possible and barely cares about specific
    per-key-type data.

    .. note::

        Most of the time you'll want to call `from_file`, `from_string` or
        `from_message` for useful instantiation, the main constructor is
        basically "I should be using ``attrs`` for this."
    """

    def __init__(self, type_, blob, comment=None):
        """
        Create a new public blob of given type and contents.

        :param str type_: Type indicator, eg ``ssh-rsa``.
        :param bytes blob: The blob bytes themselves.
        :param str comment: A comment, if one was given (e.g. file-based.)
        """
        self.key_type = type_
        self.key_blob = blob
        self.comment = comment

    @classmethod
    def from_file(cls, filename):
        """
        Create a public blob from a ``-cert.pub``-style file on disk.
        """
        with open(filename) as f:
            string = f.read()
        return cls.from_string(string)

    @classmethod
    def from_string(cls, string):
        """
        Create a public blob from a ``-cert.pub``-style string.

        The string is whitespace-separated ``<type> <base64 blob>
        [<comment>]``.

        :raises: ``ValueError`` --
            if fewer than two fields are present, or the type named inside
            the decoded blob differs from the leading type field.
        """
        fields = string.split(None, 2)
        if len(fields) < 2:
            msg = "Not enough fields for public blob: {}"
            raise ValueError(msg.format(fields))
        key_type = fields[0]
        key_blob = decodebytes(b(fields[1]))
        # Anything after the first two fields is the (optional) comment.
        comment = fields[2].strip() if len(fields) > 2 else None
        # Verify that the blob message first (string) field matches the
        # key_type
        m = Message(key_blob)
        blob_type = m.get_text()
        if blob_type != key_type:
            deets = "key type={!r}, but blob type={!r}".format(
                key_type, blob_type
            )
            raise ValueError("Invalid PublicBlob contents: {}".format(deets))
        # All good? All good.
        return cls(type_=key_type, blob=key_blob, comment=comment)

    @classmethod
    def from_message(cls, message):
        """
        Create a public blob from a network `.Message`.

        Specifically, a cert-bearing pubkey auth packet, because by definition
        OpenSSH-style certificates 'are' their own network representation."
        """
        type_ = message.get_text()
        return cls(type_=type_, blob=message.asbytes())

    def __str__(self):
        ret = "{} public key/certificate".format(self.key_type)
        if self.comment:
            ret += "- {}".format(self.comment)
        return ret

    def __eq__(self, other):
        # Blobs are equal when their raw bytes are. Anything that is not a
        # PublicBlob is handed back to Python (which ends up answering
        # False), so comparisons always produce a real bool and never fail
        # on objects lacking a ``key_blob`` attribute.
        if not isinstance(other, PublicBlob):
            return NotImplemented
        return self.key_blob == other.key_blob

    def __ne__(self, other):
        return not self == other