Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/pkey.py: 23%

284 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20Common API for all public keys. 

21""" 

22 

23import base64 

24from base64 import encodebytes, decodebytes 

25from binascii import unhexlify 

26import os 

27from hashlib import md5 

28import re 

29import struct 

30 

31import bcrypt 

32 

33from cryptography.hazmat.backends import default_backend 

34from cryptography.hazmat.primitives import serialization 

35from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher 

36 

37from paramiko import util 

38from paramiko.util import u, b 

39from paramiko.common import o600 

40from paramiko.ssh_exception import SSHException, PasswordRequiredException 

41from paramiko.message import Message 

42 

43 

44OPENSSH_AUTH_MAGIC = b"openssh-key-v1\x00" 

45 

46 

47def _unpad_openssh(data): 

48 # At the moment, this is only used for unpadding private keys on disk. This 

49 # really ought to be made constant time (possibly by upstreaming this logic 

50 # into pyca/cryptography). 

51 padding_length = data[-1] 

52 if 0x20 <= padding_length < 0x7F: 

53 return data # no padding, last byte part comment (printable ascii) 

54 if padding_length > 15: 

55 raise SSHException("Invalid key") 

56 for i in range(padding_length): 

57 if data[i - padding_length] != i + 1: 

58 raise SSHException("Invalid key") 

59 return data[:-padding_length] 

60 

61 

62class PKey: 

63 """ 

64 Base class for public keys. 

65 """ 

66 

67 # known encryption types for private key files: 

68 _CIPHER_TABLE = { 

69 "AES-128-CBC": { 

70 "cipher": algorithms.AES, 

71 "keysize": 16, 

72 "blocksize": 16, 

73 "mode": modes.CBC, 

74 }, 

75 "AES-256-CBC": { 

76 "cipher": algorithms.AES, 

77 "keysize": 32, 

78 "blocksize": 16, 

79 "mode": modes.CBC, 

80 }, 

81 "DES-EDE3-CBC": { 

82 "cipher": algorithms.TripleDES, 

83 "keysize": 24, 

84 "blocksize": 8, 

85 "mode": modes.CBC, 

86 }, 

87 } 

88 _PRIVATE_KEY_FORMAT_ORIGINAL = 1 

89 _PRIVATE_KEY_FORMAT_OPENSSH = 2 

90 BEGIN_TAG = re.compile( 

91 r"^-{5}BEGIN (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$" 

92 ) 

93 END_TAG = re.compile(r"^-{5}END (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$") 

94 

95 def __init__(self, msg=None, data=None): 

96 """ 

97 Create a new instance of this public key type. If ``msg`` is given, 

98 the key's public part(s) will be filled in from the message. If 

99 ``data`` is given, the key's public part(s) will be filled in from 

100 the string. 

101 

102 :param .Message msg: 

103 an optional SSH `.Message` containing a public key of this type. 

104 :param str data: an optional string containing a public key 

105 of this type 

106 

107 :raises: `.SSHException` -- 

108 if a key cannot be created from the ``data`` or ``msg`` given, or 

109 no key was passed in. 

110 """ 

111 pass 

112 

113 # TODO 4.0: just merge into __bytes__ (everywhere) 

114 def asbytes(self): 

115 """ 

116 Return a string of an SSH `.Message` made up of the public part(s) of 

117 this key. This string is suitable for passing to `__init__` to 

118 re-create the key object later. 

119 """ 

120 return bytes() 

121 

122 def __bytes__(self): 

123 return self.asbytes() 

124 

125 def __eq__(self, other): 

126 return isinstance(other, PKey) and self._fields == other._fields 

127 

128 def __hash__(self): 

129 return hash(self._fields) 

130 

131 @property 

132 def _fields(self): 

133 raise NotImplementedError 

134 

135 def get_name(self): 

136 """ 

137 Return the name of this private key implementation. 

138 

139 :return: 

