Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/pkey.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

372 statements  

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20Common API for all public keys. 

21""" 

22 

23import base64 

24import os 

25import re 

26import struct 

27from base64 import decodebytes, encodebytes 

28from binascii import unhexlify 

29from hashlib import md5, sha256 

30from io import RawIOBase 

31from pathlib import Path 

32from typing import NamedTuple, Optional, Union 

33 

34import bcrypt 

35from cryptography.hazmat.backends import default_backend 

36from cryptography.hazmat.primitives import asymmetric, padding, serialization 

37from cryptography.hazmat.primitives.asymmetric.ec import ( 

38 EllipticCurvePrivateKey, 

39) 

40from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey 

41from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey 

42from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

43 

44from paramiko import util 

45from paramiko.common import o600 

46from paramiko.message import Message 

47from paramiko.ssh_exception import PasswordRequiredException, SSHException 

48from paramiko.util import b, u 

49 

50# TripleDES is moving from `cryptography.hazmat.primitives.ciphers.algorithms` 

51# in cryptography>=43.0.0 to `cryptography.hazmat.decrepit.ciphers.algorithms` 

52# It will be removed from `cryptography.hazmat.primitives.ciphers.algorithms` 

53# in cryptography==48.0.0. 

54# 

55# Source References: 

56# - https://github.com/pyca/cryptography/commit/722a6393e61b3ac 

57# - https://github.com/pyca/cryptography/pull/11407/files 

58try: 

59 from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES 

60except ImportError: 

61 from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES 

62 

63 

64OPENSSH_AUTH_MAGIC = b"openssh-key-v1\x00" 

65 

66 

def _unpad_openssh(data):
    """
    Strip OpenSSH-style padding (bytes ``1, 2, 3, ...``) from ``data``.

    :param bytes data: decrypted private key payload, possibly padded.
    :returns: ``data`` without any trailing padding bytes.
    :raises: `.SSHException` -- if ``data`` is empty or the padding is
        malformed.
    """
    # At the moment, this is only used for unpadding private keys on disk. This
    # really ought to be made constant time (possibly by upstreaming this logic
    # into pyca/cryptography).
    if not data:
        # Nothing to inspect; previously this surfaced as an IndexError.
        raise SSHException("Invalid key")
    padding_length = data[-1]
    if 0x20 <= padding_length < 0x7F:
        return data  # no padding, last byte part comment (printable ascii)
    if padding_length == 0:
        # Padding bytes count up from 1, so a trailing NUL (e.g. the length
        # field of an empty comment) means there is no padding at all. Note
        # that data[:-0] would wrongly evaluate to an empty string.
        return data
    if padding_length > 15 or padding_length > len(data):
        raise SSHException("Invalid key")
    # Valid padding is exactly the sequence 1..padding_length.
    if data[-padding_length:] != bytes(range(1, padding_length + 1)):
        raise SSHException("Invalid key")
    return data[:-padding_length]

80 

81 

class UnknownKeyType(Exception):
    """
    An unknown public/private key algorithm was attempted to be read.

    :param key_type: the offending key type (a name or a class), if known.
    :param bytes key_bytes: the raw key material, if available.
    """

    def __init__(self, key_type=None, key_bytes=None):
        self.key_type = key_type
        self.key_bytes = key_bytes

    def __str__(self):
        # key_bytes defaults to None, which has no len(); rendering the error
        # must not itself blow up with a TypeError in that case.
        size = None if self.key_bytes is None else len(self.key_bytes)
        return f"UnknownKeyType(type={self.key_type!r}, bytes=<{size}>)"  # noqa

93 

94 

class FileFormat(NamedTuple):
    """
    A (format, encoding) pair describing how to serialize a private key.
    """

    # Container layout handed to ``private_bytes``.
    format: serialization.PrivateFormat
    # Byte-level encoding handed to ``private_bytes``.
    encoding: serialization.Encoding

98 

99 

# None of these share an apparent interface/protocol inside Cryptography, but
# all can be duck typed as eg "having .private_bytes", which is what we have
# implicitly relied upon ever since switching to that library.
PrivateKey = Union[RSAPrivateKey, EllipticCurvePrivateKey, Ed25519PrivateKey]

# NOTE: considered making these part of an Enum but that was a bit
# annoying/fussy, so this is an okay middle ground?
PEM = FileFormat(
    serialization.PrivateFormat.TraditionalOpenSSL,
    serialization.Encoding.PEM,
)
OPENSSH = FileFormat(
    serialization.PrivateFormat.OpenSSH,
    serialization.Encoding.PEM,
)

115# TODO: others as desired? 

116 

117 

class PKey:
    """
    Base class for public keys.

    Also includes some "meta" level convenience constructors such as
    `.from_type_string`.
    """

    # known encryption types for private key files:
    _CIPHER_TABLE = {
        "AES-128-CBC": dict(
            cipher=algorithms.AES, keysize=16, blocksize=16, mode=modes.CBC
        ),
        "AES-256-CBC": dict(
            cipher=algorithms.AES, keysize=32, blocksize=16, mode=modes.CBC
        ),
        "DES-EDE3-CBC": dict(
            cipher=TripleDES, keysize=24, blocksize=8, mode=modes.CBC
        ),
    }
    # Format ids reported by _read_private_key().
    _PRIVATE_KEY_FORMAT_ORIGINAL = 1
    _PRIVATE_KEY_FORMAT_OPENSSH = 2
    # Armor lines delimiting the base64 payload of a private key file.
    BEGIN_TAG = re.compile(r"^-{5}BEGIN (RSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")
    END_TAG = re.compile(r"^-{5}END (RSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")

151 

@staticmethod
def from_path(path, password: Optional[str] = None):
    """
    Attempt to instantiate appropriate key subclass from given file path.

    :param pathlib.Path path: The path to load (may also be a `str`).
    :param str password: Optional decryption password.

    :returns:
        A `PKey` subclass instance.

    :raises:
        `UnknownKeyType`, if our crypto backend doesn't know this key type.

    .. versionadded:: 3.2
    .. versionchanged:: 5.0
        Renamed ``passphrase`` argument to ``password`` for consistency
        with older methods.
    """

    # Lazy import to avoid circular import issues
    from paramiko import ECDSAKey, Ed25519Key, RSAKey

    # Normalize to string, as cert suffix isn't quite an extension, so
    # pathlib isn't useful for this.
    path = str(path)

    # Sort out cert vs key, i.e. it is 'legal' to hand this kind of API
    # /either/ the key /or/ the cert, when there is a key/cert pair.
    cert_suffix = "-cert.pub"
    if path.endswith(cert_suffix):
        key_path = path[: -len(cert_suffix)]
        cert_path = path
    else:
        key_path = path
        cert_path = path + cert_suffix

    key_path = Path(key_path).expanduser()
    cert_path = Path(cert_path).expanduser()

    data = key_path.read_bytes()
    # The cryptography loaders only accept bytes(-like) passwords, while our
    # own API documents a str; encode here so both str and bytes work.
    password_bytes = None if password is None else b(password)
    # Like OpenSSH, try modern/OpenSSH-specific key load first
    try:
        loaded = serialization.load_ssh_private_key(
            data=data, password=password_bytes
        )
    # Then fall back to assuming legacy PEM type
    except ValueError:
        loaded = serialization.load_pem_private_key(
            data=data, password=password_bytes
        )
    # TODO Python 3.10: match statement? (NOTE: we cannot use a dict
    # because the results from the loader are literal backend, eg openssl,
    # private classes, so isinstance tests work but exact 'x class is y'
    # tests will not work)
    # TODO: leverage already-parsed/math'd obj to avoid duplicate cpu
    # cycles? seemingly requires most of our key subclasses to be rewritten
    # to be cryptography-object-forward. this is still likely faster than
    # the old SSHClient code that just tried instantiating every class!
    if isinstance(loaded, asymmetric.rsa.RSAPrivateKey):
        key_class = RSAKey
    elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey):
        key_class = Ed25519Key
    elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey):
        key_class = ECDSAKey
    else:
        raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__)
    # Our own key classes still take the password in its original form.
    with key_path.open() as fd:
        key = key_class.from_private_key(fd, password=password)
    if cert_path.exists():
        # load_certificate can take Message, path-str, or value-str
        key.load_certificate(str(cert_path))
    return key

226 

@staticmethod
def from_type_string(
    key_type: str, key_bytes: bytes, password: Optional[str] = None
):
    """
    Build a `PKey` subclass instance from a type `str` & raw `bytes`.

    For example, ``PKey.from_type_string("ssh-ed25519", <public bytes>)``
    will (if successful) return a new `.Ed25519Key`.

    :param str key_type:
        The key type, eg ``"ssh-ed25519"``.
    :param bytes key_bytes:
        The raw byte data forming the key material, as expected by
        subclasses' ``data`` parameter.
    :param str password:
        Optional password used to decrypt ``key_bytes``.

    :returns:
        A `PKey` subclass instance.

    :raises:
        `UnknownKeyType`, if no registered classes knew about this type.

    .. versionadded:: 3.2
    .. versionchanged:: 5.0
        Added the ``password`` kwarg.
    """
    from paramiko import key_classes

    # First registered class claiming this identifier wins.
    candidates = (
        klass for klass in key_classes if key_type in klass.identifiers()
    )
    chosen = next(candidates, None)
    if chosen is None:
        raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes)
    return chosen(data=key_bytes, password=password)

261 

@classmethod
def identifiers(cls):
    """
    Return an iterable of the key format/name strings this class handles.

    The default covers the common case of exactly one identifier (the
    class' ``name``); see `.ECDSAKey` for one example of an override.
    """
    only_name = cls.name
    return [only_name]

272 

273 # TODO (backwards incompat): make this and subclasses consistent, some of 

274 # our own classmethods even assume kwargs we don't define! 

275 # TODO (backwards incompat): prob also raise NotImplementedError instead of 

276 # pass'ing; the contract is pretty obviously that you need to handle 

277 # msg/data/filename appropriately. (If 'pass' is a concession to testing, 

278 # see about doing the work to fix the tests instead) 

def __init__(self, msg=None, data=None):
    """
    Create a new instance of this public key type.

    When ``msg`` is given, the key's public part(s) are meant to be filled
    in from that message; when ``data`` is given, from those bytes. This
    base implementation does nothing with either.

    :param .Message msg:
        an optional SSH `.Message` containing a public key of this type.
    :param bytes data:
        optional, the bytes of a public key of this type

    :raises: `.SSHException` --
        if a key cannot be created from the ``data`` or ``msg`` given, or
        no key was passed in.
    """

296 

297 # TODO: arguably this might want to be __str__ instead? ehh 

298 # TODO: ditto the interplay between showing class name (currently we just 

299 # say PKey writ large) and algorithm (usually == class name, but not 

300 # always, also sometimes shows certificate-ness) 

301 # TODO: if we do change it, we also want to tweak eg AgentKey, as it 

302 # currently displays agent-ness with a suffix 

def __repr__(self):
    """
    Render algorithm, bit size, fingerprint and (when set) the comment.
    """
    # Works for AgentKey, may work for others?
    note = getattr(self, "comment", None)
    comment = f", comment={note!r}" if note else ""
    return f"PKey(alg={self.algorithm_name}, bits={self.get_bits()}, fp={self.fingerprint}{comment})"  # noqa

309 

310 # TODO (backwards incompat): just merge into __bytes__ (everywhere) 

def asbytes(self):
    """
    Return the bytes of an SSH `.Message` holding this key's public part(s).

    The result is suitable for passing to `__init__` to re-create the key
    object later. This base implementation yields empty bytes.
    """
    return b""

318 

def __bytes__(self):
    """Support ``bytes(key)`` by deferring to `asbytes`."""
    raw = self.asbytes()
    return raw

321 

def __eq__(self, other):
    """Keys are equal when both are `PKey` objects with equal ``_fields``."""
    if not isinstance(other, PKey):
        return False
    return self._fields == other._fields

324 

def __hash__(self):
    """Hash on ``_fields`` so hashing stays consistent with `__eq__`."""
    identity = self._fields
    return hash(identity)

327 

@property
def _fields(self):
    """
    Values used for equality and hashing; this base version always raises.

    :raises: ``NotImplementedError`` -- unconditionally.
    """
    raise NotImplementedError

331 

def get_name(self):
    """
    Return the name of this private key implementation.

    :return:
        name of this private key type, in SSH terminology, as a `str` (for
        example, ``"ssh-rsa"``). This base implementation returns an empty
        string.
    """
    return str()

341 

@property
def algorithm_name(self):
    """
    Return the key algorithm identifier for this key.

    Similar to `get_name`, but aimed at pure algorithm name instead of SSH
    protocol field value.
    """
    # Nuke the 'ssh-' marker
    # TODO in Python 3.9: use .removeprefix()
    stripped = self.get_name().replace("ssh-", "")
    cert_tail = "-cert-v01@openssh.com"
    if cert_tail not in stripped:
        # Nuke any eg ECDSA suffix, OpenSSH does basically this too.
        return stripped.split("-")[0].upper()
    # Trim any cert suffix (but leave the -cert, as OpenSSH does)
    return stripped.replace(cert_tail, "-cert").upper()

361 

def get_bits(self):
    """
    Return the number of significant bits in this key. This is useful
    for judging the relative security of a key.

    :return: bits in the key (as an `int`); always zero in this base class.
    """
    # TODO (backwards incompat): raise NotImplementedError, 0 is unlikely
    # to ever be _correct_ and nothing in the critical path seems to use
    # this.
    bits = 0
    return bits

373 

def can_sign(self):
    """
    Report whether this key has the private part necessary for signing
    data. This base implementation always answers ``False``.
    """
    has_private_part = False
    return has_private_part

380 

def get_fingerprint(self):
    """
    Return an MD5 fingerprint of the public part of this key. Nothing
    secret is revealed.

    :return:
        a 16-byte `string <str>` (binary) of the MD5 fingerprint, in SSH
        format.
    """
    public_blob = self.asbytes()
    hasher = md5(public_blob)
    return hasher.digest()

391 

@property
def fingerprint(self):
    """
    Modern fingerprint property designed to be comparable to OpenSSH.

    Currently only does SHA256 (the OpenSSH default).

    .. versionadded:: 3.2
    """
    digest = sha256(bytes(self))
    encoded = base64.b64encode(digest.digest()).decode("ascii")
    # yes, OpenSSH drops the base64 '=' padding too!
    trimmed = encoded.rstrip("=")
    return "{}:{}".format(digest.name.upper(), trimmed)

406 

def get_base64(self):
    """
    Return a base64 string containing the public part of this key. Nothing
    secret is revealed. This format is compatible with that used to store
    public key files or recognized host keys.

    :return: a base64 `string <str>` containing the public part of the key.
    """
    # b64encode emits a single unbroken line, so no newlines need removing.
    encoded = base64.b64encode(self.asbytes())
    return encoded.decode("ascii")

416 

def sign_ssh_data(self, data, algorithm=None):
    """
    Sign a blob of data with this private key, and return a `.Message`
    representing an SSH signature message.

    This base implementation signs nothing and returns empty bytes.

    :param bytes data:
        the data to sign.
    :param str algorithm:
        the signature algorithm to use, if different from the key's
        internal name. Default: ``None``.
    :return: an SSH signature `message <.Message>`.

    .. versionchanged:: 2.9
        Added the ``algorithm`` kwarg.
    """
    return b""

433 

def verify_ssh_sig(self, data, msg):
    """
    Given a blob of data, and an SSH message representing a signature of
    that data, verify that it was signed with this key.

    This base implementation never verifies anything.

    :param bytes data: the data that was signed.
    :param .Message msg: an SSH signature message
    :return:
        ``True`` if the signature verifies correctly; ``False`` otherwise.
    """
    verified = False
    return verified

445 

@classmethod
def from_private_key_file(cls, filename, password=None):
    """
    Create a key object by reading a private key file.

    If the private key is encrypted and ``password`` is not ``None``, the
    given password will be used to decrypt the key (otherwise
    `.PasswordRequiredException` is thrown). This factory exists on every
    subclass of PKey (such as `.RSAKey`), but is useless on the abstract
    PKey class itself.

    :param str filename: name of the file to read
    :param str password:
        an optional password to use to decrypt the key file, if it's
        encrypted
    :return: a new `.PKey` based on the given private key

    :raises: ``IOError`` -- if there was an error reading the file
    :raises: `.PasswordRequiredException` -- if the private key file is
        encrypted, and ``password`` is ``None``
    :raises: `.SSHException` -- if the key file is invalid
    """
    return cls(filename=filename, password=password)

469 

@classmethod
def from_private_key(cls, file_obj, password=None):
    """
    Create a key object by reading a private key from a file (or file-like)
    object.

    If the private key is encrypted and ``password`` is not ``None``, the
    given password will be used to decrypt the key (otherwise
    `.PasswordRequiredException` is thrown).

    :param file_obj: the file-like object to read from
    :param str password:
        an optional password to use to decrypt the key, if it's encrypted
    :return: a new `.PKey` based on the given private key

    :raises: ``IOError`` -- if there was an error reading the key
    :raises: `.PasswordRequiredException` --
        if the private key file is encrypted, and ``password`` is ``None``
    :raises: `.SSHException` -- if the key file is invalid
    """
    return cls(file_obj=file_obj, password=password)

490 

def write_private_key_file(
    self,
    filename: str,
    password: Optional[str] = None,
    file_format: FileFormat = PEM,
):
    """
    Write private key contents into a file. If the password is not
    ``None``, the key is encrypted before writing.

    :param str filename: name of the file to write
    :param str password:
        an optional password to use to encrypt the key file
    :param FileFormat file_format:
        what format+encoding pair to use; defaults to the original
        behavior, namely PEM.

    :raises: ``IOError`` -- if there was an error writing the file
    :raises: `.SSHException` -- if the key is invalid
    """
    # All the real work lives in the private helper; we only supply the
    # underlying key object.
    private_key = self.private_key
    self._write_private_key_file(
        filename, private_key, file_format=file_format, password=password
    )

517 

def write_private_key(
    self,
    file_obj: RawIOBase,
    password: Optional[str] = None,
    file_format: FileFormat = PEM,
):
    """
    Write private key contents into a file (or file-like) object. If the
    password is not ``None``, the key is encrypted before writing.

    :param file_obj: the file-like object to write into
    :param str password: an optional password to use to encrypt the key
    :param FileFormat file_format:
        what format+encoding pair to use; defaults to the original
        behavior, namely PEM.

    :raises: ``IOError`` -- if there was an error writing to the file
    :raises: `.SSHException` -- if the key is invalid
    """
    # All the real work lives in the private helper; we only supply the
    # underlying key object.
    private_key = self.private_key
    self._write_private_key(
        file_obj, private_key, file_format=file_format, password=password
    )

543 

def _read_private_key_file(self, tag, filename, password=None):
    """
    Open ``filename`` in text mode and hand it to `_read_private_key`.

    That is: read an SSH2-format private key file, looking for a string of
    the type ``"BEGIN xxx PRIVATE KEY"`` for some ``xxx``, base64-decode
    the text found, and return it. If the private key is encrypted and
    ``password`` is not ``None``, the given password will be used to
    decrypt the key (otherwise `.PasswordRequiredException` is thrown).

    :param str tag:
        ``"RSA"`` (or etc), the tag used to mark the data block.
    :param str filename:
        name of the file to read.
    :param str password:
        an optional password to use to decrypt the key file, if it's
        encrypted.
    :return:
        whatever `_read_private_key` returns for the opened file.

    :raises: ``IOError`` -- if there was an error reading the file.
    :raises: `.PasswordRequiredException` -- if the private key file is
        encrypted, and ``password`` is ``None``.
    :raises: `.SSHException` -- if the key file is invalid.
    """
    with open(filename, "r") as handle:
        return self._read_private_key(tag, handle, password)

570 

def _read_private_key(self, tag, f, password=None):
    """
    Locate the armored key block in ``f`` and decode it.

    :param str tag:
        ``"RSA"`` (or etc), the legacy PEM key type the caller expects.
    :param f: file-like object supporting ``readlines()``.
    :param str password: optional password for encrypted keys.
    :return:
        a ``(format id, data)`` tuple, where the id is one of the
        ``_PRIVATE_KEY_FORMAT_*`` constants.

    :raises: `.SSHException` --
        if the file is empty, has no usable BEGIN tag, or holds a key type
        other than ``tag`` or ``OPENSSH``.
    """
    lines = f.readlines()
    if not lines:
        raise SSHException("no lines in {} private key file".format(tag))

    # find the BEGIN tag
    start = 0
    m = self.BEGIN_TAG.match(lines[start])
    line_range = len(lines) - 1
    while start < line_range and not m:
        start += 1
        m = self.BEGIN_TAG.match(lines[start])
    # 'start' now points at the first line after the BEGIN tag.
    start += 1
    keytype = m.group(1) if m else None
    if start >= len(lines) or keytype is None:
        raise SSHException("not a valid {} private key file".format(tag))

    # find the END tag (if there is none, 'end' stops at the last line)
    end = start
    m = self.END_TAG.match(lines[end])
    while end < line_range and not m:
        end += 1
        m = self.END_TAG.match(lines[end])

    if keytype == tag:
        # _read_private_key_pem() assumes the BEGIN tag is the very first
        # line it is given, so drop any preamble (comments, blank lines)
        # and shift 'end' accordingly; otherwise the BEGIN line itself
        # would get base64-decoded as part of the key.
        begin = start - 1
        data = self._read_private_key_pem(
            lines[begin:], end - begin, password
        )
        pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL
    elif keytype == "OPENSSH":
        data = self._read_private_key_openssh(lines[start:end], password)
        pkformat = self._PRIVATE_KEY_FORMAT_OPENSSH
    else:
        raise SSHException(
            "encountered {} key, expected {} key".format(keytype, tag)
        )

    return pkformat, data

607 

def _got_bad_key_format_id(self, id_):
    """
    Complain about a key format id we do not recognize.

    :raises: `.SSHException` -- always.
    """
    raise SSHException(
        "{}._read_private_key() spat out an unknown key format id '{}'".format(
            type(self).__name__, id_
        )
    )

611 

def _read_private_key_pem(self, lines, end, password):
    """
    Decode (and, when encrypted, decrypt) a legacy PEM private key.

    :param list lines:
        the key file's lines; index 0 is skipped as the BEGIN tag line.
    :param int end: index of the line ending the base64 payload.
    :param str password: password for encrypted keys, or ``None``.
    :return: the decoded (and decrypted) key `bytes`.

    :raises: `.PasswordRequiredException` --
        if the key is encrypted and ``password`` is ``None``.
    :raises: `.SSHException` --
        if the headers, payload or password are unusable.
    """
    start = 0
    # parse any headers first
    headers = {}
    start += 1
    while start < len(lines):
        line = lines[start].split(": ")
        if len(line) == 1:
            break
        headers[line[0].lower()] = line[1].strip()
        start += 1
    # if we trudged to the end of the file, just try to cope.
    try:
        data = decodebytes(b("".join(lines[start:end])))
    except base64.binascii.Error as e:
        raise SSHException("base64 decoding error: {}".format(e))
    if "proc-type" not in headers:
        # unencryped: done
        return data
    # encrypted keyfile: will need a password
    proc_type = headers["proc-type"]
    if proc_type != "4,ENCRYPTED":
        raise SSHException(
            'Unknown private key structure "{}"'.format(proc_type)
        )
    try:
        encryption_type, saltstr = headers["dek-info"].split(",")
    except (KeyError, ValueError):
        # Missing header, or not exactly "<cipher>,<salt>". Deliberately not
        # a bare except, which would also swallow KeyboardInterrupt etc.
        raise SSHException("Can't parse DEK-info in private key file")
    if encryption_type not in self._CIPHER_TABLE:
        raise SSHException(
            'Unknown private key cipher "{}"'.format(encryption_type)
        )
    # if no password was passed in,
    # raise an exception pointing out that we need one
    if password is None:
        raise PasswordRequiredException("Private key file is encrypted")
    cipher = self._CIPHER_TABLE[encryption_type]["cipher"]
    keysize = self._CIPHER_TABLE[encryption_type]["keysize"]
    mode = self._CIPHER_TABLE[encryption_type]["mode"]
    try:
        salt = unhexlify(b(saltstr))
    except ValueError:
        # binascii.Error (odd length / non-hex salt) subclasses ValueError.
        raise SSHException("Can't parse DEK-info in private key file")
    key = util.generate_key_bytes(md5, salt, password, keysize)
    unpadder = padding.PKCS7(cipher.block_size).unpadder()
    try:
        # A bad IV size, a payload that isn't a whole number of blocks and
        # bad padding all surface as ValueError; report them uniformly as
        # the SSHException our callers expect.
        decryptor = Cipher(
            cipher(key), mode(salt), backend=default_backend()
        ).decryptor()
        decrypted_data = decryptor.update(data) + decryptor.finalize()
        return unpadder.update(decrypted_data) + unpadder.finalize()
    except ValueError:
        raise SSHException("Bad password or corrupt private key file")

663 

def _read_private_key_openssh(self, lines, password):
    """
    Read the new OpenSSH SSH2 private key format available
    since OpenSSH version 6.5
    Reference:
    https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key

    :param list lines: the base64 lines found between the armor tags.
    :param str password: password for encrypted keys, or ``None``.
    :return: the private key section with any trailing padding removed.
    """
    try:
        blob = decodebytes(b("".join(lines)))
    except base64.binascii.Error as e:
        raise SSHException("base64 decoding error: {}".format(e))

    # read data struct
    if blob[:15] != OPENSSH_AUTH_MAGIC:
        raise SSHException("unexpected OpenSSH key header encountered")

    (
        cipher,
        kdfname,
        kdf_options,
        num_pubkeys,
        remainder,
    ) = self._uint32_cstruct_unpack(blob[15:], "sssur")
    # For now, just support 1 key.
    if num_pubkeys > 1:
        raise SSHException("unsupported: private keyfile has multiple keys")
    pubkey, privkey_blob = self._uint32_cstruct_unpack(remainder, "ss")

    if kdfname == b("bcrypt"):
        known_modes = {
            b("aes256-cbc"): modes.CBC,
            b("aes256-ctr"): modes.CTR,
        }
        mode = known_modes.get(cipher)
        if mode is None:
            raise SSHException(
                "unknown cipher `{}` used in private key file".format(
                    cipher.decode("utf-8")
                )
            )
        # Encrypted private key.
        # If no password was passed in, raise an exception pointing
        # out that we need one
        if password is None:
            raise PasswordRequiredException("private key file is encrypted")

        # Unpack salt and rounds from kdfoptions
        salt, rounds = self._uint32_cstruct_unpack(kdf_options, "su")

        # run bcrypt kdf to derive key and iv/nonce (32 + 16 bytes)
        derived = bcrypt.kdf(
            b(password),
            b(salt),
            48,
            rounds,
            # We can't control how many rounds are on disk, so no sense
            # warning about it.
            ignore_few_rounds=True,
        )
        key, iv = derived[:32], derived[32:]

        # decrypt private key blob
        decryptor = Cipher(
            algorithms.AES(key), mode(iv), default_backend()
        ).decryptor()
        plaintext = decryptor.update(privkey_blob)
        plaintext += decryptor.finalize()
    elif cipher == b("none") and kdfname == b("none"):
        # Unencrypted private key
        plaintext = privkey_blob
    else:
        raise SSHException("unknown cipher or kdf used in private key file")

    # Unpack private key and verify checkints
    checkint1, checkint2, keytype, keydata = self._uint32_cstruct_unpack(
        plaintext, "uusr"
    )
    if checkint1 != checkint2:
        raise SSHException("OpenSSH private key file checkints do not match")

    return _unpad_openssh(keydata)

749 

def _uint32_cstruct_unpack(self, data, strformat):
    """
    Used to read new OpenSSH private key format.
    Unpacks a c data structure containing a mix of 32-bit uints and
    variable length strings prefixed by 32-bit uint size field,
    according to the specified format. Returns the unpacked vars
    in a tuple.
    Format strings:
      s - denotes a string
      i - denotes a long integer, encoded as a byte string
      u - denotes a 32-bit unsigned integer
      r - the remainder of the input string, returned as a string

    :raises: `.SSHException` --
        if ``data`` is too short for the requested format, including a
        length prefix that promises more bytes than are available.
    """
    arr = []
    idx = 0
    try:
        for f in strformat:
            if f in ("s", "i"):
                # Both are a uint32 length prefix followed by that many bytes.
                (size,) = struct.unpack_from(">L", data, idx)
                idx += 4
                chunk = data[idx : idx + size]
                if len(chunk) != size:
                    # Slicing never fails, so without this check a truncated
                    # field would be returned silently.
                    raise ValueError(
                        "truncated field: expected {} bytes, got {}".format(
                            size, len(chunk)
                        )
                    )
                idx += size
                if f == "i":
                    # long integer
                    arr.append(util.inflate_long(chunk, True))
                else:
                    arr.append(chunk)
            elif f == "u":
                # 32-bit unsigned int
                (value,) = struct.unpack_from(">L", data, idx)
                idx += 4
                arr.append(value)
            elif f == "r":
                # remainder as string; nothing can follow it
                arr.append(data[idx:])
                break
    except Exception as e:
        # PKey-consuming code frequently wants to save-and-skip-over issues
        # with loading keys, and uses SSHException as the (really friggin
        # awful) signal for this. So for now...we do this.
        raise SSHException(str(e))
    return tuple(arr)

798 

def _write_private_key_file(
    self,
    filename: str,
    key: PrivateKey,
    file_format: FileFormat,
    password: Optional[str] = None,
):
    """
    Create/truncate ``filename`` with user-only mode and write ``key`` to it.
    """
    # Ensure that we create new key files directly with a user-only mode,
    # instead of opening, writing, then chmodding, which leaves us open to
    # CVE-2022-24302.
    descriptor = os.open(
        filename,
        # NOTE: O_TRUNC is a noop on new files, and O_CREAT is a noop
        # on existing files, so using all 3 in both cases is fine.
        flags=os.O_WRONLY | os.O_TRUNC | os.O_CREAT,
        # Ditto the use of the 'mode' argument; it should be safe to
        # give even for existing files (though it will not act like a
        # chmod in that case).
        mode=o600,
    )
    # Yea, you still gotta inform the FLO that it is in "write" mode.
    with os.fdopen(descriptor, "w") as f:
        self._write_private_key(f, key, file_format, password=password)

824 

def _write_private_key(
    self,
    f: RawIOBase,
    key: PrivateKey,
    file_format: FileFormat,
    password: Optional[str] = None,
):
    """
    Serialize ``key`` per ``file_format`` and write the decoded text to ``f``.

    The output is encrypted with ``password`` unless that is ``None``.
    """
    encryption = (
        serialization.NoEncryption()
        if password is None
        else serialization.BestAvailableEncryption(b(password))
    )
    serialized = key.private_bytes(
        file_format.encoding, file_format.format, encryption
    )
    f.write(serialized.decode())

842 

def _check_type_and_load_cert(self, msg, key_type, cert_type):
    """
    Perform message type-checking & optional certificate loading.

    This includes fast-forwarding cert ``msg`` objects past the nonce, so
    that the subsequent fields are the key numbers; thus the caller may
    expect to treat the message as key material afterwards either way.

    The obtained key type is returned for classes which need to know what
    it was (e.g. ECDSA.)

    :param .Message msg: message holding the key or certificate.
    :param key_type: acceptable plain key type name(s); `str` or iterable.
    :param cert_type: acceptable certificate type name(s); `str` or iterable.
    :return: the type name read from ``msg``, as a `str`.

    :raises: `.SSHException` --
        if ``msg`` is ``None`` or its type matches neither set of names.
    """
    # Normalization; most classes have a single key type and give a string,
    # but eg ECDSA is a 1:N mapping.
    key_types = key_type
    cert_types = cert_type
    if isinstance(key_type, str):
        key_types = [key_types]
    if isinstance(cert_types, str):
        cert_types = [cert_types]
    # Can't do much with no message, that should've been handled elsewhere
    if msg is None:
        raise SSHException("Key object may not be empty")
    # First field is always key type, in either kind of object. (make sure
    # we rewind before grabbing it - sometimes caller had to do their own
    # introspection first!)
    msg.rewind()
    type_ = msg.get_text()
    # Regular public key - nothing special to do besides the implicit
    # type check.
    if type_ in key_types:
        pass
    # OpenSSH-compatible certificate - store full copy as .public_blob
    # (so signing works correctly) and then fast-forward past the
    # nonce.
    elif type_ in cert_types:
        # This seems the cleanest way to 'clone' an already-being-read
        # message; they're *IO objects at heart and their .getvalue()
        # always returns the full value regardless of pointer position.
        self.load_certificate(Message(msg.asbytes()))
        # Read out nonce as it comes before the public numbers - our caller
        # is likely going to use the (only borrowed by us, not owned)
        # 'msg' object for loading those numbers right after this.
        # TODO: usefully interpret it & other non-public-number fields
        # (requires going back into per-type subclasses.)
        msg.get_string()
    else:
        err = "Invalid key (class: {}, data type: {})"
        raise SSHException(err.format(self.__class__.__name__, type_))
    # Hand the type back as documented; callers that ignored the previous
    # (implicit None) return value are unaffected.
    return type_

891 

def load_certificate(self, value):
    """
    Attach public-blob data to this key, taken from an OpenSSH public key
    (``.pub``) or certificate (``-cert.pub``) file, a string holding the
    contents of such a file, or a `.Message` object.

    Loading a plain ``.pub`` adds no real value, because the private key
    file already carries enough information to derive the public key.
    Certificates are the useful case: on the client side they allow
    authentication requests to be offered to the server based on the
    certificate rather than the raw public key.

    See:
    https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys

    Note: very little effort is made to validate the certificate contents;
    it is up to the server to decide whether it is good enough to
    authenticate successfully.

    :param value:
        A `.Message`, a path to an existing file, or the textual contents
        of a public key / certificate file.
    :raises ValueError:
        If the loaded blob's key type does not start with this key's
        ``get_name()`` value.
    """
    # Choose the PublicBlob alternate constructor matching the input kind;
    # anything that is neither a Message nor an existing file path is
    # treated as the file contents themselves.
    if isinstance(value, Message):
        build = PublicBlob.from_message
    elif os.path.isfile(value):
        build = PublicBlob.from_file
    else:
        build = PublicBlob.from_string
    loaded = build(value)
    if loaded.key_type.startswith(self.get_name()):
        self.public_blob = loaded
        return
    template = "PublicBlob type {} incompatible with key type {}"
    raise ValueError(template.format(loaded.key_type, self.get_name()))

922 

923 

924# General construct for an OpenSSH style Public Key blob 

925# readable from a one-line file of the format: 

926# <key-name> <base64-blob> [<comment>] 

927# Of little value in the case of standard public keys 

928# {ssh-rsa, ssh-ecdsa, ssh-ed25519}, but should 

929# provide rudimentary support for {*-cert.v01} 

class PublicBlob:
    """
    OpenSSH plain public key or OpenSSH signed public key (certificate).

    Tries to be as dumb as possible and barely cares about specific
    per-key-type data.

    .. note::

        Most of the time you'll want to call `from_file`, `from_string` or
        `from_message` for useful instantiation, the main constructor is
        basically "I should be using ``attrs`` for this."
    """

    def __init__(self, type_, blob, comment=None):
        """
        Create a new public blob of given type and contents.

        :param str type_: Type indicator, eg ``ssh-rsa``.
        :param bytes blob: The blob bytes themselves.
        :param str comment: A comment, if one was given (e.g. file-based.)
        """
        self.key_type = type_
        self.key_blob = blob
        self.comment = comment

    @classmethod
    def from_file(cls, filename):
        """
        Create a public blob from a ``-cert.pub``-style file on disk.

        :param filename: Path of the file to read; its full text is handed
            to `from_string`.
        """
        with open(filename) as f:
            string = f.read()
        return cls.from_string(string)

    @classmethod
    def from_string(cls, string):
        """
        Create a public blob from a ``-cert.pub``-style string.

        The string is split on whitespace into
        ``<key-type> <base64-blob> [<comment>]``.

        :param str string: The one-line public key / certificate text.
        :raises ValueError:
            If fewer than two fields are present, or if the type name
            embedded at the start of the decoded blob differs from the
            leading key-type field.
        """
        fields = string.split(None, 2)
        if len(fields) < 2:
            msg = "Not enough fields for public blob: {}"
            raise ValueError(msg.format(fields))
        key_type = fields[0]
        key_blob = decodebytes(b(fields[1]))
        # The comment is optional; it is whatever follows the blob field.
        comment = fields[2].strip() if len(fields) > 2 else None
        # Verify that the blob message first (string) field matches the
        # key_type
        m = Message(key_blob)
        blob_type = m.get_text()
        if blob_type != key_type:
            deets = "key type={!r}, but blob type={!r}".format(
                key_type, blob_type
            )
            raise ValueError("Invalid PublicBlob contents: {}".format(deets))
        # All good? All good.
        return cls(type_=key_type, blob=key_blob, comment=comment)

    @classmethod
    def from_message(cls, message):
        """
        Create a public blob from a network `.Message`.

        Specifically, a cert-bearing pubkey auth packet, because by
        definition OpenSSH-style certificates 'are' their own network
        representation.

        :param message:
            The `.Message` to read; its next text field is taken as the
            type name and its ``asbytes()`` value as the blob.
        """
        type_ = message.get_text()
        return cls(type_=type_, blob=message.asbytes())

    def __str__(self):
        ret = "{} public key/certificate".format(self.key_type)
        if self.comment:
            ret += " - {}".format(self.comment)
        return ret

    def __eq__(self, other):
        # Only the raw blob bytes take part in equality. Defer to the
        # other operand for foreign types so that comparing against e.g.
        # None or a str yields False instead of a non-bool value or an
        # AttributeError.
        if not isinstance(other, PublicBlob):
            return NotImplemented
        return self.key_blob == other.key_blob

    def __ne__(self, other):
        return not self == other