Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/pkey.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

362 statements  

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20Common API for all public keys. 

21""" 

22 

23import base64 

24from base64 import encodebytes, decodebytes 

25from binascii import unhexlify 

26import os 

27from pathlib import Path 

28from hashlib import md5, sha256 

29import re 

30import struct 

31 

32import bcrypt 

33 

34from cryptography.hazmat.backends import default_backend 

35from cryptography.hazmat.primitives import padding, serialization 

36from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher 

37from cryptography.hazmat.primitives import asymmetric 

38 

39from paramiko import util 

40from paramiko.util import u, b 

41from paramiko.common import o600 

42from paramiko.ssh_exception import SSHException, PasswordRequiredException 

43from paramiko.message import Message 

44 

45 

46# TripleDES is moving from `cryptography.hazmat.primitives.ciphers.algorithms` 

47# in cryptography>=43.0.0 to `cryptography.hazmat.decrepit.ciphers.algorithms` 

48# It will be removed from `cryptography.hazmat.primitives.ciphers.algorithms` 

49# in cryptography==48.0.0. 

50# 

51# Source References: 

52# - https://github.com/pyca/cryptography/commit/722a6393e61b3ac 

53# - https://github.com/pyca/cryptography/pull/11407/files 

54try: 

55 from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES 

56except ImportError: 

57 from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES 

58 

59 

60OPENSSH_AUTH_MAGIC = b"openssh-key-v1\x00" 

61 

62 

63def _unpad_openssh(data): 

64 # At the moment, this is only used for unpadding private keys on disk. This 

65 # really ought to be made constant time (possibly by upstreaming this logic 

66 # into pyca/cryptography). 

67 padding_length = data[-1] 

68 if 0x20 <= padding_length < 0x7F: 

69 return data # no padding, last byte part comment (printable ascii) 

70 if padding_length > 15: 

71 raise SSHException("Invalid key") 

72 for i in range(padding_length): 

73 if data[i - padding_length] != i + 1: 

74 raise SSHException("Invalid key") 

75 return data[:-padding_length] 

76 

77 

78class UnknownKeyType(Exception): 

79 """ 

80 An unknown public/private key algorithm was attempted to be read. 

81 """ 

82 

83 def __init__(self, key_type=None, key_bytes=None): 

84 self.key_type = key_type 

85 self.key_bytes = key_bytes 

86 

87 def __str__(self): 

88 return f"UnknownKeyType(type={self.key_type!r}, bytes=<{len(self.key_bytes)}>)" # noqa 

89 

90 

91class PKey: 

92 """ 

93 Base class for public keys. 

94 

95 Also includes some "meta" level convenience constructors such as 

96 `.from_type_string`. 

97 """ 

98 

99 # known encryption types for private key files: 

100 _CIPHER_TABLE = { 

101 "AES-128-CBC": { 

102 "cipher": algorithms.AES, 

103 "keysize": 16, 

104 "blocksize": 16, 

105 "mode": modes.CBC, 

106 }, 

107 "AES-256-CBC": { 

108 "cipher": algorithms.AES, 

109 "keysize": 32, 

110 "blocksize": 16, 

111 "mode": modes.CBC, 

112 }, 

113 "DES-EDE3-CBC": { 

114 "cipher": TripleDES, 

115 "keysize": 24, 

116 "blocksize": 8, 

117 "mode": modes.CBC, 

118 }, 

119 } 

120 _PRIVATE_KEY_FORMAT_ORIGINAL = 1 

121 _PRIVATE_KEY_FORMAT_OPENSSH = 2 

122 BEGIN_TAG = re.compile(r"^-{5}BEGIN (RSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$") 

123 END_TAG = re.compile(r"^-{5}END (RSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$") 

124 

125 @staticmethod 

126 def from_path(path, passphrase=None): 

127 """ 

128 Attempt to instantiate appropriate key subclass from given file path. 

129 

130 :param Path path: The path to load (may also be a `str`). 

131 

132 :returns: 

133 A `PKey` subclass instance. 

134 

135 :raises: 

136 `UnknownKeyType`, if our crypto backend doesn't know this key type. 

137 

138 .. versionadded:: 3.2 

