Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/pkey.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19"""
20Common API for all public keys.
21"""
23import base64
24from base64 import encodebytes, decodebytes
25from binascii import unhexlify
26import os
27from pathlib import Path
28from hashlib import md5, sha256
29import re
30import struct
32import bcrypt
34from cryptography.hazmat.backends import default_backend
35from cryptography.hazmat.primitives import padding, serialization
36from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher
37from cryptography.hazmat.primitives import asymmetric
39from paramiko import util
40from paramiko.util import u, b
41from paramiko.common import o600
42from paramiko.ssh_exception import SSHException, PasswordRequiredException
43from paramiko.message import Message
46# TripleDES is moving from `cryptography.hazmat.primitives.ciphers.algorithms`
47# in cryptography>=43.0.0 to `cryptography.hazmat.decrepit.ciphers.algorithms`
48# It will be removed from `cryptography.hazmat.primitives.ciphers.algorithms`
49# in cryptography==48.0.0.
50#
51# Source References:
52# - https://github.com/pyca/cryptography/commit/722a6393e61b3ac
53# - https://github.com/pyca/cryptography/pull/11407/files
54try:
55 from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES
56except ImportError:
57 from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES
# Magic header expected at the very start of a base64-decoded OpenSSH
# ("openssh-key-v1") private key blob: the format name followed by a NUL
# terminator, 15 bytes in total.
OPENSSH_AUTH_MAGIC = b"openssh-key-v1\x00"
def _unpad_openssh(data):
    """
    Strip OpenSSH-style padding from the tail of a private key blob.

    OpenSSH pads the private section with the byte sequence ``1, 2, 3, ...``
    (at most 15 bytes). If the last byte cannot be a padding byte -- it is
    ``0`` or printable ASCII, i.e. part of the trailing comment field -- the
    data is returned untouched.

    :param bytes data: the (decrypted) key data, possibly padded.
    :return: ``data`` with any trailing padding removed.

    :raises: `.SSHException` -- if ``data`` is empty or the padding is
        malformed.
    """
    # At the moment, this is only used for unpadding private keys on disk. This
    # really ought to be made constant time (possibly by upstreaming this logic
    # into pyca/cryptography).
    if not data:
        # Nothing to inspect; previously this surfaced as a bare IndexError.
        raise SSHException("Invalid key")
    padding_length = data[-1]
    if padding_length == 0:
        # Padding bytes start at 1, so a trailing NUL means "no padding" (eg
        # an empty comment field). Without this guard, data[:-0] would
        # silently discard the entire key.
        return data
    if 0x20 <= padding_length < 0x7F:
        return data  # no padding, last byte part comment (printable ascii)
    if padding_length > 15 or padding_length > len(data):
        raise SSHException("Invalid key")
    for i in range(padding_length):
        if data[i - padding_length] != i + 1:
            raise SSHException("Invalid key")
    return data[:-padding_length]
class UnknownKeyType(Exception):
    """
    An unknown public/private key algorithm was attempted to be read.

    :param key_type:
        The offending key type identifier (or loaded key class), if known.
    :param bytes key_bytes:
        The raw key material that could not be handled, if available.
    """

    def __init__(self, key_type=None, key_bytes=None):
        self.key_type = key_type
        self.key_bytes = key_bytes

    def __str__(self):
        # key_bytes defaults to None; report a length of 0 in that case
        # instead of blowing up with a TypeError while formatting the error.
        size = 0 if self.key_bytes is None else len(self.key_bytes)
        return f"UnknownKeyType(type={self.key_type!r}, bytes=<{size}>)"