140 name of this private key type, in SSH terminology, as a `str` (for 

141 example, ``"ssh-rsa"``). 

142 """ 

143 return "" 

144 

145 def get_bits(self): 

146 """ 

147 Return the number of significant bits in this key. This is useful 

148 for judging the relative security of a key. 

149 

150 :return: bits in the key (as an `int`) 

151 """ 

152 return 0 

153 

154 def can_sign(self): 

155 """ 

156 Return ``True`` if this key has the private part necessary for signing 

157 data. 

158 """ 

159 return False 

160 

161 def get_fingerprint(self): 

162 """ 

163 Return an MD5 fingerprint of the public part of this key. Nothing 

164 secret is revealed. 

165 

166 :return: 

167 a 16-byte `string <str>` (binary) of the MD5 fingerprint, in SSH 

168 format. 

169 """ 

170 return md5(self.asbytes()).digest() 

171 

172 def get_base64(self): 

173 """ 

174 Return a base64 string containing the public part of this key. Nothing 

175 secret is revealed. This format is compatible with that used to store 

176 public key files or recognized host keys. 

177 

178 :return: a base64 `string <str>` containing the public part of the key. 

179 """ 

180 return u(encodebytes(self.asbytes())).replace("\n", "") 

181 

182 def sign_ssh_data(self, data, algorithm=None): 

183 """ 

184 Sign a blob of data with this private key, and return a `.Message` 

185 representing an SSH signature message. 

186 

187 :param bytes data: 

188 the data to sign. 

189 :param str algorithm: 

190 the signature algorithm to use, if different from the key's 

191 internal name. Default: ``None``. 

192 :return: an SSH signature `message <.Message>`. 

193 

194 .. versionchanged:: 2.9 

195 Added the ``algorithm`` kwarg. 

196 """ 

197 return bytes() 

198 

199 def verify_ssh_sig(self, data, msg): 

200 """ 

201 Given a blob of data, and an SSH message representing a signature of 

202 that data, verify that it was signed with this key. 

203 

204 :param bytes data: the data that was signed. 

205 :param .Message msg: an SSH signature message 

206 :return: 

207 ``True`` if the signature verifies correctly; ``False`` otherwise. 

208 """ 

209 return False 

210 

211 @classmethod 

212 def from_private_key_file(cls, filename, password=None): 

213 """ 

214 Create a key object by reading a private key file. If the private 

215 key is encrypted and ``password`` is not ``None``, the given password 

216 will be used to decrypt the key (otherwise `.PasswordRequiredException` 

217 is thrown). Through the magic of Python, this factory method will 

218 exist in all subclasses of PKey (such as `.RSAKey` or `.DSSKey`), but 

219 is useless on the abstract PKey class. 

220 

221 :param str filename: name of the file to read 

222 :param str password: 

223 an optional password to use to decrypt the key file, if it's 

224 encrypted 

225 :return: a new `.PKey` based on the given private key 

226 

227 :raises: ``IOError`` -- if there was an error reading the file 

228 :raises: `.PasswordRequiredException` -- if the private key file is 

229 encrypted, and ``password`` is ``None`` 

230 :raises: `.SSHException` -- if the key file is invalid 

231 """ 

232 key = cls(filename=filename, password=password) 

233 return key 

234 

235 @classmethod 

236 def from_private_key(cls, file_obj, password=None): 

237 """ 

238 Create a key object by reading a private key from a file (or file-like) 

239 object. If the private key is encrypted and ``password`` is not 

240 ``None``, the given password will be used to decrypt the key (otherwise 

241 `.PasswordRequiredException` is thrown). 

242 

243 :param file_obj: the file-like object to read from 

244 :param str password: 

245 an optional password to use to decrypt the key, if it's encrypted 

246 :return: a new `.PKey` based on the given private key 

247 

248 :raises: ``IOError`` -- if there was an error reading the key 

249 :raises: `.PasswordRequiredException` -- 