139 """ 

140 # TODO: make sure sphinx is reading Path right in param list... 

141 

142 # Lazy import to avoid circular import issues 

143 from paramiko import RSAKey, Ed25519Key, ECDSAKey 

144 

145 # Normalize to string, as cert suffix isn't quite an extension, so 

146 # pathlib isn't useful for this. 

147 path = str(path) 

148 

149 # Sort out cert vs key, i.e. it is 'legal' to hand this kind of API 

150 # /either/ the key /or/ the cert, when there is a key/cert pair. 

151 cert_suffix = "-cert.pub" 

152 if str(path).endswith(cert_suffix): 

153 key_path = path[: -len(cert_suffix)] 

154 cert_path = path 

155 else: 

156 key_path = path 

157 cert_path = path + cert_suffix 

158 

159 key_path = Path(key_path).expanduser() 

160 cert_path = Path(cert_path).expanduser() 

161 

162 data = key_path.read_bytes() 

163 # Like OpenSSH, try modern/OpenSSH-specific key load first 

164 try: 

165 loaded = serialization.load_ssh_private_key( 

166 data=data, password=passphrase 

167 ) 

168 # Then fall back to assuming legacy PEM type 

169 except ValueError: 

170 loaded = serialization.load_pem_private_key( 

171 data=data, password=passphrase 

172 ) 

173 # TODO Python 3.10: match statement? (NOTE: we cannot use a dict 

174 # because the results from the loader are literal backend, eg openssl, 

175 # private classes, so isinstance tests work but exact 'x class is y' 

176 # tests will not work) 

177 # TODO: leverage already-parsed/math'd obj to avoid duplicate cpu 

178 # cycles? seemingly requires most of our key subclasses to be rewritten 

179 # to be cryptography-object-forward. this is still likely faster than 

180 # the old SSHClient code that just tried instantiating every class! 

181 key_class = None 

182 if isinstance(loaded, asymmetric.rsa.RSAPrivateKey): 

183 key_class = RSAKey 

184 elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey): 

185 key_class = Ed25519Key 

186 elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey): 

187 key_class = ECDSAKey 

188 else: 

189 raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__) 

190 with key_path.open() as fd: 

191 key = key_class.from_private_key(fd, password=passphrase) 

192 if cert_path.exists(): 

193 # load_certificate can take Message, path-str, or value-str 

194 key.load_certificate(str(cert_path)) 

195 return key 

196 

197 @staticmethod 

198 def from_type_string(key_type, key_bytes): 

199 """ 

200 Given type `str` & raw `bytes`, return a `PKey` subclass instance. 

201 

202 For example, ``PKey.from_type_string("ssh-ed25519", <public bytes>)`` 

203 will (if successful) return a new `.Ed25519Key`. 

204 

205 :param str key_type: 

206 The key type, eg ``"ssh-ed25519"``. 

207 :param bytes key_bytes: 

208 The raw byte data forming the key material, as expected by 

209 subclasses' ``data`` parameter. 

210 

211 :returns: 

212 A `PKey` subclass instance. 

213 

214 :raises: 

215 `UnknownKeyType`, if no registered classes knew about this type. 

216 

217 .. versionadded:: 3.2 

218 """ 

219 from paramiko import key_classes 

220 

221 for key_class in key_classes: 

222 if key_type in key_class.identifiers(): 

223 # TODO: needs to passthru things like passphrase 

224 return key_class(data=key_bytes) 

225 raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes) 

226 

227 @classmethod 

228 def identifiers(cls): 

229 """ 

230 returns an iterable of key format/name strings this class can handle. 

231 

232 Most classes only have a single identifier, and thus this default 

233 implementation suffices; see `.ECDSAKey` for one example of an 

234 override. 

235 """ 

236 return [cls.name] 

237 

238 # TODO 4.0: make this and subclasses consistent, some of our own 

239 # classmethods even assume kwargs we don't define! 

240 # TODO 4.0: prob also raise NotImplementedError instead of pass'ing; the 

241 # contract is pretty obviously that you need to handle msg/data/filename 

242 # appropriately. (If 'pass' is a concession to testing, see about doing the 

243 # work to fix the tests instead) 

244 def __init__(self, msg=None, data=None): 

245 """ 

246 Create a new instance of this public key type. If ``msg`` is given, 

247 the key's public part(s) will be filled in from the message. If 

248 ``data`` is given, the key's public part(s) will be filled in from 

249 the string. 

250 

251 :param .Message msg: 

252 an optional SSH `.Message` containing a public key of this type. 

253 :param bytes data: 

254 optional, the bytes of a public key of this type 

255 

256 :raises: `.SSHException` -- 

257 if a key cannot be created from the ``data`` or ``msg`` given, or 

258 no key was passed in. 

