Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/pkey.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19"""
20Common API for all public keys.
21"""
23import base64
24import os
25import re
26import struct
27from base64 import decodebytes, encodebytes
28from binascii import unhexlify
29from hashlib import md5, sha256
30from io import RawIOBase
31from pathlib import Path
32from typing import NamedTuple, Optional, Union
34import bcrypt
35from cryptography.hazmat.backends import default_backend
36from cryptography.hazmat.primitives import asymmetric, padding, serialization
37from cryptography.hazmat.primitives.asymmetric.ec import (
38 EllipticCurvePrivateKey,
39)
40from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
41from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
42from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
44from paramiko import util
45from paramiko.common import o600
46from paramiko.message import Message
47from paramiko.ssh_exception import PasswordRequiredException, SSHException
48from paramiko.util import b, u
50# TripleDES is moving from `cryptography.hazmat.primitives.ciphers.algorithms`
51# in cryptography>=43.0.0 to `cryptography.hazmat.decrepit.ciphers.algorithms`
52# It will be removed from `cryptography.hazmat.primitives.ciphers.algorithms`
53# in cryptography==48.0.0.
54#
55# Source References:
56# - https://github.com/pyca/cryptography/commit/722a6393e61b3ac
57# - https://github.com/pyca/cryptography/pull/11407/files
58try:
59 from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES
60except ImportError:
61 from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES
# Leading marker of a decoded "OPENSSH PRIVATE KEY" blob; the trailing NUL
# byte is part of the marker itself.
OPENSSH_AUTH_MAGIC = b"openssh-key-v1\x00"
def _unpad_openssh(data):
    """
    Strip trailing OpenSSH-style padding from decoded private key material.

    The padding recognized here is the byte run ``1, 2, 3, ... n`` whose last
    byte ``n`` is therefore also the padding length.

    :param bytes data: key material, optionally followed by padding.

    :returns: ``data`` with any trailing padding removed.

    :raises:
        `.SSHException` -- if ``data`` is empty or its padding is malformed.
    """
    # At the moment, this is only used for unpadding private keys on disk. This
    # really ought to be made constant time (possibly by upstreaming this logic
    # into pyca/cryptography).
    if not data:
        # Indexing an empty blob would raise IndexError instead of the
        # SSHException that key-loading callers look for.
        raise SSHException("Invalid key")
    padding_length = data[-1]
    if padding_length == 0:
        # Pad bytes count up from 1, so a trailing zero cannot be padding.
        # (Slicing with ``data[:-0]`` would silently return empty bytes.)
        return data
    if 0x20 <= padding_length < 0x7F:
        return data  # no padding, last byte part comment (printable ascii)
    if padding_length > 15 or padding_length > len(data):
        raise SSHException("Invalid key")
    if data[-padding_length:] != bytes(range(1, padding_length + 1)):
        raise SSHException("Invalid key")
    return data[:-padding_length]
class UnknownKeyType(Exception):
    """
    An unknown public/private key algorithm was attempted to be read.

    :param key_type: the unrecognized type indicator, if known.
    :param key_bytes: the raw key data that could not be handled, if known.
    """

    def __init__(self, key_type=None, key_bytes=None):
        self.key_type = key_type
        self.key_bytes = key_bytes

    def __str__(self):
        # key_bytes defaults to None, so only measure it when it was given.
        if self.key_bytes is None:
            size = "None"
        else:
            size = f"<{len(self.key_bytes)}>"
        return f"UnknownKeyType(type={self.key_type!r}, bytes={size})"
class FileFormat(NamedTuple):
    """
    A private-key serialization format paired with an output encoding.
    """

    # Member of cryptography's ``serialization.PrivateFormat``.
    format: serialization.PrivateFormat
    # Member of cryptography's ``serialization.Encoding``.
    encoding: serialization.Encoding
# Cryptography exposes no shared interface/protocol for these classes, but
# each can be duck typed as eg "having .private_bytes", which we have relied
# upon implicitly ever since switching to that library.
PrivateKey = Union[RSAPrivateKey, EllipticCurvePrivateKey, Ed25519PrivateKey]