250 if the private key file is encrypted, and ``password`` is ``None`` 

251 :raises: `.SSHException` -- if the key file is invalid 

252 """ 

253 key = cls(file_obj=file_obj, password=password) 

254 return key 

255 

256 def write_private_key_file(self, filename, password=None): 

257 """ 

258 Write private key contents into a file. If the password is not 

259 ``None``, the key is encrypted before writing. 

260 

261 :param str filename: name of the file to write 

262 :param str password: 

263 an optional password to use to encrypt the key file 

264 

265 :raises: ``IOError`` -- if there was an error writing the file 

266 :raises: `.SSHException` -- if the key is invalid 

267 """ 

268 raise Exception("Not implemented in PKey") 

269 

270 def write_private_key(self, file_obj, password=None): 

271 """ 

272 Write private key contents into a file (or file-like) object. If the 

273 password is not ``None``, the key is encrypted before writing. 

274 

275 :param file_obj: the file-like object to write into 

276 :param str password: an optional password to use to encrypt the key 

277 

278 :raises: ``IOError`` -- if there was an error writing to the file 

279 :raises: `.SSHException` -- if the key is invalid 

280 """ 

281 raise Exception("Not implemented in PKey") 

282 

283 def _read_private_key_file(self, tag, filename, password=None): 

284 """ 

285 Read an SSH2-format private key file, looking for a string of the type 

286 ``"BEGIN xxx PRIVATE KEY"`` for some ``xxx``, base64-decode the text we 

287 find, and return it as a string. If the private key is encrypted and 

288 ``password`` is not ``None``, the given password will be used to 

289 decrypt the key (otherwise `.PasswordRequiredException` is thrown). 

290 

291 :param str tag: ``"RSA"`` or ``"DSA"``, the tag used to mark the 

292 data block. 

293 :param str filename: name of the file to read. 

294 :param str password: 

295 an optional password to use to decrypt the key file, if it's 

296 encrypted. 

297 :return: the `bytes` that make up the private key. 

298 

299 :raises: ``IOError`` -- if there was an error reading the file. 

300 :raises: `.PasswordRequiredException` -- if the private key file is 

301 encrypted, and ``password`` is ``None``. 

302 :raises: `.SSHException` -- if the key file is invalid. 