259 """ 

260 pass 

261 

262 # TODO: arguably this might want to be __str__ instead? ehh 

263 # TODO: ditto the interplay between showing class name (currently we just 

264 # say PKey writ large) and algorithm (usually == class name, but not 

265 # always, also sometimes shows certificate-ness) 

266 # TODO: if we do change it, we also want to tweak eg AgentKey, as it 

267 # currently displays agent-ness with a suffix 

268 def __repr__(self): 

269 comment = "" 

270 # Works for AgentKey, may work for others? 

271 if hasattr(self, "comment") and self.comment: 

272 comment = f", comment={self.comment!r}" 

273 return f"PKey(alg={self.algorithm_name}, bits={self.get_bits()}, fp={self.fingerprint}{comment})" # noqa 

274 

275 # TODO 4.0: just merge into __bytes__ (everywhere) 

276 def asbytes(self): 

277 """ 

278 Return a string of an SSH `.Message` made up of the public part(s) of 

279 this key. This string is suitable for passing to `__init__` to 

280 re-create the key object later. 

281 """ 

282 return bytes() 

283 

284 def __bytes__(self): 

285 return self.asbytes() 

286 

287 def __eq__(self, other): 

288 return isinstance(other, PKey) and self._fields == other._fields 

289 

290 def __hash__(self): 

291 return hash(self._fields) 

292 

293 @property 

294 def _fields(self): 

295 raise NotImplementedError 

296 

297 def get_name(self): 

298 """ 

299 Return the name of this private key implementation. 

300 

301 :return: 

302 name of this private key type, in SSH terminology, as a `str` (for 

303 example, ``"ssh-rsa"``). 

304 """ 

305 return "" 

306 

307 @property 

308 def algorithm_name(self): 

309 """ 

310 Return the key algorithm identifier for this key. 

311 

312 Similar to `get_name`, but aimed at pure algorithm name instead of SSH 

313 protocol field value. 

314 """ 

315 # Nuke the leading 'ssh-' 

316 # TODO in Python 3.9: use .removeprefix() 

317 name = self.get_name().replace("ssh-", "") 

318 # Trim any cert suffix (but leave the -cert, as OpenSSH does) 

319 cert_tail = "-cert-v01@openssh.com" 

320 if cert_tail in name: 

321 name = name.replace(cert_tail, "-cert") 

322 # Nuke any eg ECDSA suffix, OpenSSH does basically this too. 

323 else: 

324 name = name.split("-")[0] 

325 return name.upper() 

326 

327 def get_bits(self): 

328 """ 

329 Return the number of significant bits in this key. This is useful 

330 for judging the relative security of a key. 

331 

332 :return: bits in the key (as an `int`) 

333 """ 

334 # TODO 4.0: raise NotImplementedError, 0 is unlikely to ever be 

335 # _correct_ and nothing in the critical path seems to use this. 

336 return 0 

337 

338 def can_sign(self): 

339 """ 

340 Return ``True`` if this key has the private part necessary for signing 

341 data. 

342 """ 

343 return False 

344 

345 def get_fingerprint(self): 

346 """ 

347 Return an MD5 fingerprint of the public part of this key. Nothing 

348 secret is revealed. 

349 

350 :return: 

351 a 16-byte `string <str>` (binary) of the MD5 fingerprint, in SSH 

352 format. 

353 """ 

354 return md5(self.asbytes()).digest() 

355 

356 @property 

357 def fingerprint(self): 

358 """ 

359 Modern fingerprint property designed to be comparable to OpenSSH. 

360 

361 Currently only does SHA256 (the OpenSSH default). 

362 

363 .. versionadded:: 3.2 

364 """ 

365 hashy = sha256(bytes(self)) 

366 hash_name = hashy.name.upper() 

367 b64ed = encodebytes(hashy.digest()) 

368 cleaned = u(b64ed).strip().rstrip("=") # yes, OpenSSH does this too! 

369 return f"{hash_name}:{cleaned}" 

370 

371 def get_base64(self): 

372 """ 

373 Return a base64 string containing the public part of this key. Nothing 

374 secret is revealed. This format is compatible with that used to store 

375 public key files or recognized host keys. 

376 

377 :return: a base64 `string <str>` containing the public part of the key. 

378 """ 

379 return u(encodebytes(self.asbytes())).replace("\n", "") 

380 

381 def sign_ssh_data(self, data, algorithm=None): 

382 """ 

383 Sign a blob of data with this private key, and return a `.Message` 