class PKey:
    """
    Base class for public keys.

    Also includes some "meta" level convenience constructors such as
    `.from_type_string`.
    """

    # known encryption types for private key files:
    _CIPHER_TABLE = {
        "AES-128-CBC": {
            "cipher": algorithms.AES,
            "keysize": 16,
            "blocksize": 16,
            "mode": modes.CBC,
        },
        "AES-256-CBC": {
            "cipher": algorithms.AES,
            "keysize": 32,
            "blocksize": 16,
            "mode": modes.CBC,
        },
        "DES-EDE3-CBC": {
            "cipher": TripleDES,
            "keysize": 24,
            "blocksize": 8,
            "mode": modes.CBC,
        },
    }
    # Format ids returned (alongside the key data) by `_read_private_key`.
    _PRIVATE_KEY_FORMAT_ORIGINAL = 1
    _PRIVATE_KEY_FORMAT_OPENSSH = 2
    BEGIN_TAG = re.compile(
        r"^-{5}BEGIN (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$"
    )
    END_TAG = re.compile(r"^-{5}END (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")

    @staticmethod
    def from_path(path, passphrase=None):
        """
        Attempt to instantiate appropriate key subclass from given file path.

        :param Path path: The path to load (may also be a `str`).
        :param passphrase:
            Optional passphrase (`str` or `bytes`) used to decrypt the key.

        :returns:
            A `PKey` subclass instance.

        :raises:
            `UnknownKeyType`, if our crypto backend doesn't know this key type.

        .. versionadded:: 3.2
        """
        # TODO: make sure sphinx is reading Path right in param list...

        # Lazy import to avoid circular import issues
        from paramiko import DSSKey, RSAKey, Ed25519Key, ECDSAKey

        # Normalize to string, as cert suffix isn't quite an extension, so
        # pathlib isn't useful for this.
        path = str(path)

        # Sort out cert vs key, i.e. it is 'legal' to hand this kind of API
        # /either/ the key /or/ the cert, when there is a key/cert pair.
        cert_suffix = "-cert.pub"
        if path.endswith(cert_suffix):
            key_path = path[: -len(cert_suffix)]
            cert_path = path
        else:
            key_path = path
            cert_path = path + cert_suffix

        key_path = Path(key_path).expanduser()
        cert_path = Path(cert_path).expanduser()

        data = key_path.read_bytes()
        # The cryptography loaders only accept bytes (or None) as password,
        # whereas the rest of this class happily takes str; normalize here so
        # a str passphrase doesn't die with a TypeError.
        password_bytes = None if passphrase is None else b(passphrase)
        # Like OpenSSH, try modern/OpenSSH-specific key load first
        try:
            loaded = serialization.load_ssh_private_key(
                data=data, password=password_bytes
            )
        # Then fall back to assuming legacy PEM type
        except ValueError:
            loaded = serialization.load_pem_private_key(
                data=data, password=password_bytes
            )
        # TODO Python 3.10: match statement? (NOTE: we cannot use a dict
        # because the results from the loader are literal backend, eg openssl,
        # private classes, so isinstance tests work but exact 'x class is y'
        # tests will not work)
        # TODO: leverage already-parsed/math'd obj to avoid duplicate cpu
        # cycles? seemingly requires most of our key subclasses to be rewritten
        # to be cryptography-object-forward. this is still likely faster than
        # the old SSHClient code that just tried instantiating every class!
        key_class = None
        if isinstance(loaded, asymmetric.dsa.DSAPrivateKey):
            key_class = DSSKey
        elif isinstance(loaded, asymmetric.rsa.RSAPrivateKey):
            key_class = RSAKey
        elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey):
            key_class = Ed25519Key
        elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey):
            key_class = ECDSAKey
        else:
            raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__)
        with key_path.open() as fd:
            key = key_class.from_private_key(fd, password=passphrase)
        if cert_path.exists():
            # load_certificate can take Message, path-str, or value-str
            key.load_certificate(str(cert_path))
        return key

    @staticmethod
    def from_type_string(key_type, key_bytes):
        """
        Given type `str` & raw `bytes`, return a `PKey` subclass instance.

        For example, ``PKey.from_type_string("ssh-ed25519", <public bytes>)``
        will (if successful) return a new `.Ed25519Key`.

        :param str key_type:
            The key type, eg ``"ssh-ed25519"``.
        :param bytes key_bytes:
            The raw byte data forming the key material, as expected by
            subclasses' ``data`` parameter.

        :returns:
            A `PKey` subclass instance.

        :raises:
            `UnknownKeyType`, if no registered classes knew about this type.

        .. versionadded:: 3.2
        """
        from paramiko import key_classes

        for key_class in key_classes:
            if key_type in key_class.identifiers():
                # TODO: needs to passthru things like passphrase
                return key_class(data=key_bytes)
        raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes)

    @classmethod
    def identifiers(cls):
        """
        returns an iterable of key format/name strings this class can handle.

        Most classes only have a single identifier, and thus this default
        implementation suffices; see `.ECDSAKey` for one example of an
        override.
        """
        return [cls.name]

    # TODO 4.0: make this and subclasses consistent, some of our own
    # classmethods even assume kwargs we don't define!
    # TODO 4.0: prob also raise NotImplementedError instead of pass'ing; the
    # contract is pretty obviously that you need to handle msg/data/filename
    # appropriately. (If 'pass' is a concession to testing, see about doing the
    # work to fix the tests instead)
    def __init__(self, msg=None, data=None):
        """
        Create a new instance of this public key type.  If ``msg`` is given,
        the key's public part(s) will be filled in from the message.  If
        ``data`` is given, the key's public part(s) will be filled in from
        the string.

        :param .Message msg:
            an optional SSH `.Message` containing a public key of this type.
        :param bytes data:
            optional, the bytes of a public key of this type

        :raises: `.SSHException` --
            if a key cannot be created from the ``data`` or ``msg`` given, or
            no key was passed in.
        """
        pass

    # TODO: arguably this might want to be __str__ instead? ehh
    # TODO: ditto the interplay between showing class name (currently we just
    # say PKey writ large) and algorithm (usually == class name, but not
    # always, also sometimes shows certificate-ness)
    # TODO: if we do change it, we also want to tweak eg AgentKey, as it
    # currently displays agent-ness with a suffix
    def __repr__(self):
        comment = ""
        # Works for AgentKey, may work for others?
        if hasattr(self, "comment") and self.comment:
            comment = f", comment={self.comment!r}"
        return f"PKey(alg={self.algorithm_name}, bits={self.get_bits()}, fp={self.fingerprint}{comment})"  # noqa

    # TODO 4.0: just merge into __bytes__ (everywhere)
    def asbytes(self):
        """
        Return a string of an SSH `.Message` made up of the public part(s) of
        this key.  This string is suitable for passing to `__init__` to
        re-create the key object later.

        The base implementation returns empty bytes.
        """
        return bytes()

    def __bytes__(self):
        return self.asbytes()

    def __eq__(self, other):
        return isinstance(other, PKey) and self._fields == other._fields

    def __hash__(self):
        return hash(self._fields)

    @property
    def _fields(self):
        # Subclasses must supply the values that define key identity; both
        # __eq__ and __hash__ are built on top of this.
        raise NotImplementedError

    def get_name(self):
        """
        Return the name of this private key implementation.

        :return:
            name of this private key type, in SSH terminology, as a `str` (for
            example, ``"ssh-rsa"``).
        """
        return ""

    @property
    def algorithm_name(self):
        """
        Return the key algorithm identifier for this key.

        Similar to `get_name`, but aimed at pure algorithm name instead of SSH
        protocol field value.
        """
        # Nuke the leading 'ssh-'
        # TODO in Python 3.9: use .removeprefix()
        name = self.get_name().replace("ssh-", "")
        # Trim any cert suffix (but leave the -cert, as OpenSSH does)
        cert_tail = "-cert-v01@openssh.com"
        if cert_tail in name:
            name = name.replace(cert_tail, "-cert")
        # Nuke any eg ECDSA suffix, OpenSSH does basically this too.
        else:
            name = name.split("-")[0]
        return name.upper()

    def get_bits(self):
        """
        Return the number of significant bits in this key.  This is useful
        for judging the relative security of a key.

        :return: bits in the key (as an `int`)
        """
        # TODO 4.0: raise NotImplementedError, 0 is unlikely to ever be
        # _correct_ and nothing in the critical path seems to use this.
        return 0

    def can_sign(self):
        """
        Return ``True`` if this key has the private part necessary for signing
        data.
        """
        return False

    def get_fingerprint(self):
        """
        Return an MD5 fingerprint of the public part of this key.  Nothing
        secret is revealed.

        :return:
            a 16-byte `string <str>` (binary) of the MD5 fingerprint, in SSH
            format.
        """
        return md5(self.asbytes()).digest()

    @property
    def fingerprint(self):
        """
        Modern fingerprint property designed to be comparable to OpenSSH.

        Currently only does SHA256 (the OpenSSH default).

        .. versionadded:: 3.2
        """
        hashy = sha256(bytes(self))
        hash_name = hashy.name.upper()
        b64ed = encodebytes(hashy.digest())
        cleaned = u(b64ed).strip().rstrip("=")  # yes, OpenSSH does this too!
        return f"{hash_name}:{cleaned}"

    def get_base64(self):
        """
        Return a base64 string containing the public part of this key.  Nothing
        secret is revealed.  This format is compatible with that used to store
        public key files or recognized host keys.

        :return: a base64 `string <str>` containing the public part of the key.
        """
        return u(encodebytes(self.asbytes())).replace("\n", "")

    def sign_ssh_data(self, data, algorithm=None):
        """
        Sign a blob of data with this private key, and return a `.Message`
        representing an SSH signature message.

        The base implementation signs nothing and returns empty bytes.

        :param bytes data:
            the data to sign.
        :param str algorithm:
            the signature algorithm to use, if different from the key's
            internal name. Default: ``None``.
        :return: an SSH signature `message <.Message>`.

        .. versionchanged:: 2.9
            Added the ``algorithm`` kwarg.
        """
        return bytes()

    def verify_ssh_sig(self, data, msg):
        """
        Given a blob of data, and an SSH message representing a signature of
        that data, verify that it was signed with this key.

        :param bytes data: the data that was signed.
        :param .Message msg: an SSH signature message
        :return:
            ``True`` if the signature verifies correctly; ``False`` otherwise.
        """
        return False

    @classmethod
    def from_private_key_file(cls, filename, password=None):
        """
        Create a key object by reading a private key file.  If the private
        key is encrypted and ``password`` is not ``None``, the given password
        will be used to decrypt the key (otherwise `.PasswordRequiredException`
        is thrown).  Through the magic of Python, this factory method will
        exist in all subclasses of PKey (such as `.RSAKey` or `.DSSKey`), but
        is useless on the abstract PKey class.

        :param str filename: name of the file to read
        :param str password:
            an optional password to use to decrypt the key file, if it's
            encrypted
        :return: a new `.PKey` based on the given private key

        :raises: ``IOError`` -- if there was an error reading the file
        :raises: `.PasswordRequiredException` -- if the private key file is
            encrypted, and ``password`` is ``None``
        :raises: `.SSHException` -- if the key file is invalid
        """
        key = cls(filename=filename, password=password)
        return key

    @classmethod
    def from_private_key(cls, file_obj, password=None):
        """
        Create a key object by reading a private key from a file (or file-like)
        object.  If the private key is encrypted and ``password`` is not
        ``None``, the given password will be used to decrypt the key (otherwise
        `.PasswordRequiredException` is thrown).

        :param file_obj: the file-like object to read from
        :param str password:
            an optional password to use to decrypt the key, if it's encrypted
        :return: a new `.PKey` based on the given private key

        :raises: ``IOError`` -- if there was an error reading the key
        :raises: `.PasswordRequiredException` --
            if the private key file is encrypted, and ``password`` is ``None``
        :raises: `.SSHException` -- if the key file is invalid
        """
        key = cls(file_obj=file_obj, password=password)
        return key

    def write_private_key_file(self, filename, password=None):
        """
        Write private key contents into a file.  If the password is not
        ``None``, the key is encrypted before writing.

        :param str filename: name of the file to write
        :param str password:
            an optional password to use to encrypt the key file

        :raises: ``IOError`` -- if there was an error writing the file
        :raises: `.SSHException` -- if the key is invalid
        """
        raise Exception("Not implemented in PKey")

    def write_private_key(self, file_obj, password=None):
        """
        Write private key contents into a file (or file-like) object.  If the
        password is not ``None``, the key is encrypted before writing.

        :param file_obj: the file-like object to write into
        :param str password: an optional password to use to encrypt the key

        :raises: ``IOError`` -- if there was an error writing to the file
        :raises: `.SSHException` -- if the key is invalid
        """
        # TODO 4.0: NotImplementedError (plus everywhere else in here)
        raise Exception("Not implemented in PKey")

    def _read_private_key_file(self, tag, filename, password=None):
        """
        Read an SSH2-format private key file, looking for a string of the type
        ``"BEGIN xxx PRIVATE KEY"`` for some ``xxx``, base64-decode the text we
        find, and return it as a string.  If the private key is encrypted and
        ``password`` is not ``None``, the given password will be used to
        decrypt the key (otherwise `.PasswordRequiredException` is thrown).

        :param str tag: ``"RSA"`` or ``"DSA"``, the tag used to mark the
            data block.
        :param str filename: name of the file to read.
        :param str password:
            an optional password to use to decrypt the key file, if it's
            encrypted.
        :return:
            whatever `_read_private_key` returns: a ``(format id, bytes)``
            pair holding the key format and the private key data.

        :raises: ``IOError`` -- if there was an error reading the file.
        :raises: `.PasswordRequiredException` -- if the private key file is
            encrypted, and ``password`` is ``None``.
        :raises: `.SSHException` -- if the key file is invalid.
        """
        with open(filename, "r") as f:
            data = self._read_private_key(tag, f, password)
        return data

    def _read_private_key(self, tag, f, password=None):
        """
        Locate the BEGIN/END markers in ``f`` and decode the enclosed key.

        :param str tag: expected key type, eg ``"RSA"``.
        :param f: text-mode file-like object holding the key.
        :param str password: optional password for encrypted keys.
        :return:
            a tuple of ``(format id, key bytes)``; the format id is one of
            ``_PRIVATE_KEY_FORMAT_ORIGINAL`` / ``_PRIVATE_KEY_FORMAT_OPENSSH``.

        :raises: `.SSHException` -- if no usable key block is found, or the
            block is of a different type than ``tag``.
        """
        lines = f.readlines()
        if not lines:
            raise SSHException("no lines in {} private key file".format(tag))

        # find the BEGIN tag
        start = 0
        m = self.BEGIN_TAG.match(lines[start])
        line_range = len(lines) - 1
        while start < line_range and not m:
            start += 1
            m = self.BEGIN_TAG.match(lines[start])
        start += 1
        keytype = m.group(1) if m else None
        if start >= len(lines) or keytype is None:
            raise SSHException("not a valid {} private key file".format(tag))

        # find the END tag
        end = start
        m = self.END_TAG.match(lines[end])
        while end < line_range and not m:
            end += 1
            m = self.END_TAG.match(lines[end])

        if keytype == tag:
            # The PEM reader assumes the BEGIN marker sits at index 0, so hand
            # it a view that starts there; otherwise any text preceding the
            # marker (and the marker itself) ends up in the base64 payload.
            begin = start - 1
            data = self._read_private_key_pem(
                lines[begin:], end - begin, password
            )
            pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL
        elif keytype == "OPENSSH":
            data = self._read_private_key_openssh(lines[start:end], password)
            pkformat = self._PRIVATE_KEY_FORMAT_OPENSSH
        else:
            raise SSHException(
                "encountered {} key, expected {} key".format(keytype, tag)
            )

        return pkformat, data

    def _got_bad_key_format_id(self, id_):
        """
        Raise `.SSHException` complaining about unknown key format ``id_``.
        """
        err = "{}._read_private_key() spat out an unknown key format id '{}'"
        raise SSHException(err.format(self.__class__.__name__, id_))

    def _read_private_key_pem(self, lines, end, password):
        """
        Decode (and, if needed, decrypt) a legacy PEM private key block.

        :param list lines:
            lines of the key file, with the BEGIN marker at index 0.
        :param int end: index into ``lines`` of the END marker.
        :param str password: password used when the block is encrypted.
        :return: the raw `bytes` of the private key.

        :raises: `.PasswordRequiredException` -- if the block is encrypted and
            ``password`` is ``None``.
        :raises: `.SSHException` -- if the block cannot be decoded/decrypted.
        """
        start = 0
        # parse any headers first
        headers = {}
        start += 1
        while start < len(lines):
            line = lines[start].split(": ")
            if len(line) == 1:
                break
            headers[line[0].lower()] = line[1].strip()
            start += 1
        # if we trudged to the end of the file, just try to cope.
        try:
            data = decodebytes(b("".join(lines[start:end])))
        except base64.binascii.Error as e:
            raise SSHException("base64 decoding error: {}".format(e)) from e
        if "proc-type" not in headers:
            # unencryped: done
            return data
        # encrypted keyfile: will need a password
        proc_type = headers["proc-type"]
        if proc_type != "4,ENCRYPTED":
            raise SSHException(
                'Unknown private key structure "{}"'.format(proc_type)
            )
        try:
            encryption_type, saltstr = headers["dek-info"].split(",")
        except (KeyError, ValueError):
            # Missing header, or not exactly "<cipher>,<salt>".
            raise SSHException("Can't parse DEK-info in private key file")
        if encryption_type not in self._CIPHER_TABLE:
            raise SSHException(
                'Unknown private key cipher "{}"'.format(encryption_type)
            )
        # if no password was passed in,
        # raise an exception pointing out that we need one
        if password is None:
            raise PasswordRequiredException("Private key file is encrypted")
        spec = self._CIPHER_TABLE[encryption_type]
        cipher = spec["cipher"]
        keysize = spec["keysize"]
        mode = spec["mode"]
        try:
            salt = unhexlify(b(saltstr))
        except ValueError as e:
            # binascii.Error (a ValueError) on odd-length / non-hex salts.
            raise SSHException(
                "Can't parse DEK-info in private key file"
            ) from e
        key = util.generate_key_bytes(md5, salt, password, keysize)
        try:
            decryptor = Cipher(
                cipher(key), mode(salt), backend=default_backend()
            ).decryptor()
            decrypted_data = decryptor.update(data) + decryptor.finalize()
            unpadder = padding.PKCS7(cipher.block_size).unpadder()
            return unpadder.update(decrypted_data) + unpadder.finalize()
        except ValueError:
            # Wrong IV size, truncated ciphertext or bad padding all mean the
            # same thing to the caller.
            raise SSHException("Bad password or corrupt private key file")

    def _read_private_key_openssh(self, lines, password):
        """
        Read the new OpenSSH SSH2 private key format available
        since OpenSSH version 6.5
        Reference:
        https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key

        :param list lines: the base64 lines between the BEGIN and END markers.
        :param str password: password used when the key is encrypted.
        :return: the unpadded private key data, as `bytes`.

        :raises: `.PasswordRequiredException` -- if the key is encrypted and
            ``password`` is ``None``.
        :raises: `.SSHException` -- if the key cannot be parsed.
        """
        try:
            data = decodebytes(b("".join(lines)))
        except base64.binascii.Error as e:
            raise SSHException("base64 decoding error: {}".format(e)) from e

        # read data struct
        auth_magic = data[:15]
        if auth_magic != OPENSSH_AUTH_MAGIC:
            raise SSHException("unexpected OpenSSH key header encountered")

        cstruct = self._uint32_cstruct_unpack(data[15:], "sssur")
        cipher, kdfname, kdf_options, num_pubkeys, remainder = cstruct
        # For now, just support 1 key.
        if num_pubkeys > 1:
            raise SSHException(
                "unsupported: private keyfile has multiple keys"
            )
        pubkey, privkey_blob = self._uint32_cstruct_unpack(remainder, "ss")

        if kdfname == b("bcrypt"):
            if cipher == b("aes256-cbc"):
                mode = modes.CBC
            elif cipher == b("aes256-ctr"):
                mode = modes.CTR
            else:
                # The cipher name comes straight from the file; don't let odd
                # bytes turn this error into a UnicodeDecodeError.
                raise SSHException(
                    "unknown cipher `{}` used in private key file".format(
                        cipher.decode("utf-8", "replace")
                    )
                )
            # Encrypted private key.
            # If no password was passed in, raise an exception pointing
            # out that we need one
            if password is None:
                raise PasswordRequiredException(
                    "private key file is encrypted"
                )

            # Unpack salt and rounds from kdfoptions
            salt, rounds = self._uint32_cstruct_unpack(kdf_options, "su")

            # run bcrypt kdf to derive key and iv/nonce (32 + 16 bytes)
            key_iv = bcrypt.kdf(
                b(password),
                b(salt),
                48,
                rounds,
                # We can't control how many rounds are on disk, so no sense
                # warning about it.
                ignore_few_rounds=True,
            )
            key = key_iv[:32]
            iv = key_iv[32:]

            # decrypt private key blob
            decryptor = Cipher(
                algorithms.AES(key), mode(iv), default_backend()
            ).decryptor()
            decrypted_privkey = decryptor.update(privkey_blob)
            decrypted_privkey += decryptor.finalize()
        elif cipher == b("none") and kdfname == b("none"):
            # Unencrypted private key
            decrypted_privkey = privkey_blob
        else:
            raise SSHException(
                "unknown cipher or kdf used in private key file"
            )

        # Unpack private key and verify checkints
        cstruct = self._uint32_cstruct_unpack(decrypted_privkey, "uusr")
        checkint1, checkint2, keytype, keydata = cstruct

        if checkint1 != checkint2:
            raise SSHException(
                "OpenSSH private key file checkints do not match"
            )

        return _unpad_openssh(keydata)

    def _uint32_cstruct_unpack(self, data, strformat):
        """
        Used to read new OpenSSH private key format.
        Unpacks a c data structure containing a mix of 32-bit uints and
        variable length strings prefixed by 32-bit uint size field,
        according to the specified format. Returns the unpacked vars
        in a tuple.
        Format strings:
          s - denotes a string
          i - denotes a long integer, encoded as a byte string
          u - denotes a 32-bit unsigned integer
          r - the remainder of the input string, returned as a string

        :raises: `.SSHException` -- on any error while unpacking.
        """
        arr = []
        idx = 0
        try:
            for f in strformat:
                if f == "s":
                    # string
                    s_size = struct.unpack(">L", data[idx : idx + 4])[0]
                    idx += 4
                    s = data[idx : idx + s_size]
                    idx += s_size
                    arr.append(s)
                elif f == "i":
                    # long integer
                    s_size = struct.unpack(">L", data[idx : idx + 4])[0]
                    idx += 4
                    s = data[idx : idx + s_size]
                    idx += s_size
                    arr.append(util.inflate_long(s, True))
                elif f == "u":
                    # 32-bit unsigned int
                    value = struct.unpack(">L", data[idx : idx + 4])[0]
                    idx += 4
                    arr.append(value)
                elif f == "r":
                    # remainder as string
                    arr.append(data[idx:])
                    break
        except Exception as e:
            # PKey-consuming code frequently wants to save-and-skip-over issues
            # with loading keys, and uses SSHException as the (really friggin
            # awful) signal for this. So for now...we do this.
            raise SSHException(str(e)) from e
        return tuple(arr)

    def _write_private_key_file(self, filename, key, format, password=None):
        """
        Write an SSH2-format private key file in a form that can be read by
        paramiko or openssh.  If no password is given, the key is written
        unencrypted, which is completely insecure.  If a password is given,
        the key is encrypted (see `_write_private_key`).

        :param filename: name of the file to write.
        :param key:
            the key object to serialize; it must offer ``private_bytes()``.
        :param format:
            the private key format, passed through to ``private_bytes()``.
        :param str password: an optional password to use to encrypt the file.

        :raises: ``IOError`` -- if there was an error writing the file.
        """
        # Ensure that we create new key files directly with a user-only mode,
        # instead of opening, writing, then chmodding, which leaves us open to
        # CVE-2022-24302.
        with os.fdopen(
            os.open(
                filename,
                # NOTE: O_TRUNC is a noop on new files, and O_CREAT is a noop
                # on existing files, so using all 3 in both cases is fine.
                flags=os.O_WRONLY | os.O_TRUNC | os.O_CREAT,
                # Ditto the use of the 'mode' argument; it should be safe to
                # give even for existing files (though it will not act like a
                # chmod in that case).
                mode=o600,
            ),
            # Yea, you still gotta inform the FLO that it is in "write" mode.
            "w",
        ) as f:
            self._write_private_key(f, key, format, password=password)

    def _write_private_key(self, f, key, format, password=None):
        """
        Serialize ``key`` as PEM text and write it to file-like object ``f``.

        With no ``password`` the key is written unencrypted; otherwise it is
        encrypted via ``serialization.BestAvailableEncryption``.
        """
        if password is None:
            encryption = serialization.NoEncryption()
        else:
            encryption = serialization.BestAvailableEncryption(b(password))

        f.write(
            key.private_bytes(
                serialization.Encoding.PEM, format, encryption
            ).decode()
        )

    def _check_type_and_load_cert(self, msg, key_type, cert_type):
        """
        Perform message type-checking & optional certificate loading.

        This includes fast-forwarding cert ``msg`` objects past the nonce, so
        that the subsequent fields are the key numbers; thus the caller may
        expect to treat the message as key material afterwards either way.

        The obtained key type is returned for classes which need to know what
        it was (e.g. ECDSA.)

        :raises: `.SSHException` -- if ``msg`` is ``None`` or its type field
            matches neither ``key_type`` nor ``cert_type``.
        """
        # Normalization; most classes have a single key type and give a string,
        # but eg ECDSA is a 1:N mapping.
        key_types = key_type
        cert_types = cert_type
        if isinstance(key_type, str):
            key_types = [key_types]
        if isinstance(cert_types, str):
            cert_types = [cert_types]
        # Can't do much with no message, that should've been handled elsewhere
        if msg is None:
            raise SSHException("Key object may not be empty")
        # First field is always key type, in either kind of object. (make sure
        # we rewind before grabbing it - sometimes caller had to do their own
        # introspection first!)
        msg.rewind()
        type_ = msg.get_text()
        # Regular public key - nothing special to do besides the implicit
        # type check.
        if type_ in key_types:
            pass
        # OpenSSH-compatible certificate - store full copy as .public_blob
        # (so signing works correctly) and then fast-forward past the
        # nonce.
        elif type_ in cert_types:
            # This seems the cleanest way to 'clone' an already-being-read
            # message; they're *IO objects at heart and their .getvalue()
            # always returns the full value regardless of pointer position.
            self.load_certificate(Message(msg.asbytes()))
            # Read out nonce as it comes before the public numbers - our caller
            # is likely going to use the (only borrowed by us, not owned)
            # 'msg' object for loading those numbers right after this.
            # TODO: usefully interpret it & other non-public-number fields
            # (requires going back into per-type subclasses.)
            msg.get_string()
        else:
            err = "Invalid key (class: {}, data type: {})"
            raise SSHException(err.format(self.__class__.__name__, type_))

    def load_certificate(self, value):
        """
        Supplement the private key contents with data loaded from an OpenSSH
        public key (``.pub``) or certificate (``-cert.pub``) file, a string
        containing such a file, or a `.Message` object.

        The .pub contents adds no real value, since the private key
        file includes sufficient information to derive the public
        key info. For certificates, however, this can be used on
        the client side to offer authentication requests to the server
        based on certificate instead of raw public key.

        See:
        https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys

        Note: very little effort is made to validate the certificate contents,
        that is for the server to decide if it is good enough to authenticate
        successfully.

        :raises: ``ValueError`` -- if the loaded blob's type does not start
            with this key's `get_name`.
        """
        if isinstance(value, Message):
            constructor = "from_message"
        elif os.path.isfile(value):
            constructor = "from_file"
        else:
            constructor = "from_string"
        blob = getattr(PublicBlob, constructor)(value)
        if not blob.key_type.startswith(self.get_name()):
            err = "PublicBlob type {} incompatible with key type {}"
            raise ValueError(err.format(blob.key_type, self.get_name()))
        self.public_blob = blob