303 """ 

304 with open(filename, "r") as f: 

305 data = self._read_private_key(tag, f, password) 

306 return data 

307 

308 def _read_private_key(self, tag, f, password=None): 

309 lines = f.readlines() 

310 if not lines: 

311 raise SSHException("no lines in {} private key file".format(tag)) 

312 

313 # find the BEGIN tag 

314 start = 0 

315 m = self.BEGIN_TAG.match(lines[start]) 

316 line_range = len(lines) - 1 

317 while start < line_range and not m: 

318 start += 1 

319 m = self.BEGIN_TAG.match(lines[start]) 

320 start += 1 

321 keytype = m.group(1) if m else None 

322 if start >= len(lines) or keytype is None: 

323 raise SSHException("not a valid {} private key file".format(tag)) 

324 

325 # find the END tag 

326 end = start 

327 m = self.END_TAG.match(lines[end]) 

328 while end < line_range and not m: 

329 end += 1 

330 m = self.END_TAG.match(lines[end]) 

331 

332 if keytype == tag: 

333 data = self._read_private_key_pem(lines, end, password) 

334 pkformat = self._PRIVATE_KEY_FORMAT_ORIGINAL 

335 elif keytype == "OPENSSH": 

336 data = self._read_private_key_openssh(lines[start:end], password) 

337 pkformat = self._PRIVATE_KEY_FORMAT_OPENSSH 

338 else: 

339 raise SSHException( 

340 "encountered {} key, expected {} key".format(keytype, tag) 

341 ) 

342 

343 return pkformat, data 

344 

345 def _got_bad_key_format_id(self, id_): 

346 err = "{}._read_private_key() spat out an unknown key format id '{}'" 

347 raise SSHException(err.format(self.__class__.__name__, id_)) 

348 

349 def _read_private_key_pem(self, lines, end, password): 

350 start = 0 

351 # parse any headers first 

352 headers = {} 

353 start += 1 

354 while start < len(lines): 

355 line = lines[start].split(": ") 

356 if len(line) == 1: 

357 break 

358 headers[line[0].lower()] = line[1].strip() 

359 start += 1 

360 # if we trudged to the end of the file, just try to cope. 

361 try: 

362 data = decodebytes(b("".join(lines[start:end]))) 

363 except base64.binascii.Error as e: 

364 raise SSHException("base64 decoding error: {}".format(e)) 

365 if "proc-type" not in headers: 

366 # unencryped: done 

367 return data 

368 # encrypted keyfile: will need a password 

369 proc_type = headers["proc-type"] 

370 if proc_type != "4,ENCRYPTED": 

371 raise SSHException( 

372 'Unknown private key structure "{}"'.format(proc_type) 

373 ) 

374 try: 

375 encryption_type, saltstr = headers["dek-info"].split(",") 

376 except: 

377 raise SSHException("Can't parse DEK-info in private key file") 

378 if encryption_type not in self._CIPHER_TABLE: 

379 raise SSHException( 

380 'Unknown private key cipher "{}"'.format(encryption_type) 

381 ) 

382 # if no password was passed in, 

383 # raise an exception pointing out that we need one 

384 if password is None: 

385 raise PasswordRequiredException("Private key file is encrypted") 

386 cipher = self._CIPHER_TABLE[encryption_type]["cipher"] 

387 keysize = self._CIPHER_TABLE[encryption_type]["keysize"] 

388 mode = self._CIPHER_TABLE[encryption_type]["mode"] 

389 salt = unhexlify(b(saltstr)) 

390 key = util.generate_key_bytes(md5, salt, password, keysize) 

391 decryptor = Cipher( 

392 cipher(key), mode(salt), backend=default_backend() 

393 ).decryptor() 

394 return decryptor.update(data) + decryptor.finalize() 

395 

396 def _read_private_key_openssh(self, lines, password): 

397 """ 

398 Read the new OpenSSH SSH2 private key format available 

399 since OpenSSH version 6.5 

400 Reference: 

401 https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key 

402 """ 

403 try: 

404 data = decodebytes(b("".join(lines))) 

405 except base64.binascii.Error as e: 

406 raise SSHException("base64 decoding error: {}".format(e)) 

407 

408 # read data struct 

409 auth_magic = data[:15] 

410 if auth_magic != OPENSSH_AUTH_MAGIC: 

411 raise SSHException("unexpected OpenSSH key header encountered") 

412 

413 cstruct = self._uint32_cstruct_unpack(data[15:], "sssur") 

414 cipher, kdfname, kdf_options, num_pubkeys, remainder = cstruct 

415 # For now, just support 1 key. 

416 if num_pubkeys > 1: 

417 raise SSHException( 

418 "unsupported: private keyfile has multiple keys" 

419 ) 

420 pubkey, privkey_blob = self._uint32_cstruct_unpack(remainder, "ss") 

421 

422 if kdfname == b("bcrypt"): 

423 if cipher == b("aes256-cbc"): 

424 mode = modes.CBC 

425 elif cipher == b("aes256-ctr"): 

426 mode = modes.CTR 

427 else: 

428 raise SSHException( 

429 "unknown cipher `{}` used in private key file".format( 

430 cipher.decode("utf-8") 

431 ) 

432 ) 

433 # Encrypted private key. 

434 # If no password was passed in, raise an exception pointing 

435 # out that we need one 

436 if password is None: 

437 raise PasswordRequiredException( 

438 "private key file is encrypted" 

439 ) 

440 

441 # Unpack salt and rounds from kdfoptions 

442 salt, rounds = self._uint32_cstruct_unpack(kdf_options, "su") 

443 

444 # run bcrypt kdf to derive key and iv/nonce (32 + 16 bytes) 

445 key_iv = bcrypt.kdf( 

446 b(password), 

447 b(salt), 

448 48, 

449 rounds, 

450 # We can't control how many rounds are on disk, so no sense 

451 # warning about it. 

452 ignore_few_rounds=True, 

453 ) 

454 key = key_iv[:32] 

455 iv = key_iv[32:] 

456 

457 # decrypt private key blob 

458 decryptor = Cipher( 

459 algorithms.AES(key), mode(iv), default_backend() 

460 ).decryptor() 

461 decrypted_privkey = decryptor.update(privkey_blob) 

462 decrypted_privkey += decryptor.finalize() 

463 elif cipher == b("none") and kdfname == b("none"): 

464 # Unencrypted private key 

465 decrypted_privkey = privkey_blob 

466 else: 

467 raise SSHException( 

468 "unknown cipher or kdf used in private key file" 

469 ) 

470 

471 # Unpack private key and verify checkints 

472 cstruct = self._uint32_cstruct_unpack(decrypted_privkey, "uusr") 

473 checkint1, checkint2, keytype, keydata = cstruct 

474 

475 if checkint1 != checkint2: 

476 raise SSHException( 

477 "OpenSSH private key file checkints do not match" 

478 ) 

479 

480 return _unpad_openssh(keydata) 

481 

482 def _uint32_cstruct_unpack(self, data, strformat): 

483 """ 