384 representing an SSH signature message. 

385 

386 :param bytes data: 

387 the data to sign. 

388 :param str algorithm: 

389 the signature algorithm to use, if different from the key's 

390 internal name. Default: ``None``. 

391 :return: an SSH signature `message <.Message>`. 

392 

393 .. versionchanged:: 2.9 

394 Added the ``algorithm`` kwarg. 

395 """ 

396 return bytes() 

397 

398 def verify_ssh_sig(self, data, msg): 

399 """ 

400 Given a blob of data, and an SSH message representing a signature of 

401 that data, verify that it was signed with this key. 

402 

403 :param bytes data: the data that was signed. 

404 :param .Message msg: an SSH signature message 

405 :return: 

406 ``True`` if the signature verifies correctly; ``False`` otherwise. 

407 """ 

408 return False 

409 

410 @classmethod 

411 def from_private_key_file(cls, filename, password=None): 

412 """ 

413 Create a key object by reading a private key file. If the private 

414 key is encrypted and ``password`` is not ``None``, the given password 

415 will be used to decrypt the key (otherwise `.PasswordRequiredException` 

416 is thrown). Through the magic of Python, this factory method will 

417 exist in all subclasses of PKey (such as `.RSAKey`), but 

418 is useless on the abstract PKey class. 

419 

420 :param str filename: name of the file to read 

421 :param str password: 

422 an optional password to use to decrypt the key file, if it's 

423 encrypted 

424 :return: a new `.PKey` based on the given private key 

425 

426 :raises: ``IOError`` -- if there was an error reading the file 

427 :raises: `.PasswordRequiredException` -- if the private key file is 

428 encrypted, and ``password`` is ``None`` 

429 :raises: `.SSHException` -- if the key file is invalid 

430 """ 

431 key = cls(filename=filename, password=password) 

432 return key 

433 

434 @classmethod 

435 def from_private_key(cls, file_obj, password=None): 

436 """ 

437 Create a key object by reading a private key from a file (or file-like) 

438 object. If the private key is encrypted and ``password`` is not 

439 ``None``, the given password will be used to decrypt the key (otherwise 

440 `.PasswordRequiredException` is thrown). 

441 

442 :param file_obj: the file-like object to read from 

443 :param str password: 

444 an optional password to use to decrypt the key, if it's encrypted 

445 :return: a new `.PKey` based on the given private key 

446 

447 :raises: ``IOError`` -- if there was an error reading the key 

448 :raises: `.PasswordRequiredException` -- 

449 if the private key file is encrypted, and ``password`` is ``None`` 

450 :raises: `.SSHException` -- if the key file is invalid 

451 """ 

452 key = cls(file_obj=file_obj, password=password) 

453 return key 

454 

455 def write_private_key_file(self, filename, password=None): 

456 """ 

457 Write private key contents into a file. If the password is not 

458 ``None``, the key is encrypted before writing. 

459 

460 :param str filename: name of the file to write 

461 :param str password: 

462 an optional password to use to encrypt the key file 

463 

464 :raises: ``IOError`` -- if there was an error writing the file 

465 :raises: `.SSHException` -- if the key is invalid 

466 """ 

467 raise Exception("Not implemented in PKey") 

468 

469 def write_private_key(self, file_obj, password=None): 

470 """ 

471 Write private key contents into a file (or file-like) object. If the 

472 password is not ``None``, the key is encrypted before writing. 

473 

474 :param file_obj: the file-like object to write into 

475 :param str password: an optional password to use to encrypt the key 

476 

477 :raises: ``IOError`` -- if there was an error writing to the file 

478 :raises: `.SSHException` -- if the key is invalid 

479 """ 

480 # TODO 4.0: NotImplementedError (plus everywhere else in here) 

481 raise Exception("Not implemented in PKey") 

482 

483 def _read_private_key_file(self, tag, filename, password=None): 

484 """ 

485 Read an SSH2-format private key file, looking for a string of the type 

486 ``"BEGIN xxx PRIVATE KEY"`` for some ``xxx``, base64-decode the text we 

487 find, and return it as a string. If the private key is encrypted and 

488 ``password`` is not ``None``, the given password will be used to 

489 decrypt the key (otherwise `.PasswordRequiredException` is thrown). 

490 

491 :param str tag: 

492 ``"RSA"`` (or etc), the tag used to mark the data block. 

493 :param str filename: 

494 name of the file to read. 

495 :param str password: 

496 an optional password to use to decrypt the key file, if it's 