867# General construct for an OpenSSH style Public Key blob
868# readable from a one-line file of the format:
869# <key-name> <base64-blob> [<comment>]
870# Of little value in the case of standard public keys
871# {ssh-rsa, ssh-dss, ssh-ecdsa, ssh-ed25519}, but should
872# provide rudimentary support for {*-cert.v01}
class PublicBlob:
    """
    OpenSSH plain public key or OpenSSH signed public key (certificate).

    Tries to be as dumb as possible and barely cares about specific
    per-key-type data.

    .. note::

        Most of the time you'll want to call `from_file`, `from_string` or
        `from_message` for useful instantiation, the main constructor is
        basically "I should be using ``attrs`` for this."
    """

    def __init__(self, type_, blob, comment=None):
        """
        Create a new public blob of given type and contents.

        :param str type_: Type indicator, eg ``ssh-rsa``.
        :param bytes blob: The blob bytes themselves.
        :param str comment: A comment, if one was given (e.g. file-based.)
        """
        self.key_type = type_
        self.key_blob = blob
        self.comment = comment

    @classmethod
    def from_file(cls, filename):
        """
        Create a public blob from a ``-cert.pub``-style file on disk.
        """
        with open(filename) as f:
            string = f.read()
        return cls.from_string(string)

    @classmethod
    def from_string(cls, string):
        """
        Create a public blob from a ``-cert.pub``-style string.

        The string is expected to hold ``<key-name> <base64-blob> [<comment>]``.

        :raises: ``ValueError`` -- if there are fewer than two fields, or the
            type encoded inside the blob differs from the leading key name.
        """
        fields = string.split(None, 2)
        if len(fields) < 2:
            msg = "Not enough fields for public blob: {}"
            raise ValueError(msg.format(fields))
        key_type = fields[0]
        key_blob = decodebytes(b(fields[1]))
        # The comment is optional and is whatever follows the blob.
        comment = fields[2].strip() if len(fields) > 2 else None
        # Verify that the blob message first (string) field matches the
        # key_type
        m = Message(key_blob)
        blob_type = m.get_text()
        if blob_type != key_type:
            deets = "key type={!r}, but blob type={!r}".format(
                key_type, blob_type
            )
            raise ValueError("Invalid PublicBlob contents: {}".format(deets))
        # All good? All good.
        return cls(type_=key_type, blob=key_blob, comment=comment)

    @classmethod
    def from_message(cls, message):
        """
        Create a public blob from a network `.Message`.

        Specifically, a cert-bearing pubkey auth packet, because by definition
        OpenSSH-style certificates 'are' their own network representation."
        """
        type_ = message.get_text()
        return cls(type_=type_, blob=message.asbytes())

    def __str__(self):
        ret = "{} public key/certificate".format(self.key_type)
        if self.comment:
            ret += "- {}".format(self.comment)
        return ret

    def __eq__(self, other):
        # Two blobs are equal when their raw bytes match. Anything that isn't
        # a PublicBlob is handed back to Python (NotImplemented), so that eg
        # ``blob == None`` is a plain False rather than None or an
        # AttributeError.
        if not isinstance(other, PublicBlob):
            return NotImplemented
        return self.key_blob == other.key_blob

    def __ne__(self, other):
        return not self == other