484 Used to read new OpenSSH private key format. 

485 Unpacks a c data structure containing a mix of 32-bit uints and 

486 variable length strings prefixed by 32-bit uint size field, 

487 according to the specified format. Returns the unpacked vars 

488 in a tuple. 

489 Format strings: 

490 s - denotes a string 

491 i - denotes a long integer, encoded as a byte string 

492 u - denotes a 32-bit unsigned integer 

493 r - the remainder of the input string, returned as a string 

494 """ 

495 arr = [] 

496 idx = 0 

497 try: 

498 for f in strformat: 

499 if f == "s": 

500 # string 

501 s_size = struct.unpack(">L", data[idx : idx + 4])[0] 

502 idx += 4 

503 s = data[idx : idx + s_size] 

504 idx += s_size 

505 arr.append(s) 

506 if f == "i": 

507 # long integer 

508 s_size = struct.unpack(">L", data[idx : idx + 4])[0] 

509 idx += 4 

510 s = data[idx : idx + s_size] 

511 idx += s_size 

512 i = util.inflate_long(s, True) 

513 arr.append(i) 

514 elif f == "u": 

515 # 32-bit unsigned int 

516 u = struct.unpack(">L", data[idx : idx + 4])[0] 

517 idx += 4 

518 arr.append(u) 

519 elif f == "r": 

520 # remainder as string 

521 s = data[idx:] 

522 arr.append(s) 

523 break 

524 except Exception as e: 

525 # PKey-consuming code frequently wants to save-and-skip-over issues 

526 # with loading keys, and uses SSHException as the (really friggin 

527 # awful) signal for this. So for now...we do this. 

528 raise SSHException(str(e)) 

529 return tuple(arr) 

530 

531 def _write_private_key_file(self, filename, key, format, password=None): 

532 """ 

533 Write an SSH2-format private key file in a form that can be read by 

534 paramiko or openssh. If no password is given, the key is written in 

535 a trivially-encoded format (base64) which is completely insecure. If 

536 a password is given, DES-EDE3-CBC is used. 

537 

538 :param str tag: 

539 ``"RSA"`` or ``"DSA"``, the tag used to mark the data block. 

540 :param filename: name of the file to write. 

541 :param bytes data: data blob that makes up the private key. 

542 :param str password: an optional password to use to encrypt the file. 

543 

544 :raises: ``IOError`` -- if there was an error writing the file. 