497 encrypted. 

498 :return: 

499 the `bytes` that make up the private key. 

500 

501 :raises: ``IOError`` -- if there was an error reading the file. 

502 :raises: `.PasswordRequiredException` -- if the private key file is 

503 encrypted, and ``password`` is ``None``. 

504 :raises: `.SSHException` -- if the key file is invalid. 

505 """ 

506 with open(filename, "r") as f: 

507 data = self._read_private_key(tag, f, password) 

508 return data 

509 

510 def _read_private_key(self, tag, f, password=None): 

511 lines = f.readlines() 

512 if not lines: 

513 raise SSHException("no lines in {} private key file".format(tag)) 

514 

515 # find the BEGIN tag 

516 start = 0 

517 m = self.BEGIN_TAG.match(lines[start]) 

518 line_range = len(lines) - 1 

519 while start < line_range and not m: 

520 start += 1 

521 m = self.BEGIN_TAG.match(lines[start]) 

522 start += 1 

523 keytype = m.group(1) if m else None 

524 if start >= len(lines) or keytype is None: 

525 raise SSHException("not a valid {} private key file".format(tag)) 

526 

527 # find the END tag 

528 end = start 

529 m = self.END_TAG.match(lines[end]) 

530 while end < line_range and not m: 

531 end += 1 

532 m = self.END_TAG.match(lines[end]) 

533 

534 if keytype == tag: 

535 data = self._read_private_key_pem(lines, end, password) 

536 pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL 

537 elif keytype == "OPENSSH": 

538 data = self._read_private_key_openssh(lines[start:end], password) 

539 pkformat = self._PRIVATE_KEY_FORMAT_OPENSSH 

540 else: 

541 raise SSHException( 

542 "encountered {} key, expected {} key".format(keytype, tag) 

543 ) 

544 

545 return pkformat, data 

546 

547 def _got_bad_key_format_id(self, id_): 

548 err = "{}._read_private_key() spat out an unknown key format id '{}'" 

549 raise SSHException(err.format(self.__class__.__name__, id_)) 

550 

551 def _read_private_key_pem(self, lines, end, password): 

552 start = 0 

553 # parse any headers first 

554 headers = {} 

555 start += 1 

556 while start < len(lines): 

557 line = lines[start].split(": ") 

558 if len(line) == 1: 

559 break 

560 headers[line[0].lower()] = line[1].strip() 

561 start += 1 

562 # if we trudged to the end of the file, just try to cope. 

563 try: 

564 data = decodebytes(b("".join(lines[start:end]))) 

565 except base64.binascii.Error as e: 

566 raise SSHException("base64 decoding error: {}".format(e)) 

567 if "proc-type" not in headers: 

568 # unencryped: done 

569 return data 

570 # encrypted keyfile: will need a password 

571 proc_type = headers["proc-type"] 

572 if proc_type != "4,ENCRYPTED": 

573 raise SSHException( 

574 'Unknown private key structure "{}"'.format(proc_type) 

575 ) 

576 try: 

577 encryption_type, saltstr = headers["dek-info"].split(",") 

578 except: 

579 raise SSHException("Can't parse DEK-info in private key file") 

580 if encryption_type not in self._CIPHER_TABLE: 

581 raise SSHException( 

582 'Unknown private key cipher "{}"'.format(encryption_type) 

583 ) 

584 # if no password was passed in, 

585 # raise an exception pointing out that we need one 

586 if password is None: 

587 raise PasswordRequiredException("Private key file is encrypted") 

588 cipher = self._CIPHER_TABLE[encryption_type]["cipher"] 

589 keysize = self._CIPHER_TABLE[encryption_type]["keysize"] 

590 mode = self._CIPHER_TABLE[encryption_type]["mode"] 

591 salt = unhexlify(b(saltstr)) 

592 key = util.generate_key_bytes(md5, salt, password, keysize) 

593 decryptor = Cipher( 

594 cipher(key), mode(salt), backend=default_backend() 

595 ).decryptor() 

596 decrypted_data = decryptor.update(data) + decryptor.finalize() 

597 unpadder = padding.PKCS7(cipher.block_size).unpadder() 

598 try: 

599 return unpadder.update(decrypted_data) + unpadder.finalize() 

600 except ValueError: 

601 raise SSHException("Bad password or corrupt private key file") 

602 

603 def _read_private_key_openssh(self, lines, password): 

604 """ 

605 Read the new OpenSSH SSH2 private key format available 

606 since OpenSSH version 6.5 

