Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/pkey.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

364 statements  

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20Common API for all public keys. 

21""" 

22 

23import base64 

24from base64 import encodebytes, decodebytes 

25from binascii import unhexlify 

26import os 

27from pathlib import Path 

28from hashlib import md5, sha256 

29import re 

30import struct 

31 

32import bcrypt 

33 

34from cryptography.hazmat.backends import default_backend 

35from cryptography.hazmat.primitives import padding, serialization 

36from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher 

37from cryptography.hazmat.primitives import asymmetric 

38 

39from paramiko import util 

40from paramiko.util import u, b 

41from paramiko.common import o600 

42from paramiko.ssh_exception import SSHException, PasswordRequiredException 

43from paramiko.message import Message 

44 

45 

46# TripleDES is moving from `cryptography.hazmat.primitives.ciphers.algorithms` 

47# in cryptography>=43.0.0 to `cryptography.hazmat.decrepit.ciphers.algorithms` 

48# It will be removed from `cryptography.hazmat.primitives.ciphers.algorithms` 

49# in cryptography==48.0.0. 

50# 

51# Source References: 

52# - https://github.com/pyca/cryptography/commit/722a6393e61b3ac 

53# - https://github.com/pyca/cryptography/pull/11407/files 

54try: 

55 from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES 

56except ImportError: 

57 from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES 

58 

59 

60OPENSSH_AUTH_MAGIC = b"openssh-key-v1\x00" 

61 

62 

def _unpad_openssh(data):
    """
    Strip OpenSSH-style padding from the end of a private key blob.

    The padding this function accepts is the byte sequence ``1, 2, 3, ...``,
    so the final byte states how many padding bytes are present.

    :param bytes data: key material, possibly followed by padding.
    :return: ``data`` with any trailing padding removed.

    :raises: `.SSHException` -- if ``data`` is empty or its padding is
        malformed.
    """
    # At the moment, this is only used for unpadding private keys on disk. This
    # really ought to be made constant time (possibly by upstreaming this logic
    # into pyca/cryptography).
    if not data:
        raise SSHException("Invalid key")
    padding_length = data[-1]
    if padding_length == 0:
        # Padding bytes count up from 1, so a trailing NUL is real data. This
        # also avoids ``data[:-0]``, which would throw the whole blob away.
        return data
    if 0x20 <= padding_length < 0x7F:
        return data  # no padding, last byte part comment (printable ascii)
    if padding_length > 15 or padding_length > len(data):
        raise SSHException("Invalid key")
    for i in range(padding_length):
        if data[i - padding_length] != i + 1:
            raise SSHException("Invalid key")
    return data[:-padding_length]

76 

77 

class UnknownKeyType(Exception):
    """
    An unknown public/private key algorithm was attempted to be read.

    :param key_type: identifier of the key type that was not recognized.
    :param bytes key_bytes: the raw key data, when available.
    """

    def __init__(self, key_type=None, key_bytes=None):
        self.key_type = key_type
        self.key_bytes = key_bytes

    def __str__(self):
        # key_bytes defaults to None; report a zero length instead of letting
        # len() raise TypeError while the error is being formatted.
        size = 0 if self.key_bytes is None else len(self.key_bytes)
        return f"UnknownKeyType(type={self.key_type!r}, bytes=<{size}>)"

89 

90 

class PKey:
    """
    Base class for public keys.

    Also includes some "meta" level convenience constructors such as
    `.from_type_string`.
    """

    # Ciphers understood when decrypting legacy PEM private key files, keyed
    # by the name found in the file's DEK-Info header.
    _CIPHER_TABLE = {
        "AES-128-CBC": dict(
            cipher=algorithms.AES, keysize=16, blocksize=16, mode=modes.CBC
        ),
        "AES-256-CBC": dict(
            cipher=algorithms.AES, keysize=32, blocksize=16, mode=modes.CBC
        ),
        "DES-EDE3-CBC": dict(
            cipher=TripleDES, keysize=24, blocksize=8, mode=modes.CBC
        ),
    }
    # Format ids reported by _read_private_key() alongside the key data.
    _PRIVATE_KEY_FORMAT_ORIGINAL = 1
    _PRIVATE_KEY_FORMAT_OPENSSH = 2
    # Armor lines delimiting the base64 body of a private key file.
    BEGIN_TAG = re.compile(
        r"^-{5}BEGIN (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$"
    )
    END_TAG = re.compile(r"^-{5}END (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")

126 

@staticmethod
def from_path(path, passphrase=None):
    """
    Load a private key file, choosing the matching key subclass for it.

    If a ``-cert.pub`` file sits next to the key, it is loaded as well.

    :param Path path: The path to load (may also be a `str`).
    :param passphrase: Handed to the key loaders as the key's password.

    :returns:
        A `PKey` subclass instance.

    :raises:
        `UnknownKeyType`, if our crypto backend doesn't know this key type.

    .. versionadded:: 3.2
    """
    # TODO: make sure sphinx is reading Path right in param list...

    # Lazy import to avoid circular import issues
    from paramiko import DSSKey, RSAKey, Ed25519Key, ECDSAKey

    # Work on a plain string: the cert suffix isn't quite an extension, so
    # pathlib isn't useful for this.
    path = str(path)

    # It is 'legal' to hand this kind of API /either/ the key /or/ the cert
    # when there is a key/cert pair, so derive both paths from whichever one
    # we were given.
    cert_suffix = "-cert.pub"
    if path.endswith(cert_suffix):
        key_path, cert_path = path[: -len(cert_suffix)], path
    else:
        key_path, cert_path = path, path + cert_suffix
    key_path = Path(key_path).expanduser()
    cert_path = Path(cert_path).expanduser()

    data = key_path.read_bytes()
    try:
        # Like OpenSSH, try modern/OpenSSH-specific key load first
        loaded = serialization.load_ssh_private_key(
            data=data, password=passphrase
        )
    except ValueError:
        # Then fall back to assuming legacy PEM type
        loaded = serialization.load_pem_private_key(
            data=data, password=passphrase
        )
    # TODO Python 3.10: match statement? (NOTE: we cannot use a dict
    # because the results from the loader are literal backend, eg openssl,
    # private classes, so isinstance tests work but exact 'x class is y'
    # tests will not work)
    # TODO: leverage already-parsed/math'd obj to avoid duplicate cpu
    # cycles? seemingly requires most of our key subclasses to be rewritten
    # to be cryptography-object-forward. this is still likely faster than
    # the old SSHClient code that just tried instantiating every class!
    if isinstance(loaded, asymmetric.dsa.DSAPrivateKey):
        key_class = DSSKey
    elif isinstance(loaded, asymmetric.rsa.RSAPrivateKey):
        key_class = RSAKey
    elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey):
        key_class = Ed25519Key
    elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey):
        key_class = ECDSAKey
    else:
        raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__)
    with key_path.open() as fd:
        key = key_class.from_private_key(fd, password=passphrase)
    if cert_path.exists():
        # load_certificate can take Message, path-str, or value-str
        key.load_certificate(str(cert_path))
    return key

200 

@staticmethod
def from_type_string(key_type, key_bytes):
    """
    Build a `PKey` subclass instance from a type `str` & raw `bytes`.

    For example, ``PKey.from_type_string("ssh-ed25519", <public bytes>)``
    will (if successful) return a new `.Ed25519Key`.

    :param str key_type:
        The key type, eg ``"ssh-ed25519"``.
    :param bytes key_bytes:
        The raw byte data forming the key material, as expected by
        subclasses' ``data`` parameter.

    :returns:
        A `PKey` subclass instance.

    :raises:
        `UnknownKeyType`, if no registered classes knew about this type.

    .. versionadded:: 3.2
    """
    from paramiko import key_classes

    # The first registered class that claims this identifier wins.
    claimants = (
        candidate
        for candidate in key_classes
        if key_type in candidate.identifiers()
    )
    chosen = next(claimants, None)
    if chosen is None:
        raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes)
    # TODO: needs to passthru things like passphrase
    return chosen(data=key_bytes)

230 

@classmethod
def identifiers(cls):
    """
    Return an iterable of the key format/name strings this class handles.

    This default yields just the class's ``name`` attribute, which suffices
    for classes with a single identifier; see `.ECDSAKey` for one example
    of an override.
    """
    sole_identifier = cls.name
    return [sole_identifier]

241 

242 # TODO 4.0: make this and subclasses consistent, some of our own 

243 # classmethods even assume kwargs we don't define! 

244 # TODO 4.0: prob also raise NotImplementedError instead of pass'ing; the 

245 # contract is pretty obviously that you need to handle msg/data/filename 

246 # appropriately. (If 'pass' is a concession to testing, see about doing the 

247 # work to fix the tests instead) 

def __init__(self, msg=None, data=None):
    """
    Create a new instance of this public key type.

    When ``msg`` is given, the key's public part(s) are meant to be filled
    in from the message; when ``data`` is given, from those bytes. This
    base implementation does nothing with either argument.

    :param .Message msg:
        an optional SSH `.Message` containing a public key of this type.
    :param bytes data:
        optional, the bytes of a public key of this type

    :raises: `.SSHException` --
        if a key cannot be created from the ``data`` or ``msg`` given, or
        no key was passed in.
    """
    return None

265 

266 # TODO: arguably this might want to be __str__ instead? ehh 

267 # TODO: ditto the interplay between showing class name (currently we just 

268 # say PKey writ large) and algorithm (usually == class name, but not 

269 # always, also sometimes shows certificate-ness) 

270 # TODO: if we do change it, we also want to tweak eg AgentKey, as it 

271 # currently displays agent-ness with a suffix 

def __repr__(self):
    """Summarize the key: algorithm, bit size, fingerprint, and comment."""
    # Works for AgentKey, may work for others?
    note = getattr(self, "comment", None)
    suffix = f", comment={note!r}" if note else ""
    return f"PKey(alg={self.algorithm_name}, bits={self.get_bits()}, fp={self.fingerprint}{suffix})"  # noqa

278 

279 # TODO 4.0: just merge into __bytes__ (everywhere) 

def asbytes(self):
    """
    Return the bytes of an SSH `.Message` holding this key's public part(s).

    The result is meant to be suitable for handing back to `__init__` to
    re-create the key object later. This base implementation has no key
    material and returns empty bytes.
    """
    return b""

287 

def __bytes__(self):
    """Support ``bytes(key)`` by delegating to `asbytes`."""
    serialized = self.asbytes()
    return serialized

290 

def __eq__(self, other):
    """Keys are equal when both are `PKey` objects with equal ``_fields``."""
    if not isinstance(other, PKey):
        return False
    return self._fields == other._fields

293 

def __hash__(self):
    """Hash the same ``_fields`` value that `__eq__` compares."""
    fields = self._fields
    return hash(fields)

296 

@property
def _fields(self):
    """
    Value used by `__eq__` and `__hash__` to identify the key.

    Not implemented on this base class.
    """
    raise NotImplementedError()

300 

def get_name(self):
    """
    Return the name of this private key implementation.

    :return:
        name of this private key type, in SSH terminology, as a `str` (for
        example, ``"ssh-rsa"``). This base implementation returns an empty
        string.
    """
    name = ""
    return name

310 

@property
def algorithm_name(self):
    """
    Return the key algorithm identifier for this key.

    Similar to `get_name`, but aimed at pure algorithm name instead of SSH
    protocol field value; the result is upper-cased.
    """
    cert_tail = "-cert-v01@openssh.com"
    # Nuke the leading 'ssh-'
    # TODO in Python 3.9: use .removeprefix()
    trimmed = self.get_name().replace("ssh-", "")
    if cert_tail in trimmed:
        # Trim any cert suffix (but leave the -cert, as OpenSSH does)
        trimmed = trimmed.replace(cert_tail, "-cert")
    else:
        # Nuke any eg ECDSA suffix, OpenSSH does basically this too.
        trimmed = trimmed.partition("-")[0]
    return trimmed.upper()

330 

def get_bits(self):
    """
    Return the number of significant bits in this key.

    This is useful for judging the relative security of a key.

    :return: bits in the key (as an `int`); always ``0`` on this base class.
    """
    # TODO 4.0: raise NotImplementedError, 0 is unlikely to ever be
    # _correct_ and nothing in the critical path seems to use this.
    bits = 0
    return bits

341 

def can_sign(self):
    """
    Report whether this key has the private part necessary for signing data.

    :return: ``True`` if it does; this base implementation always answers
        ``False``.
    """
    has_private_part = False
    return has_private_part

348 

def get_fingerprint(self):
    """
    Return an MD5 fingerprint of the public part of this key.

    Nothing secret is revealed.

    :return:
        a 16-byte `string <str>` (binary) of the MD5 fingerprint, in SSH
        format.
    """
    hasher = md5()
    hasher.update(self.asbytes())
    return hasher.digest()

359 

@property
def fingerprint(self):
    """
    Modern fingerprint property designed to be comparable to OpenSSH.

    Currently only does SHA256 (the OpenSSH default). The value has the
    form ``<HASH NAME>:<base64 digest with trailing '=' removed>``.

    .. versionadded:: 3.2
    """
    digest = sha256(bytes(self))
    encoded = u(encodebytes(digest.digest()))
    # Drop the surrounding whitespace encodebytes adds, plus the base64
    # padding (yes, OpenSSH does this too!)
    trimmed = encoded.strip().rstrip("=")
    return "{}:{}".format(digest.name.upper(), trimmed)

374 

def get_base64(self):
    """
    Return a base64 string containing the public part of this key.

    Nothing secret is revealed. This format is compatible with that used to
    store public key files or recognized host keys.

    :return: a base64 `string <str>` containing the public part of the key.
    """
    encoded = u(encodebytes(self.asbytes()))
    # encodebytes wraps its output in lines; the result is a single line.
    return encoded.replace("\n", "")

384 

def sign_ssh_data(self, data, algorithm=None):
    """
    Sign a blob of data with this private key.

    The result is meant to be a `.Message` representing an SSH signature
    message; this base implementation signs nothing and returns empty
    bytes.

    :param bytes data:
        the data to sign.
    :param str algorithm:
        the signature algorithm to use, if different from the key's
        internal name. Default: ``None``.
    :return: an SSH signature `message <.Message>`.

    .. versionchanged:: 2.9
        Added the ``algorithm`` kwarg.
    """
    return b""

401 

def verify_ssh_sig(self, data, msg):
    """
    Check that an SSH signature message was produced by this key.

    :param bytes data: the data that was signed.
    :param .Message msg: an SSH signature message
    :return:
        ``True`` if the signature verifies correctly; ``False`` otherwise.
        This base implementation always answers ``False``.
    """
    verified = False
    return verified

413 

@classmethod
def from_private_key_file(cls, filename, password=None):
    """
    Create a key object by reading a private key file.

    If the private key is encrypted and ``password`` is not ``None``, the
    given password will be used to decrypt the key (otherwise
    `.PasswordRequiredException` is thrown). This factory simply
    instantiates ``cls``, so it exists on all subclasses of PKey (such as
    `.RSAKey` or `.DSSKey`), but is useless on the abstract PKey class.

    :param str filename: name of the file to read
    :param str password:
        an optional password to use to decrypt the key file, if it's
        encrypted
    :return: a new `.PKey` based on the given private key

    :raises: ``IOError`` -- if there was an error reading the file
    :raises: `.PasswordRequiredException` -- if the private key file is
        encrypted, and ``password`` is ``None``
    :raises: `.SSHException` -- if the key file is invalid
    """
    return cls(filename=filename, password=password)

437 

@classmethod
def from_private_key(cls, file_obj, password=None):
    """
    Create a key object by reading a private key from a file-like object.

    If the private key is encrypted and ``password`` is not ``None``, the
    given password will be used to decrypt the key (otherwise
    `.PasswordRequiredException` is thrown).

    :param file_obj: the file-like object to read from
    :param str password:
        an optional password to use to decrypt the key, if it's encrypted
    :return: a new `.PKey` based on the given private key

    :raises: ``IOError`` -- if there was an error reading the key
    :raises: `.PasswordRequiredException` --
        if the private key file is encrypted, and ``password`` is ``None``
    :raises: `.SSHException` -- if the key file is invalid
    """
    return cls(file_obj=file_obj, password=password)

458 

def write_private_key_file(self, filename, password=None):
    """
    Write private key contents into a file.

    If the password is not ``None``, the key is encrypted before writing.
    This base implementation always raises, as it has nothing to write.

    :param str filename: name of the file to write
    :param str password:
        an optional password to use to encrypt the key file

    :raises: ``IOError`` -- if there was an error writing the file
    :raises: `.SSHException` -- if the key is invalid
    """
    message = "Not implemented in PKey"
    raise Exception(message)

472 

def write_private_key(self, file_obj, password=None):
    """
    Write private key contents into a file (or file-like) object.

    If the password is not ``None``, the key is encrypted before writing.
    This base implementation always raises, as it has nothing to write.

    :param file_obj: the file-like object to write into
    :param str password: an optional password to use to encrypt the key

    :raises: ``IOError`` -- if there was an error writing to the file
    :raises: `.SSHException` -- if the key is invalid
    """
    # TODO 4.0: NotImplementedError (plus everywhere else in here)
    message = "Not implemented in PKey"
    raise Exception(message)

486 

def _read_private_key_file(self, tag, filename, password=None):
    """
    Open a private key file and parse it with `_read_private_key`.

    The file is expected to hold an SSH2-format private key marked by a
    ``"BEGIN xxx PRIVATE KEY"`` line for some ``xxx``. If the private key
    is encrypted and ``password`` is not ``None``, the given password will
    be used to decrypt the key (otherwise `.PasswordRequiredException` is
    thrown).

    :param str tag: ``"RSA"`` or ``"DSA"``, the tag used to mark the
        data block.
    :param str filename: name of the file to read.
    :param str password:
        an optional password to use to decrypt the key file, if it's
        encrypted.
    :return: whatever `_read_private_key` returns for the file's contents.

    :raises: ``IOError`` -- if there was an error reading the file.
    :raises: `.PasswordRequiredException` -- if the private key file is
        encrypted, and ``password`` is ``None``.
    :raises: `.SSHException` -- if the key file is invalid.
    """
    with open(filename, "r") as key_file:
        return self._read_private_key(tag, key_file, password)

511 

def _read_private_key(self, tag, f, password=None):
    """
    Locate the armored private key in ``f`` and decode it.

    :param str tag:
        key type expected for legacy PEM files (the ``xxx`` in
        ``BEGIN xxx PRIVATE KEY``).
    :param f: file-like object whose ``readlines()`` yields the key text.
    :param password: optional password used to decrypt the key.
    :return:
        a ``(format id, data)`` tuple, where the format id is
        ``_PRIVATE_KEY_FORMAT_ORIGINAL`` or ``_PRIVATE_KEY_FORMAT_OPENSSH``.

    :raises: `.SSHException` -- if the input is empty, has no usable BEGIN
        tag, or holds a key type other than ``tag`` or ``OPENSSH``.
    """
    lines = f.readlines()
    if not lines:
        raise SSHException("no lines in {} private key file".format(tag))
    last_index = len(lines) - 1

    # find the BEGIN tag
    begin = next(
        (i for i, line in enumerate(lines) if self.BEGIN_TAG.match(line)),
        None,
    )
    # A BEGIN tag on the very last line leaves no room for a key body.
    if begin is None or begin >= last_index:
        raise SSHException("not a valid {} private key file".format(tag))
    keytype = self.BEGIN_TAG.match(lines[begin]).group(1)
    start = begin + 1

    # find the END tag; without one, the final line is treated as the end
    end = next(
        (
            i
            for i in range(start, len(lines))
            if self.END_TAG.match(lines[i])
        ),
        last_index,
    )

    if keytype == tag:
        # The PEM reader skips index 0 as the BEGIN line, so hand it a view
        # that really starts there. Otherwise any text preceding the BEGIN
        # tag would push the tag line itself into the base64 payload.
        data = self._read_private_key_pem(
            lines[begin:], end - begin, password
        )
        pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL
    elif keytype == "OPENSSH":
        data = self._read_private_key_openssh(lines[start:end], password)
        pkformat = self._PRIVATE_KEY_FORMAT_OPENSSH
    else:
        raise SSHException(
            "encountered {} key, expected {} key".format(keytype, tag)
        )

    return pkformat, data

548 

def _got_bad_key_format_id(self, id_):
    """
    Raise `.SSHException` reporting an unrecognized private key format id.

    :param id_: the unexpected format id, included in the error message
        together with this object's class name.
    """
    template = "{}._read_private_key() spat out an unknown key format id '{}'"
    message = template.format(self.__class__.__name__, id_)
    raise SSHException(message)

552 

def _read_private_key_pem(self, lines, end, password):
    """
    Decode, and if necessary decrypt, the body of a legacy PEM private key.

    :param list lines:
        lines of the key file. The first one is skipped unread (presumably
        the BEGIN tag line); ``Name: value`` header lines may follow it.
    :param int end: index in ``lines`` at which the base64 body stops.
    :param password: password used to derive the decryption key.
    :return: the decoded (and decrypted, if needed) key `bytes`.

    :raises: `.PasswordRequiredException` -- if the key is encrypted and
        ``password`` is ``None``.
    :raises: `.SSHException` -- for undecodable base64, unknown or
        unparseable encryption headers, or failed unpadding after
        decryption.
    """
    start = 0
    # parse any headers first
    headers = {}
    start += 1
    while start < len(lines):
        line = lines[start].split(": ")
        if len(line) == 1:
            break
        headers[line[0].lower()] = line[1].strip()
        start += 1
    # if we trudged to the end of the file, just try to cope.
    try:
        data = decodebytes(b("".join(lines[start:end])))
    except base64.binascii.Error as e:
        raise SSHException("base64 decoding error: {}".format(e))
    if "proc-type" not in headers:
        # unencrypted: done
        return data
    # encrypted keyfile: will need a password
    proc_type = headers["proc-type"]
    if proc_type != "4,ENCRYPTED":
        raise SSHException(
            'Unknown private key structure "{}"'.format(proc_type)
        )
    try:
        encryption_type, saltstr = headers["dek-info"].split(",")
    except (KeyError, ValueError):
        # KeyError: no DEK-Info header at all; ValueError: it did not split
        # into exactly "<cipher>,<salt>". Anything else (including
        # KeyboardInterrupt) is deliberately left to propagate.
        raise SSHException("Can't parse DEK-info in private key file")
    if encryption_type not in self._CIPHER_TABLE:
        raise SSHException(
            'Unknown private key cipher "{}"'.format(encryption_type)
        )
    # if no password was passed in,
    # raise an exception pointing out that we need one
    if password is None:
        raise PasswordRequiredException("Private key file is encrypted")
    cipher = self._CIPHER_TABLE[encryption_type]["cipher"]
    keysize = self._CIPHER_TABLE[encryption_type]["keysize"]
    mode = self._CIPHER_TABLE[encryption_type]["mode"]
    # The salt from the header is used both for key derivation and as the
    # cipher mode's initialization value.
    salt = unhexlify(b(saltstr))
    key = util.generate_key_bytes(md5, salt, password, keysize)
    decryptor = Cipher(
        cipher(key), mode(salt), backend=default_backend()
    ).decryptor()
    decrypted_data = decryptor.update(data) + decryptor.finalize()
    unpadder = padding.PKCS7(cipher.block_size).unpadder()
    try:
        return unpadder.update(decrypted_data) + unpadder.finalize()
    except ValueError:
        raise SSHException("Bad password or corrupt private key file")

604 

def _read_private_key_openssh(self, lines, password):
    """
    Decode the body of a new-format OpenSSH private key.

    This is the OpenSSH SSH2 private key format available since OpenSSH
    version 6.5. Reference:
    https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key

    :param lines: the base64 lines found between the BEGIN and END tags.
    :param password: password used to decrypt the key, if it is encrypted.
    :return: the private key data, with trailing padding removed.

    :raises: `.PasswordRequiredException` -- if the key is encrypted and
        ``password`` is ``None``.
    :raises: `.SSHException` -- if the data is malformed or uses an
        unsupported cipher/kdf.
    """
    try:
        data = decodebytes(b("".join(lines)))
    except base64.binascii.Error as e:
        raise SSHException("base64 decoding error: {}".format(e))

    # read data struct: a 15-byte magic string opens the blob
    if data[:15] != OPENSSH_AUTH_MAGIC:
        raise SSHException("unexpected OpenSSH key header encountered")

    (
        cipher,
        kdfname,
        kdf_options,
        num_pubkeys,
        remainder,
    ) = self._uint32_cstruct_unpack(data[15:], "sssur")
    # For now, just support 1 key.
    if num_pubkeys > 1:
        raise SSHException("unsupported: private keyfile has multiple keys")
    pubkey, privkey_blob = self._uint32_cstruct_unpack(remainder, "ss")

    if kdfname == b("bcrypt"):
        if cipher == b("aes256-cbc"):
            mode = modes.CBC
        elif cipher == b("aes256-ctr"):
            mode = modes.CTR
        else:
            raise SSHException(
                "unknown cipher `{}` used in private key file".format(
                    cipher.decode("utf-8")
                )
            )
        # Encrypted private key.
        # If no password was passed in, raise an exception pointing
        # out that we need one
        if password is None:
            raise PasswordRequiredException("private key file is encrypted")

        # Unpack salt and rounds from kdfoptions
        salt, rounds = self._uint32_cstruct_unpack(kdf_options, "su")

        # run bcrypt kdf to derive key and iv/nonce (32 + 16 bytes)
        derived = bcrypt.kdf(
            b(password),
            b(salt),
            48,
            rounds,
            # We can't control how many rounds are on disk, so no sense
            # warning about it.
            ignore_few_rounds=True,
        )
        key, iv = derived[:32], derived[32:]

        # decrypt private key blob
        decryptor = Cipher(
            algorithms.AES(key), mode(iv), default_backend()
        ).decryptor()
        decrypted_privkey = decryptor.update(privkey_blob)
        decrypted_privkey += decryptor.finalize()
    elif cipher == b("none") and kdfname == b("none"):
        # Unencrypted private key
        decrypted_privkey = privkey_blob
    else:
        raise SSHException("unknown cipher or kdf used in private key file")

    # Unpack private key and verify checkints
    checkint1, checkint2, keytype, keydata = self._uint32_cstruct_unpack(
        decrypted_privkey, "uusr"
    )
    if checkint1 != checkint2:
        raise SSHException("OpenSSH private key file checkints do not match")

    return _unpad_openssh(keydata)

690 

def _uint32_cstruct_unpack(self, data, strformat):
    """
    Unpack a C-style structure used by the new OpenSSH private key format.

    The structure is a mix of 32-bit uints and variable length strings
    prefixed by a 32-bit uint size field, laid out as described by
    ``strformat``. Format characters:

        s - denotes a string
        i - denotes a long integer, encoded as a byte string
        u - denotes a 32-bit unsigned integer
        r - the remainder of the input string, returned as a string

    Any other character is ignored, and nothing after an ``r`` is read.

    :param bytes data: the packed structure.
    :param str strformat: one format character per field.
    :return: a tuple of the unpacked values, in order.

    :raises: `.SSHException` -- if anything goes wrong while unpacking.
    """
    fields = []
    offset = 0
    try:
        for code in strformat:
            if code == "r":
                # remainder as string
                fields.append(data[offset:])
                break
            if code not in "siu":
                continue
            # Every other field starts with a big-endian 32-bit number: the
            # value itself for 'u', the payload size for 's' and 'i'.
            (number,) = struct.unpack(">L", data[offset : offset + 4])
            offset += 4
            if code == "u":
                fields.append(number)
                continue
            chunk = data[offset : offset + number]
            offset += number
            if code == "s":
                fields.append(chunk)
            else:
                # long integer
                fields.append(util.inflate_long(chunk, True))
    except Exception as e:
        # PKey-consuming code frequently wants to save-and-skip-over issues
        # with loading keys, and uses SSHException as the (really friggin
        # awful) signal for this. So for now...we do this.
        raise SSHException(str(e))
    return tuple(fields)

739 

def _write_private_key_file(self, filename, key, format, password=None):
    """
    Write a private key to ``filename``, created with a user-only mode.

    The serialization itself is delegated to `_write_private_key`, which
    receives ``key``, ``format`` and ``password`` unchanged.

    :param filename: name of the file to write.
    :param key: the key object to serialize.
    :param format: the private key format to serialize with.
    :param str password: an optional password to use to encrypt the file.

    :raises: ``IOError`` -- if there was an error writing the file.
    """
    # Ensure that we create new key files directly with a user-only mode,
    # instead of opening, writing, then chmodding, which leaves us open to
    # CVE-2022-24302.
    descriptor = os.open(
        filename,
        # NOTE: O_TRUNC is a noop on new files, and O_CREAT is a noop
        # on existing files, so using all 3 in both cases is fine.
        flags=os.O_WRONLY | os.O_TRUNC | os.O_CREAT,
        # Ditto the use of the 'mode' argument; it should be safe to
        # give even for existing files (though it will not act like a
        # chmod in that case).
        mode=o600,
    )
    # Yea, you still gotta inform the FLO that it is in "write" mode.
    with os.fdopen(descriptor, "w") as f:
        self._write_private_key(f, key, format, password=password)

773 

def _write_private_key(self, f, key, format, password=None):
    """
    Serialize ``key`` as PEM text and write it to ``f``.

    :param f: writable text file-like object.
    :param key: object whose ``private_bytes`` method does the encoding.
    :param format: private key format passed to ``private_bytes``.
    :param password:
        when not ``None``, the key is encrypted with it; otherwise it is
        written unencrypted.
    """
    encryption = (
        serialization.NoEncryption()
        if password is None
        else serialization.BestAvailableEncryption(b(password))
    )
    pem_bytes = key.private_bytes(
        serialization.Encoding.PEM, format, encryption
    )
    f.write(pem_bytes.decode())

785 

def _check_type_and_load_cert(self, msg, key_type, cert_type):
    """
    Perform message type-checking & optional certificate loading.

    This includes fast-forwarding cert ``msg`` objects past the nonce, so
    that the subsequent fields are the key numbers; thus the caller may
    expect to treat the message as key material afterwards either way.

    :param msg: the message holding the key or certificate.
    :param key_type:
        acceptable plain key type(s): a single `str` or a collection.
    :param cert_type:
        acceptable certificate type(s): a single `str` or a collection.
    :return:
        the key type read from ``msg``, for classes which need to know
        what it was (e.g. ECDSA.)

    :raises: `.SSHException` -- if ``msg`` is ``None`` or its type is in
        neither ``key_type`` nor ``cert_type``.
    """
    # Normalization; most classes have a single key type and give a string,
    # but eg ECDSA is a 1:N mapping.
    key_types = [key_type] if isinstance(key_type, str) else key_type
    cert_types = [cert_type] if isinstance(cert_type, str) else cert_type
    # Can't do much with no message, that should've been handled elsewhere
    if msg is None:
        raise SSHException("Key object may not be empty")
    # First field is always key type, in either kind of object. (make sure
    # we rewind before grabbing it - sometimes caller had to do their own
    # introspection first!)
    msg.rewind()
    type_ = msg.get_text()
    # Regular public key - nothing special to do besides the implicit
    # type check.
    if type_ in key_types:
        pass
    # OpenSSH-compatible certificate - store full copy as .public_blob
    # (so signing works correctly) and then fast-forward past the
    # nonce.
    elif type_ in cert_types:
        # This seems the cleanest way to 'clone' an already-being-read
        # message; they're *IO objects at heart and their .getvalue()
        # always returns the full value regardless of pointer position.
        self.load_certificate(Message(msg.asbytes()))
        # Read out nonce as it comes before the public numbers - our caller
        # is likely going to use the (only borrowed by us, not owned)
        # 'msg' object for loading those numbers right after this.
        # TODO: usefully interpret it & other non-public-number fields
        # (requires going back into per-type subclasses.)
        msg.get_string()
    else:
        err = "Invalid key (class: {}, data type: {})"
        raise SSHException(err.format(self.__class__.__name__, type_))
    # Hand the type back, as the docstring promises.
    return type_

834 

def load_certificate(self, value):
    """
    Attach an OpenSSH public key or certificate to this key.

    ``value`` may be a `.Message` object, the path of an OpenSSH public
    key (``.pub``) or certificate (``-cert.pub``) file, or a string
    containing such a file. The parsed result is stored as
    ``self.public_blob``.

    The .pub contents adds no real value, since the private key
    file includes sufficient information to derive the public
    key info. For certificates, however, this can be used on
    the client side to offer authentication requests to the server
    based on certificate instead of raw public key.

    See:
    https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys

    Note: very little effort is made to validate the certificate contents,
    that is for the server to decide if it is good enough to authenticate
    successfully.

    :raises: ``ValueError`` -- if the blob's key type does not start with
        this key's name.
    """
    if isinstance(value, Message):
        blob = PublicBlob.from_message(value)
    elif os.path.isfile(value):
        blob = PublicBlob.from_file(value)
    else:
        blob = PublicBlob.from_string(value)
    # Cert types extend the plain key name, hence a prefix comparison.
    if blob.key_type.startswith(self.get_name()):
        self.public_blob = blob
        return
    err = "PublicBlob type {} incompatible with key type {}"
    raise ValueError(err.format(blob.key_type, self.get_name()))

865 

866 

867# General construct for an OpenSSH style Public Key blob 

868# readable from a one-line file of the format: 

869# <key-name> <base64-blob> [<comment>] 

870# Of little value in the case of standard public keys 

871# {ssh-rsa, ssh-dss, ssh-ecdsa, ssh-ed25519}, but should 

872# provide rudimentary support for {*-cert.v01} 

class PublicBlob:
    """
    OpenSSH plain public key or OpenSSH signed public key (certificate).

    Tries to be as dumb as possible and barely cares about specific
    per-key-type data.

    .. note::

        Most of the time you'll want to call `from_file`, `from_string` or
        `from_message` for useful instantiation, the main constructor is
        basically "I should be using ``attrs`` for this."
    """

    def __init__(self, type_, blob, comment=None):
        """
        Create a new public blob of given type and contents.

        :param str type_: Type indicator, eg ``ssh-rsa``.
        :param bytes blob: The blob bytes themselves.
        :param str comment: A comment, if one was given (e.g. file-based.)
        """
        self.key_type = type_
        self.key_blob = blob
        self.comment = comment

    @classmethod
    def from_file(cls, filename):
        """
        Create a public blob from a ``-cert.pub``-style file on disk.
        """
        with open(filename) as f:
            string = f.read()
        return cls.from_string(string)

    @classmethod
    def from_string(cls, string):
        """
        Create a public blob from a ``-cert.pub``-style string.

        The string holds whitespace-separated fields: key type, base64 blob
        and an optional comment.

        :raises: ``ValueError`` -- if there are fewer than two fields, or
            the type named inside the blob differs from the key type field.
        """
        fields = string.split(None, 2)
        if len(fields) < 2:
            msg = "Not enough fields for public blob: {}"
            raise ValueError(msg.format(fields))
        key_type = fields[0]
        key_blob = decodebytes(b(fields[1]))
        try:
            comment = fields[2].strip()
        except IndexError:
            comment = None
        # Verify that the blob message first (string) field matches the
        # key_type
        m = Message(key_blob)
        blob_type = m.get_text()
        if blob_type != key_type:
            deets = "key type={!r}, but blob type={!r}".format(
                key_type, blob_type
            )
            raise ValueError("Invalid PublicBlob contents: {}".format(deets))
        # All good? All good.
        return cls(type_=key_type, blob=key_blob, comment=comment)

    @classmethod
    def from_message(cls, message):
        """
        Create a public blob from a network `.Message`.

        Specifically, a cert-bearing pubkey auth packet, because by definition
        OpenSSH-style certificates 'are' their own network representation."
        """
        type_ = message.get_text()
        return cls(type_=type_, blob=message.asbytes())

    def __str__(self):
        ret = "{} public key/certificate".format(self.key_type)
        if self.comment:
            ret += "- {}".format(self.comment)
        return ret

    def __eq__(self, other):
        # Only another PublicBlob can be equal. Anything else is handed back
        # to Python, which then reports "not equal" instead of this method
        # raising AttributeError on a missing key_blob.
        if not isinstance(other, PublicBlob):
            return NotImplemented
        # Just piggyback on Message/BytesIO, since both of these should be one.
        return self.key_blob == other.key_blob

    def __ne__(self, other):
        return not self == other