545 """ 

546 # Ensure that we create new key files directly with a user-only mode, 

547 # instead of opening, writing, then chmodding, which leaves us open to 

548 # CVE-2022-24302. 

549 with os.fdopen( 

550 os.open( 

551 filename, 

552 # NOTE: O_TRUNC is a noop on new files, and O_CREAT is a noop 

553 # on existing files, so using all 3 in both cases is fine. 

554 flags=os.O_WRONLY | os.O_TRUNC | os.O_CREAT, 

555 # Ditto the use of the 'mode' argument; it should be safe to 

556 # give even for existing files (though it will not act like a 

557 # chmod in that case). 

558 mode=o600, 

559 ), 

560 # Yea, you still gotta inform the FLO that it is in "write" mode. 

561 "w", 

562 ) as f: 

563 self._write_private_key(f, key, format, password=password) 

564 

565 def _write_private_key(self, f, key, format, password=None): 

566 if password is None: 

567 encryption = serialization.NoEncryption() 

568 else: 

569 encryption = serialization.BestAvailableEncryption(b(password)) 

570 

571 f.write( 

572 key.private_bytes( 

573 serialization.Encoding.PEM, format, encryption 

574 ).decode() 

575 ) 

576 

577 def _check_type_and_load_cert(self, msg, key_type, cert_type): 

578 """ 

579 Perform message type-checking & optional certificate loading. 

580 

581 This includes fast-forwarding cert ``msg`` objects past the nonce, so 

582 that the subsequent fields are the key numbers; thus the caller may 

583 expect to treat the message as key material afterwards either way. 

584 

585 The obtained key type is returned for classes which need to know what 

586 it was (e.g. ECDSA.) 

587 """ 

588 # Normalization; most classes have a single key type and give a string, 

589 # but eg ECDSA is a 1:N mapping. 

590 key_types = key_type 

591 cert_types = cert_type 

592 if isinstance(key_type, str): 

593 key_types = [key_types] 

594 if isinstance(cert_types, str): 

595 cert_types = [cert_types] 

596 # Can't do much with no message, that should've been handled elsewhere 

597 if msg is None: 

598 raise SSHException("Key object may not be empty") 

599 # First field is always key type, in either kind of object. (make sure 

600 # we rewind before grabbing it - sometimes caller had to do their own 

601 # introspection first!) 

602 msg.rewind() 

603 type_ = msg.get_text() 

604 # Regular public key - nothing special to do besides the implicit 

605 # type check. 

606 if type_ in key_types: 

607 pass 

608 # OpenSSH-compatible certificate - store full copy as .public_blob 

609 # (so signing works correctly) and then fast-forward past the 

610 # nonce. 

611 elif type_ in cert_types: 

612 # This seems the cleanest way to 'clone' an already-being-read 

613 # message; they're *IO objects at heart and their .getvalue() 

614 # always returns the full value regardless of pointer position. 

615 self.load_certificate(Message(msg.asbytes())) 

616 # Read out nonce as it comes before the public numbers. 

617 # TODO: usefully interpret it & other non-public-number fields 

618 # (requires going back into per-type subclasses.) 

619 msg.get_string() 

620 else: 

621 err = "Invalid key (class: {}, data type: {}" 

622 raise SSHException(err.format(self.__class__.__name__, type_)) 

623 

624 def load_certificate(self, value): 

625 """ 

626 Supplement the private key contents with data loaded from an OpenSSH 

627 public key (``.pub``) or certificate (``-cert.pub``) file, a string 

628 containing such a file, or a `.Message` object. 

629 

630 The .pub contents adds no real value, since the private key 

631 file includes sufficient information to derive the public 

632 key info. For certificates, however, this can be used on 

633 the client side to offer authentication requests to the server 

634 based on certificate instead of raw public key. 

635 

636 See: 

637 https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys 

638 

639 Note: very little effort is made to validate the certificate contents, 