607 Reference: 

608 https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key 

609 """ 

610 try: 

611 data = decodebytes(b("".join(lines))) 

612 except base64.binascii.Error as e: 

613 raise SSHException("base64 decoding error: {}".format(e)) 

614 

615 # read data struct 

616 auth_magic = data[:15] 

617 if auth_magic != OPENSSH_AUTH_MAGIC: 

618 raise SSHException("unexpected OpenSSH key header encountered") 

619 

620 cstruct = self._uint32_cstruct_unpack(data[15:], "sssur") 

621 cipher, kdfname, kdf_options, num_pubkeys, remainder = cstruct 

622 # For now, just support 1 key. 

623 if num_pubkeys > 1: 

624 raise SSHException( 

625 "unsupported: private keyfile has multiple keys" 

626 ) 

627 pubkey, privkey_blob = self._uint32_cstruct_unpack(remainder, "ss") 

628 

629 if kdfname == b("bcrypt"): 

630 if cipher == b("aes256-cbc"): 

631 mode = modes.CBC 

632 elif cipher == b("aes256-ctr"): 

633 mode = modes.CTR 

634 else: 

635 raise SSHException( 

636 "unknown cipher `{}` used in private key file".format( 

637 cipher.decode("utf-8") 

638 ) 

639 ) 

640 # Encrypted private key. 

641 # If no password was passed in, raise an exception pointing 

642 # out that we need one 

643 if password is None: 

644 raise PasswordRequiredException( 

645 "private key file is encrypted" 

646 ) 

647 

648 # Unpack salt and rounds from kdfoptions 

649 salt, rounds = self._uint32_cstruct_unpack(kdf_options, "su") 

650 

651 # run bcrypt kdf to derive key and iv/nonce (32 + 16 bytes) 

652 key_iv = bcrypt.kdf( 

653 b(password), 

654 b(salt), 

655 48, 

656 rounds, 

657 # We can't control how many rounds are on disk, so no sense 

658 # warning about it. 

659 ignore_few_rounds=True, 

660 ) 

661 key = key_iv[:32] 

662 iv = key_iv[32:] 

663 

664 # decrypt private key blob 

665 decryptor = Cipher( 

666 algorithms.AES(key), mode(iv), default_backend() 

667 ).decryptor() 

668 decrypted_privkey = decryptor.update(privkey_blob) 

669 decrypted_privkey += decryptor.finalize() 

670 elif cipher == b("none") and kdfname == b("none"): 

671 # Unencrypted private key 

672 decrypted_privkey = privkey_blob 

673 else: 

674 raise SSHException( 

675 "unknown cipher or kdf used in private key file" 

676 ) 

677 

678 # Unpack private key and verify checkints 

679 cstruct = self._uint32_cstruct_unpack(decrypted_privkey, "uusr") 

680 checkint1, checkint2, keytype, keydata = cstruct 

681 

682 if checkint1 != checkint2: 

683 raise SSHException( 

684 "OpenSSH private key file checkints do not match" 

685 ) 

686 

687 return _unpad_openssh(keydata) 

688 

689 def _uint32_cstruct_unpack(self, data, strformat): 

690 """ 

691 Used to read new OpenSSH private key format. 

692 Unpacks a c data structure containing a mix of 32-bit uints and 

693 variable length strings prefixed by 32-bit uint size field, 

694 according to the specified format. Returns the unpacked vars 

695 in a tuple. 

696 Format strings: 

697 s - denotes a string 

698 i - denotes a long integer, encoded as a byte string 

699 u - denotes a 32-bit unsigned integer 

700 r - the remainder of the input string, returned as a string 

701 """ 

702 arr = [] 

703 idx = 0 

704 try: 

705 for f in strformat: 

706 if f == "s": 

707 # string 

708 s_size = struct.unpack(">L", data[idx : idx + 4])[0] 

709 idx += 4 

710 s = data[idx : idx + s_size] 

711 idx += s_size 

712 arr.append(s) 

713 if f == "i": 

714 # long integer 

715 s_size = struct.unpack(">L", data[idx : idx + 4])[0] 

716 idx += 4 

717 s = data[idx : idx + s_size] 

718 idx += s_size 

719 i = util.inflate_long(s, True) 

720 arr.append(i) 

721 elif f == "u": 

722 # 32-bit unsigned int 

723 u = struct.unpack(">L", data[idx : idx + 4])[0] 

724 idx += 4 

725 arr.append(u) 

726 elif f == "r": 

727 # remainder as string 

728 s = data[idx:] 

729 arr.append(s) 

730 break 

731 except Exception as e: 

732 # PKey-consuming code frequently wants to save-and-skip-over issues 

733 # with loading keys, and uses SSHException as the (really friggin 

734 # awful) signal for this. So for now...we do this. 

735 raise SSHException(str(e)) 

736 return tuple(arr) 

737 

738 def _write_private_key_file(self, filename, key, format, password=None): 

739 """ 