# NOTE: considered making these part of an Enum but that was a bit
# annoying/fussy, so this is an okay middle ground?
PEM = FileFormat(
    serialization.PrivateFormat.TraditionalOpenSSL,
    serialization.Encoding.PEM,
)
OPENSSH = FileFormat(
    serialization.PrivateFormat.OpenSSH,
    serialization.Encoding.PEM,
)
# TODO: others as desired?
118class PKey:
119 """
120 Base class for public keys.
122 Also includes some "meta" level convenience constructors such as
123 `.from_type_string`.
124 """
126 # known encryption types for private key files:
127 _CIPHER_TABLE = {
128 "AES-128-CBC": {
129 "cipher": algorithms.AES,
130 "keysize": 16,
131 "blocksize": 16,
132 "mode": modes.CBC,
133 },
134 "AES-256-CBC": {
135 "cipher": algorithms.AES,
136 "keysize": 32,
137 "blocksize": 16,
138 "mode": modes.CBC,
139 },
140 "DES-EDE3-CBC": {
141 "cipher": TripleDES,
142 "keysize": 24,
143 "blocksize": 8,
144 "mode": modes.CBC,
145 },
146 }
147 _PRIVATE_KEY_FORMAT_ORIGINAL = 1
148 _PRIVATE_KEY_FORMAT_OPENSSH = 2
149 BEGIN_TAG = re.compile(r"^-{5}BEGIN (RSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")
150 END_TAG = re.compile(r"^-{5}END (RSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")
152 @staticmethod
153 def from_path(path, password: Optional[str] = None):
154 """
155 Attempt to instantiate appropriate key subclass from given file path.
157 :param pathlib.Path path: The path to load (may also be a `str`).
158 :param str password: Optional decryption password.
160 :returns:
161 A `PKey` subclass instance.
163 :raises:
164 `UnknownKeyType`, if our crypto backend doesn't know this key type.
166 .. versionadded:: 3.2
167 .. versionchanged:: 5.0
168 Renamed ``passphrase`` argument to ``password`` for consistency
169 with older methods.
170 """
172 # Lazy import to avoid circular import issues
173 from paramiko import ECDSAKey, Ed25519Key, RSAKey
175 # Normalize to string, as cert suffix isn't quite an extension, so
176 # pathlib isn't useful for this.
177 path = str(path)
179 # Sort out cert vs key, i.e. it is 'legal' to hand this kind of API
180 # /either/ the key /or/ the cert, when there is a key/cert pair.
181 cert_suffix = "-cert.pub"
182 if str(path).endswith(cert_suffix):
183 key_path = path[: -len(cert_suffix)]
184 cert_path = path
185 else:
186 key_path = path
187 cert_path = path + cert_suffix
189 key_path = Path(key_path).expanduser()
190 cert_path = Path(cert_path).expanduser()
192 data = key_path.read_bytes()
193 # Like OpenSSH, try modern/OpenSSH-specific key load first
194 try:
195 loaded = serialization.load_ssh_private_key(
196 data=data, password=password
197 )
198 # Then fall back to assuming legacy PEM type
199 except ValueError:
200 loaded = serialization.load_pem_private_key(
201 data=data, password=password
202 )
203 # TODO Python 3.10: match statement? (NOTE: we cannot use a dict
204 # because the results from the loader are literal backend, eg openssl,
205 # private classes, so isinstance tests work but exact 'x class is y'
206 # tests will not work)
207 # TODO: leverage already-parsed/math'd obj to avoid duplicate cpu
208 # cycles? seemingly requires most of our key subclasses to be rewritten
209 # to be cryptography-object-forward. this is still likely faster than
210 # the old SSHClient code that just tried instantiating every class!
211 key_class = None
212 if isinstance(loaded, asymmetric.rsa.RSAPrivateKey):
213 key_class = RSAKey
214 elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey):
215 key_class = Ed25519Key
216 elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey):
217 key_class = ECDSAKey
218 else:
219 raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__)
220 with key_path.open() as fd:
221 key = key_class.from_private_key(fd, password=password)
222 if cert_path.exists():
223 # load_certificate can take Message, path-str, or value-str
224 key.load_certificate(str(cert_path))
225 return key
227 @staticmethod
228 def from_type_string(
229 key_type: str, key_bytes: bytes, password: Optional[str] = None
230 ):
231 """
232 Given type `str` & raw `bytes`, return a `PKey` subclass instance.
234 For example, ``PKey.from_type_string("ssh-ed25519", <public bytes>)``
235 will (if successful) return a new `.Ed25519Key`.
237 :param str key_type:
238 The key type, eg ``"ssh-ed25519"``.
239 :param bytes key_bytes:
240 The raw byte data forming the key material, as expected by
241 subclasses' ``data`` parameter.
242 :param str password:
243 Optional password used to decrypt ``key_bytes``.
245 :returns:
246 A `PKey` subclass instance.
248 :raises:
249 `UnknownKeyType`, if no registered classes knew about this type.
251 .. versionadded:: 3.2
252 .. versionchanged:: 5.0
253 Added the ``password`` kwarg.
254 """
255 from paramiko import key_classes
257 for key_class in key_classes:
258 if key_type in key_class.identifiers():
259 return key_class(data=key_bytes, password=password)
260 raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes)
262 @classmethod
263 def identifiers(cls):
264 """
265 returns an iterable of key format/name strings this class can handle.
267 Most classes only have a single identifier, and thus this default
268 implementation suffices; see `.ECDSAKey` for one example of an
269 override.
270 """
271 return [cls.name]
273 # TODO (backwards incompat): make this and subclasses consistent, some of
274 # our own classmethods even assume kwargs we don't define!
275 # TODO (backwards incompat): prob also raise NotImplementedError instead of
276 # pass'ing; the contract is pretty obviously that you need to handle
277 # msg/data/filename appropriately. (If 'pass' is a concession to testing,
278 # see about doing the work to fix the tests instead)
279 def __init__(self, msg=None, data=None):
280 """
281 Create a new instance of this public key type. If ``msg`` is given,
282 the key's public part(s) will be filled in from the message. If
283 ``data`` is given, the key's public part(s) will be filled in from
284 the string.
286 :param .Message msg:
287 an optional SSH `.Message` containing a public key of this type.
288 :param bytes data:
289 optional, the bytes of a public key of this type
291 :raises: `.SSHException` --
292 if a key cannot be created from the ``data`` or ``msg`` given, or
293 no key was passed in.
294 """
295 pass
297 # TODO: arguably this might want to be __str__ instead? ehh
298 # TODO: ditto the interplay between showing class name (currently we just
299 # say PKey writ large) and algorithm (usually == class name, but not
300 # always, also sometimes shows certificate-ness)
301 # TODO: if we do change it, we also want to tweak eg AgentKey, as it
302 # currently displays agent-ness with a suffix
303 def __repr__(self):
304 comment = ""
305 # Works for AgentKey, may work for others?
306 if hasattr(self, "comment") and self.comment:
307 comment = f", comment={self.comment!r}"
308 return f"PKey(alg={self.algorithm_name}, bits={self.get_bits()}, fp={self.fingerprint}{comment})" # noqa
310 # TODO (backwards incompat): just merge into __bytes__ (everywhere)
311 def asbytes(self):
312 """
313 Return a string of an SSH `.Message` made up of the public part(s) of
314 this key. This string is suitable for passing to `__init__` to
315 re-create the key object later.
316 """
317 return bytes()
319 def __bytes__(self):
320 return self.asbytes()
322 def __eq__(self, other):
323 return isinstance(other, PKey) and self._fields == other._fields
325 def __hash__(self):
326 return hash(self._fields)
328 @property
329 def _fields(self):
330 raise NotImplementedError
332 def get_name(self):
333 """
334 Return the name of this private key implementation.
336 :return:
337 name of this private key type, in SSH terminology, as a `str` (for
338 example, ``"ssh-rsa"``).
339 """
340 return ""
342 @property
343 def algorithm_name(self):
344 """
345 Return the key algorithm identifier for this key.
347 Similar to `get_name`, but aimed at pure algorithm name instead of SSH
348 protocol field value.
349 """
350 # Nuke the leading 'ssh-'
351 # TODO in Python 3.9: use .removeprefix()
352 name = self.get_name().replace("ssh-", "")
353 # Trim any cert suffix (but leave the -cert, as OpenSSH does)
354 cert_tail = "-cert-v01@openssh.com"
355 if cert_tail in name:
356 name = name.replace(cert_tail, "-cert")
357 # Nuke any eg ECDSA suffix, OpenSSH does basically this too.
358 else:
359 name = name.split("-")[0]
360 return name.upper()
362 def get_bits(self):
363 """
364 Return the number of significant bits in this key. This is useful
365 for judging the relative security of a key.
367 :return: bits in the key (as an `int`)
368 """
369 # TODO (backwards incompat): raise NotImplementedError, 0 is unlikely
370 # to ever be _correct_ and nothing in the critical path seems to use
371 # this.
372 return 0
374 def can_sign(self):
375 """
376 Return ``True`` if this key has the private part necessary for signing
377 data.
378 """
379 return False
381 def get_fingerprint(self):
382 """
383 Return an MD5 fingerprint of the public part of this key. Nothing
384 secret is revealed.
386 :return:
387 a 16-byte `string <str>` (binary) of the MD5 fingerprint, in SSH
388 format.
389 """
390 return md5(self.asbytes()).digest()
392 @property
393 def fingerprint(self):
394 """
395 Modern fingerprint property designed to be comparable to OpenSSH.
397 Currently only does SHA256 (the OpenSSH default).
399 .. versionadded:: 3.2
400 """
401 hashy = sha256(bytes(self))
402 hash_name = hashy.name.upper()
403 b64ed = encodebytes(hashy.digest())
404 cleaned = u(b64ed).strip().rstrip("=") # yes, OpenSSH does this too!
405 return f"{hash_name}:{cleaned}"
407 def get_base64(self):
408 """
409 Return a base64 string containing the public part of this key. Nothing
410 secret is revealed. This format is compatible with that used to store
411 public key files or recognized host keys.
413 :return: a base64 `string <str>` containing the public part of the key.
414 """
415 return u(encodebytes(self.asbytes())).replace("\n", "")
417 def sign_ssh_data(self, data, algorithm=None):
418 """
419 Sign a blob of data with this private key, and return a `.Message`
420 representing an SSH signature message.
422 :param bytes data:
423 the data to sign.
424 :param str algorithm:
425 the signature algorithm to use, if different from the key's
426 internal name. Default: ``None``.
427 :return: an SSH signature `message <.Message>`.
429 .. versionchanged:: 2.9
430 Added the ``algorithm`` kwarg.
431 """
432 return bytes()
434 def verify_ssh_sig(self, data, msg):
435 """
436 Given a blob of data, and an SSH message representing a signature of
437 that data, verify that it was signed with this key.
439 :param bytes data: the data that was signed.
440 :param .Message msg: an SSH signature message
441 :return:
442 ``True`` if the signature verifies correctly; ``False`` otherwise.
443 """
444 return False
446 @classmethod
447 def from_private_key_file(cls, filename, password=None):
448 """
449 Create a key object by reading a private key file. If the private
450 key is encrypted and ``password`` is not ``None``, the given password
451 will be used to decrypt the key (otherwise `.PasswordRequiredException`
452 is thrown). Through the magic of Python, this factory method will
453 exist in all subclasses of PKey (such as `.RSAKey`), but
454 is useless on the abstract PKey class.
456 :param str filename: name of the file to read
457 :param str password:
458 an optional password to use to decrypt the key file, if it's
459 encrypted
460 :return: a new `.PKey` based on the given private key
462 :raises: ``IOError`` -- if there was an error reading the file
463 :raises: `.PasswordRequiredException` -- if the private key file is
464 encrypted, and ``password`` is ``None``
465 :raises: `.SSHException` -- if the key file is invalid
466 """
467 key = cls(filename=filename, password=password)
468 return key
470 @classmethod
471 def from_private_key(cls, file_obj, password=None):
472 """
473 Create a key object by reading a private key from a file (or file-like)
474 object. If the private key is encrypted and ``password`` is not
475 ``None``, the given password will be used to decrypt the key (otherwise
476 `.PasswordRequiredException` is thrown).
478 :param file_obj: the file-like object to read from
479 :param str password:
480 an optional password to use to decrypt the key, if it's encrypted
481 :return: a new `.PKey` based on the given private key
483 :raises: ``IOError`` -- if there was an error reading the key
484 :raises: `.PasswordRequiredException` --
485 if the private key file is encrypted, and ``password`` is ``None``
486 :raises: `.SSHException` -- if the key file is invalid
487 """
488 key = cls(file_obj=file_obj, password=password)
489 return key
491 def write_private_key_file(
492 self,
493 filename: str,
494 password: Optional[str] = None,
495 file_format: FileFormat = PEM,
496 ):
497 """
498 Write private key contents into a file. If the password is not
499 ``None``, the key is encrypted before writing.
501 :param str filename: name of the file to write
502 :param str password:
503 an optional password to use to encrypt the key file
504 :param FileFormat file_format:
505 what format+encoding pair to use; defaults to the original
506 behavior, namely PEM.
508 :raises: ``IOError`` -- if there was an error writing the file
509 :raises: `.SSHException` -- if the key is invalid
510 """
511 self._write_private_key_file(
512 filename,
513 self.private_key,
514 file_format=file_format,
515 password=password,
516 )
518 def write_private_key(
519 self,
520 file_obj: RawIOBase,
521 password: Optional[str] = None,
522 file_format: FileFormat = PEM,
523 ):
524 """
525 Write private key contents into a file (or file-like) object. If the
526 password is not ``None``, the key is encrypted before writing.
528 :param file_obj: the file-like object to write into
529 :param str password: an optional password to use to encrypt the key
530 :param FileFormat file_format:
531 what format+encoding pair to use; defaults to the original
532 behavior, namely PEM.
534 :raises: ``IOError`` -- if there was an error writing to the file
535 :raises: `.SSHException` -- if the key is invalid
536 """
537 self._write_private_key(
538 file_obj,
539 self.private_key,
540 file_format=file_format,
541 password=password,
542 )
544 def _read_private_key_file(self, tag, filename, password=None):
545 """
546 Read an SSH2-format private key file, looking for a string of the type
547 ``"BEGIN xxx PRIVATE KEY"`` for some ``xxx``, base64-decode the text we
548 find, and return it as a string. If the private key is encrypted and
549 ``password`` is not ``None``, the given password will be used to
550 decrypt the key (otherwise `.PasswordRequiredException` is thrown).
552 :param str tag:
553 ``"RSA"`` (or etc), the tag used to mark the data block.
554 :param str filename:
555 name of the file to read.
556 :param str password:
557 an optional password to use to decrypt the key file, if it's
558 encrypted.
559 :return:
560 the `bytes` that make up the private key.
562 :raises: ``IOError`` -- if there was an error reading the file.
563 :raises: `.PasswordRequiredException` -- if the private key file is
564 encrypted, and ``password`` is ``None``.
565 :raises: `.SSHException` -- if the key file is invalid.
566 """
567 with open(filename, "r") as f:
568 data = self._read_private_key(tag, f, password)
569 return data
571 def _read_private_key(self, tag, f, password=None):
572 lines = f.readlines()
573 if not lines:
574 raise SSHException("no lines in {} private key file".format(tag))
576 # find the BEGIN tag
577 start = 0
578 m = self.BEGIN_TAG.match(lines[start])
579 line_range = len(lines) - 1
580 while start < line_range and not m:
581 start += 1
582 m = self.BEGIN_TAG.match(lines[start])
583 start += 1
584 keytype = m.group(1) if m else None
585 if start >= len(lines) or keytype is None:
586 raise SSHException("not a valid {} private key file".format(tag))
588 # find the END tag
589 end = start
590 m = self.END_TAG.match(lines[end])
591 while end < line_range and not m:
592 end += 1
593 m = self.END_TAG.match(lines[end])
595 if keytype == tag:
596 data = self._read_private_key_pem(lines, end, password)
597 pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL
598 elif keytype == "OPENSSH":
599 data = self._read_private_key_openssh(lines[start:end], password)
600 pkformat = self._PRIVATE_KEY_FORMAT_OPENSSH
601 else:
602 raise SSHException(
603 "encountered {} key, expected {} key".format(keytype, tag)
604 )
606 return pkformat, data
608 def _got_bad_key_format_id(self, id_):
609 err = "{}._read_private_key() spat out an unknown key format id '{}'"
610 raise SSHException(err.format(self.__class__.__name__, id_))
612 def _read_private_key_pem(self, lines, end, password):
613 start = 0
614 # parse any headers first
615 headers = {}
616 start += 1
617 while start < len(lines):
618 line = lines[start].split(": ")
619 if len(line) == 1:
620 break
621 headers[line[0].lower()] = line[1].strip()
622 start += 1
623 # if we trudged to the end of the file, just try to cope.
624 try:
625 data = decodebytes(b("".join(lines[start:end])))
626 except base64.binascii.Error as e:
627 raise SSHException("base64 decoding error: {}".format(e))
628 if "proc-type" not in headers:
629 # unencryped: done
630 return data
631 # encrypted keyfile: will need a password
632 proc_type = headers["proc-type"]
633 if proc_type != "4,ENCRYPTED":
634 raise SSHException(
635 'Unknown private key structure "{}"'.format(proc_type)
636 )
637 try:
638 encryption_type, saltstr = headers["dek-info"].split(",")
639 except:
640 raise SSHException("Can't parse DEK-info in private key file")
641 if encryption_type not in self._CIPHER_TABLE:
642 raise SSHException(
643 'Unknown private key cipher "{}"'.format(encryption_type)
644 )
645 # if no password was passed in,
646 # raise an exception pointing out that we need one
647 if password is None:
648 raise PasswordRequiredException("Private key file is encrypted")
649 cipher = self._CIPHER_TABLE[encryption_type]["cipher"]
650 keysize = self._CIPHER_TABLE[encryption_type]["keysize"]
651 mode = self._CIPHER_TABLE[encryption_type]["mode"]
652 salt = unhexlify(b(saltstr))
653 key = util.generate_key_bytes(md5, salt, password, keysize)
654 decryptor = Cipher(
655 cipher(key), mode(salt), backend=default_backend()
656 ).decryptor()
657 decrypted_data = decryptor.update(data) + decryptor.finalize()
658 unpadder = padding.PKCS7(cipher.block_size).unpadder()
659 try:
660 return unpadder.update(decrypted_data) + unpadder.finalize()
661 except ValueError:
662 raise SSHException("Bad password or corrupt private key file")
664 def _read_private_key_openssh(self, lines, password):
665 """
666 Read the new OpenSSH SSH2 private key format available
667 since OpenSSH version 6.5
668 Reference:
669 https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key
670 """
671 try:
672 data = decodebytes(b("".join(lines)))
673 except base64.binascii.Error as e:
674 raise SSHException("base64 decoding error: {}".format(e))
676 # read data struct
677 auth_magic = data[:15]
678 if auth_magic != OPENSSH_AUTH_MAGIC:
679 raise SSHException("unexpected OpenSSH key header encountered")
681 cstruct = self._uint32_cstruct_unpack(data[15:], "sssur")
682 cipher, kdfname, kdf_options, num_pubkeys, remainder = cstruct
683 # For now, just support 1 key.
684 if num_pubkeys > 1:
685 raise SSHException(
686 "unsupported: private keyfile has multiple keys"
687 )
688 pubkey, privkey_blob = self._uint32_cstruct_unpack(remainder, "ss")
690 if kdfname == b("bcrypt"):
691 if cipher == b("aes256-cbc"):
692 mode = modes.CBC
693 elif cipher == b("aes256-ctr"):
694 mode = modes.CTR
695 else:
696 raise SSHException(
697 "unknown cipher `{}` used in private key file".format(
698 cipher.decode("utf-8")
699 )
700 )
701 # Encrypted private key.
702 # If no password was passed in, raise an exception pointing
703 # out that we need one
704 if password is None:
705 raise PasswordRequiredException(
706 "private key file is encrypted"
707 )
709 # Unpack salt and rounds from kdfoptions
710 salt, rounds = self._uint32_cstruct_unpack(kdf_options, "su")
712 # run bcrypt kdf to derive key and iv/nonce (32 + 16 bytes)
713 key_iv = bcrypt.kdf(
714 b(password),
715 b(salt),
716 48,
717 rounds,
718 # We can't control how many rounds are on disk, so no sense
719 # warning about it.
720 ignore_few_rounds=True,
721 )
722 key = key_iv[:32]
723 iv = key_iv[32:]
725 # decrypt private key blob
726 decryptor = Cipher(
727 algorithms.AES(key), mode(iv), default_backend()
728 ).decryptor()
729 decrypted_privkey = decryptor.update(privkey_blob)
730 decrypted_privkey += decryptor.finalize()
731 elif cipher == b("none") and kdfname == b("none"):
732 # Unencrypted private key
733 decrypted_privkey = privkey_blob
734 else:
735 raise SSHException(
736 "unknown cipher or kdf used in private key file"
737 )
739 # Unpack private key and verify checkints
740 cstruct = self._uint32_cstruct_unpack(decrypted_privkey, "uusr")
741 checkint1, checkint2, keytype, keydata = cstruct
743 if checkint1 != checkint2:
744 raise SSHException(
745 "OpenSSH private key file checkints do not match"
746 )
748 return _unpad_openssh(keydata)
750 def _uint32_cstruct_unpack(self, data, strformat):
751 """
752 Used to read new OpenSSH private key format.
753 Unpacks a c data structure containing a mix of 32-bit uints and
754 variable length strings prefixed by 32-bit uint size field,
755 according to the specified format. Returns the unpacked vars
756 in a tuple.
757 Format strings:
758 s - denotes a string
759 i - denotes a long integer, encoded as a byte string
760 u - denotes a 32-bit unsigned integer
761 r - the remainder of the input string, returned as a string
762 """
763 arr = []
764 idx = 0
765 try:
766 for f in strformat:
767 if f == "s":
768 # string
769 s_size = struct.unpack(">L", data[idx : idx + 4])[0]
770 idx += 4
771 s = data[idx : idx + s_size]
772 idx += s_size
773 arr.append(s)
774 if f == "i":
775 # long integer
776 s_size = struct.unpack(">L", data[idx : idx + 4])[0]
777 idx += 4
778 s = data[idx : idx + s_size]
779 idx += s_size
780 i = util.inflate_long(s, True)
781 arr.append(i)
782 elif f == "u":
783 # 32-bit unsigned int
784 u = struct.unpack(">L", data[idx : idx + 4])[0]
785 idx += 4
786 arr.append(u)
787 elif f == "r":
788 # remainder as string
789 s = data[idx:]
790 arr.append(s)
791 break
792 except Exception as e:
793 # PKey-consuming code frequently wants to save-and-skip-over issues
794 # with loading keys, and uses SSHException as the (really friggin
795 # awful) signal for this. So for now...we do this.
796 raise SSHException(str(e))
797 return tuple(arr)
799 def _write_private_key_file(
800 self,
801 filename: str,
802 key: PrivateKey,
803 file_format: FileFormat,
804 password: Optional[str] = None,
805 ):
806 # Ensure that we create new key files directly with a user-only mode,
807 # instead of opening, writing, then chmodding, which leaves us open to
808 # CVE-2022-24302.
809 with os.fdopen(
810 os.open(
811 filename,
812 # NOTE: O_TRUNC is a noop on new files, and O_CREAT is a noop
813 # on existing files, so using all 3 in both cases is fine.
814 flags=os.O_WRONLY | os.O_TRUNC | os.O_CREAT,
815 # Ditto the use of the 'mode' argument; it should be safe to
816 # give even for existing files (though it will not act like a
817 # chmod in that case).
818 mode=o600,
819 ),
820 # Yea, you still gotta inform the FLO that it is in "write" mode.
821 "w",
822 ) as f:
823 self._write_private_key(f, key, file_format, password=password)
825 def _write_private_key(
826 self,
827 f: RawIOBase,
828 key: PrivateKey,
829 file_format: FileFormat,
830 password: Optional[str] = None,
831 ):
832 if password is None:
833 encryption = serialization.NoEncryption()
834 else:
835 encryption = serialization.BestAvailableEncryption(b(password))
837 f.write(
838 key.private_bytes(
839 file_format.encoding, file_format.format, encryption
840 ).decode()
841 )
843 def _check_type_and_load_cert(self, msg, key_type, cert_type):
844 """
845 Perform message type-checking & optional certificate loading.
847 This includes fast-forwarding cert ``msg`` objects past the nonce, so
848 that the subsequent fields are the key numbers; thus the caller may
849 expect to treat the message as key material afterwards either way.
851 The obtained key type is returned for classes which need to know what
852 it was (e.g. ECDSA.)
853 """
854 # Normalization; most classes have a single key type and give a string,
855 # but eg ECDSA is a 1:N mapping.
856 key_types = key_type
857 cert_types = cert_type
858 if isinstance(key_type, str):
859 key_types = [key_types]
860 if isinstance(cert_types, str):
861 cert_types = [cert_types]
862 # Can't do much with no message, that should've been handled elsewhere
863 if msg is None:
864 raise SSHException("Key object may not be empty")
865 # First field is always key type, in either kind of object. (make sure
866 # we rewind before grabbing it - sometimes caller had to do their own
867 # introspection first!)
868 msg.rewind()
869 type_ = msg.get_text()
870 # Regular public key - nothing special to do besides the implicit
871 # type check.
872 if type_ in key_types:
873 pass
874 # OpenSSH-compatible certificate - store full copy as .public_blob
875 # (so signing works correctly) and then fast-forward past the
876 # nonce.
877 elif type_ in cert_types:
878 # This seems the cleanest way to 'clone' an already-being-read
879 # message; they're *IO objects at heart and their .getvalue()
880 # always returns the full value regardless of pointer position.
881 self.load_certificate(Message(msg.asbytes()))
882 # Read out nonce as it comes before the public numbers - our caller
883 # is likely going to use the (only borrowed by us, not owned)
884 # 'msg' object for loading those numbers right after this.
885 # TODO: usefully interpret it & other non-public-number fields
886 # (requires going back into per-type subclasses.)
887 msg.get_string()
888 else:
889 err = "Invalid key (class: {}, data type: {}"
890 raise SSHException(err.format(self.__class__.__name__, type_))
892 def load_certificate(self, value):
893 """
894 Supplement the private key contents with data loaded from an OpenSSH
895 public key (``.pub``) or certificate (``-cert.pub``) file, a string
896 containing such a file, or a `.Message` object.
898 The .pub contents adds no real value, since the private key
899 file includes sufficient information to derive the public
900 key info. For certificates, however, this can be used on
901 the client side to offer authentication requests to the server
902 based on certificate instead of raw public key.
904 See:
905 https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys
907 Note: very little effort is made to validate the certificate contents,
908 that is for the server to decide if it is good enough to authenticate
909 successfully.
910 """
911 if isinstance(value, Message):
912 constructor = "from_message"
913 elif os.path.isfile(value):
914 constructor = "from_file"
915 else:
916 constructor = "from_string"
917 blob = getattr(PublicBlob, constructor)(value)
918 if not blob.key_type.startswith(self.get_name()):
919 err = "PublicBlob type {} incompatible with key type {}"
920 raise ValueError(err.format(blob.key_type, self.get_name()))
921 self.public_blob = blob
924# General construct for an OpenSSH style Public Key blob
925# readable from a one-line file of the format:
926# <key-name> <base64-blob> [<comment>]
927# Of little value in the case of standard public keys
928# {ssh-rsa, ssh-ecdsa, ssh-ed25519}, but should
929# provide rudimentary support for {*-cert.v01}
class PublicBlob:
    """
    OpenSSH plain public key or OpenSSH signed public key (certificate).

    Tries to be as dumb as possible and barely cares about specific
    per-key-type data.

    .. note::

        Most of the time you'll want to call `from_file`, `from_string` or
        `from_message` for useful instantiation, the main constructor is
        basically "I should be using ``attrs`` for this."
    """

    def __init__(self, type_, blob, comment=None):
        """
        Create a new public blob of given type and contents.

        :param str type_: Type indicator, eg ``ssh-rsa``.
        :param bytes blob: The blob bytes themselves.
        :param str comment: A comment, if one was given (e.g. file-based.)
        """
        self.key_type = type_
        self.key_blob = blob
        self.comment = comment

    @classmethod
    def from_file(cls, filename):
        """
        Create a public blob from a ``-cert.pub``-style file on disk.
        """
        with open(filename) as f:
            string = f.read()
        return cls.from_string(string)

    @classmethod
    def from_string(cls, string):
        """
        Create a public blob from a ``-cert.pub``-style string.

        The string holds whitespace-separated ``<type> <base64 blob>`` fields
        plus an optional trailing comment.

        :raises: ``ValueError`` --
            if fewer than two fields are present, or the type named inside
            the decoded blob differs from the leading type field.
        """
        fields = string.split(None, 2)
        if len(fields) < 2:
            msg = "Not enough fields for public blob: {}"
            raise ValueError(msg.format(fields))
        key_type = fields[0]
        key_blob = decodebytes(b(fields[1]))
        comment = fields[2].strip() if len(fields) > 2 else None
        # Verify that the blob message first (string) field matches the
        # key_type
        m = Message(key_blob)
        blob_type = m.get_text()
        if blob_type != key_type:
            deets = "key type={!r}, but blob type={!r}".format(
                key_type, blob_type
            )
            raise ValueError("Invalid PublicBlob contents: {}".format(deets))
        # All good? All good.
        return cls(type_=key_type, blob=key_blob, comment=comment)

    @classmethod
    def from_message(cls, message):
        """
        Create a public blob from a network `.Message`.

        Specifically, a cert-bearing pubkey auth packet, because by definition
        OpenSSH-style certificates 'are' their own network representation."
        """
        type_ = message.get_text()
        return cls(type_=type_, blob=message.asbytes())

    def __str__(self):
        ret = "{} public key/certificate".format(self.key_type)
        if self.comment:
            ret += " - {}".format(self.comment)
        return ret

    def __eq__(self, other):
        # Blobs are equal when their raw bytes are; anything that isn't a
        # PublicBlob is left for Python's default handling (ie: not equal)
        # rather than blowing up on a missing attribute.
        if not isinstance(other, PublicBlob):
            return NotImplemented
        return self.key_blob == other.key_blob

    def __ne__(self, other):
        return not self == other