640 that is for the server to decide if it is good enough to authenticate 

641 successfully. 

642 """ 

643 if isinstance(value, Message): 

644 constructor = "from_message" 

645 elif os.path.isfile(value): 

646 constructor = "from_file" 

647 else: 

648 constructor = "from_string" 

649 blob = getattr(PublicBlob, constructor)(value) 

650 if not blob.key_type.startswith(self.get_name()): 

651 err = "PublicBlob type {} incompatible with key type {}" 

652 raise ValueError(err.format(blob.key_type, self.get_name())) 

653 self.public_blob = blob 

654 

655 

656# General construct for an OpenSSH style Public Key blob 

657# readable from a one-line file of the format: 

658# <key-name> <base64-blob> [<comment>] 

659# Of little value in the case of standard public keys 

660# {ssh-rsa, ssh-dss, ssh-ecdsa, ssh-ed25519}, but should 

661# provide rudimentary support for {*-cert.v01} 

662class PublicBlob: 

663 """ 

664 OpenSSH plain public key or OpenSSH signed public key (certificate). 

665 

666 Tries to be as dumb as possible and barely cares about specific 

667 per-key-type data. 

668 

669 .. note:: 

670 

671 Most of the time you'll want to call `from_file`, `from_string` or 

672 `from_message` for useful instantiation, the main constructor is 

673 basically "I should be using ``attrs`` for this." 

674 """ 

675 

676 def __init__(self, type_, blob, comment=None): 

677 """ 

678 Create a new public blob of given type and contents. 

679 

680 :param str type_: Type indicator, eg ``ssh-rsa``. 

681 :param bytes blob: The blob bytes themselves. 

682 :param str comment: A comment, if one was given (e.g. file-based.) 

683 """ 

684 self.key_type = type_ 

685 self.key_blob = blob 

686 self.comment = comment 

687 

688 @classmethod 

689 def from_file(cls, filename): 

690 """ 

691 Create a public blob from a ``-cert.pub``-style file on disk. 

692 """ 

693 with open(filename) as f: 

694 string = f.read() 

695 return cls.from_string(string) 

696 

697 @classmethod 

698 def from_string(cls, string): 

699 """ 

700 Create a public blob from a ``-cert.pub``-style string. 

701 """ 

702 fields = string.split(None, 2) 

703 if len(fields) < 2: 

704 msg = "Not enough fields for public blob: {}" 

705 raise ValueError(msg.format(fields)) 

706 key_type = fields[0] 

707 key_blob = decodebytes(b(fields[1])) 

708 try: 

709 comment = fields[2].strip() 

710 except IndexError: 

711 comment = None 

712 # Verify that the blob message first (string) field matches the 

713 # key_type 

714 m = Message(key_blob) 

715 blob_type = m.get_text() 

716 if blob_type != key_type: 

717 deets = "key type={!r}, but blob type={!r}".format( 

718 key_type, blob_type 

719 ) 

720 raise ValueError("Invalid PublicBlob contents: {}".format(deets)) 

721 # All good? All good. 

722 return cls(type_=key_type, blob=key_blob, comment=comment) 

723 

724 @classmethod 

725 def from_message(cls, message): 

726 """ 

727 Create a public blob from a network `.Message`. 

728 

729 Specifically, a cert-bearing pubkey auth packet, because by definition 

730 OpenSSH-style certificates 'are' their own network representation." 

731 """ 

732 type_ = message.get_text() 

733 return cls(type_=type_, blob=message.asbytes()) 

734 

735 def __str__(self): 

736 ret = "{} public key/certificate".format(self.key_type) 

737 if self.comment: 

738 ret += "- {}".format(self.comment) 

739 return ret 

740 

741 def __eq__(self, other): 

742 # Just piggyback on Message/BytesIO, since both of these should be one. 

743 return self and other and self.key_blob == other.key_blob 

744 

745 def __ne__(self, other): 

746 return not self == other