740 Write an SSH2-format private key file in a form that can be read by 

741 paramiko or openssh. If no password is given, the key is written in 

742 a trivially-encoded format (base64) which is completely insecure. If 

743 a password is given, DES-EDE3-CBC is used. 

744 

745 :param str tag: 

746 ``"RSA"`` or etc, the tag used to mark the data block. 

747 :param filename: name of the file to write. 

748 :param bytes data: data blob that makes up the private key. 

749 :param str password: an optional password to use to encrypt the file. 

750 

751 :raises: ``IOError`` -- if there was an error writing the file. 

752 """ 

753 # Ensure that we create new key files directly with a user-only mode, 

754 # instead of opening, writing, then chmodding, which leaves us open to 

755 # CVE-2022-24302. 

756 with os.fdopen( 

757 os.open( 

758 filename, 

759 # NOTE: O_TRUNC is a noop on new files, and O_CREAT is a noop 

760 # on existing files, so using all 3 in both cases is fine. 

761 flags=os.O_WRONLY | os.O_TRUNC | os.O_CREAT, 

762 # Ditto the use of the 'mode' argument; it should be safe to 

763 # give even for existing files (though it will not act like a 

764 # chmod in that case). 

765 mode=o600, 

766 ), 

767 # Yea, you still gotta inform the FLO that it is in "write" mode. 

768 "w", 

769 ) as f: 

770 self._write_private_key(f, key, format, password=password) 

771 

772 def _write_private_key(self, f, key, format, password=None): 

773 if password is None: 

774 encryption = serialization.NoEncryption() 

775 else: 

776 encryption = serialization.BestAvailableEncryption(b(password)) 

777 

778 f.write( 

779 key.private_bytes( 

780 serialization.Encoding.PEM, format, encryption 

781 ).decode() 

782 ) 

783 

784 def _check_type_and_load_cert(self, msg, key_type, cert_type): 

785 """ 

786 Perform message type-checking & optional certificate loading. 

787 

788 This includes fast-forwarding cert ``msg`` objects past the nonce, so 

789 that the subsequent fields are the key numbers; thus the caller may 

790 expect to treat the message as key material afterwards either way. 

791 

792 The obtained key type is returned for classes which need to know what 

793 it was (e.g. ECDSA.) 

794 """ 

795 # Normalization; most classes have a single key type and give a string, 

796 # but eg ECDSA is a 1:N mapping. 

797 key_types = key_type 

798 cert_types = cert_type 

799 if isinstance(key_type, str): 

800 key_types = [key_types] 

801 if isinstance(cert_types, str): 

802 cert_types = [cert_types] 

803 # Can't do much with no message, that should've been handled elsewhere 

804 if msg is None: 

805 raise SSHException("Key object may not be empty") 

806 # First field is always key type, in either kind of object. (make sure 

807 # we rewind before grabbing it - sometimes caller had to do their own 

808 # introspection first!) 

809 msg.rewind() 

810 type_ = msg.get_text() 

811 # Regular public key - nothing special to do besides the implicit 

812 # type check. 

813 if type_ in key_types: 

814 pass 

815 # OpenSSH-compatible certificate - store full copy as .public_blob 

816 # (so signing works correctly) and then fast-forward past the 

817 # nonce. 

818 elif type_ in cert_types: 

819 # This seems the cleanest way to 'clone' an already-being-read 

820 # message; they're *IO objects at heart and their .getvalue() 

821 # always returns the full value regardless of pointer position. 

822 self.load_certificate(Message(msg.asbytes())) 

823 # Read out nonce as it comes before the public numbers - our caller 

824 # is likely going to use the (only borrowed by us, not owned) 

825 # 'msg' object for loading those numbers right after this. 

826 # TODO: usefully interpret it & other non-public-number fields 

827 # (requires going back into per-type subclasses.) 

828 msg.get_string() 

829 else: 

830 err = "Invalid key (class: {}, data type: {}" 

831 raise SSHException(err.format(self.__class__.__name__, type_)) 

832 

833 def load_certificate(self, value): 

834 """ 

835 Supplement the private key contents with data loaded from an OpenSSH 

836 public key (``.pub``) or certificate (``-cert.pub``) file, a string 

837 containing such a file, or a `.Message` object. 

838 

839 The .pub contents adds no real value, since the private key 

840 file includes sufficient information to derive the public 

841 key info. For certificates, however, this can be used on 

842 the client side to offer authentication requests to the server 

843 based on certificate instead of raw public key. 

844 

845 See: 

846 https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys 

847 

848 Note: very little effort is made to validate the certificate contents, 

849 that is for the server to decide if it is good enough to authenticate 

850 successfully. 

851 """ 

852 if isinstance(value, Message): 

853 constructor = "from_message" 

854 elif os.path.isfile(value): 

855 constructor = "from_file" 

856 else: 

857 constructor = "from_string" 

858 blob = getattr(PublicBlob, constructor)(value) 

859 if not blob.key_type.startswith(self.get_name()): 

860 err = "PublicBlob type {} incompatible with key type {}" 

861 raise ValueError(err.format(blob.key_type, self.get_name())) 

862 self.public_blob = blob 

863 

864 

865# General construct for an OpenSSH style Public Key blob 

866# readable from a one-line file of the format: 

867# <key-name> <base64-blob> [<comment>] 

868# Of little value in the case of standard public keys 

869# {ssh-rsa, ssh-ecdsa, ssh-ed25519}, but should 

870# provide rudimentary support for {*-cert.v01} 

871class PublicBlob: 

872 """ 

873 OpenSSH plain public key or OpenSSH signed public key (certificate). 

874 

875 Tries to be as dumb as possible and barely cares about specific 

876 per-key-type data. 

877 

878 .. note:: 

879 

880 Most of the time you'll want to call `from_file`, `from_string` or 

881 `from_message` for useful instantiation, the main constructor is 

882 basically "I should be using ``attrs`` for this." 

883 """ 

884 

885 def __init__(self, type_, blob, comment=None): 

886 """ 

887 Create a new public blob of given type and contents. 

888 

889 :param str type_: Type indicator, eg ``ssh-rsa``. 

890 :param bytes blob: The blob bytes themselves. 

891 :param str comment: A comment, if one was given (e.g. file-based.) 

892 """ 

893 self.key_type = type_ 

894 self.key_blob = blob 

895 self.comment = comment 

896 

897 @classmethod 

898 def from_file(cls, filename): 

899 """ 

900 Create a public blob from a ``-cert.pub``-style file on disk. 

901 """ 

902 with open(filename) as f: 

903 string = f.read() 

904 return cls.from_string(string) 

905 

906 @classmethod 

907 def from_string(cls, string): 

908 """ 

909 Create a public blob from a ``-cert.pub``-style string. 

910 """ 

911 fields = string.split(None, 2) 

912 if len(fields) < 2: 

913 msg = "Not enough fields for public blob: {}" 

914 raise ValueError(msg.format(fields)) 

915 key_type = fields[0] 

916 key_blob = decodebytes(b(fields[1])) 

917 try: 

918 comment = fields[2].strip() 

919 except IndexError: 

920 comment = None 

921 # Verify that the blob message first (string) field matches the 

922 # key_type 

923 m = Message(key_blob) 

924 blob_type = m.get_text() 

925 if blob_type != key_type: 

926 deets = "key type={!r}, but blob type={!r}".format( 

927 key_type, blob_type 

928 ) 

929 raise ValueError("Invalid PublicBlob contents: {}".format(deets)) 

930 # All good? All good. 

931 return cls(type_=key_type, blob=key_blob, comment=comment) 

932 

933 @classmethod 

934 def from_message(cls, message): 

935 """ 

936 Create a public blob from a network `.Message`. 

937 

938 Specifically, a cert-bearing pubkey auth packet, because by definition 

939 OpenSSH-style certificates 'are' their own network representation." 

940 """ 

941 type_ = message.get_text() 

942 return cls(type_=type_, blob=message.asbytes()) 

943 

944 def __str__(self): 

945 ret = "{} public key/certificate".format(self.key_type) 

946 if self.comment: 

947 ret += "- {}".format(self.comment) 

948 return ret 

949 

950 def __eq__(self, other): 

951 # Just piggyback on Message/BytesIO, since both of these should be one. 

952 return self and other and self.key_blob == other.key_blob 

953 

954 def __ne__(self, other): 

955 return not self == other