Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/ed25519key.py: 17%

111 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# This file is part of paramiko. 

2# 

3# Paramiko is free software; you can redistribute it and/or modify it under the 

4# terms of the GNU Lesser General Public License as published by the Free 

5# Software Foundation; either version 2.1 of the License, or (at your option) 

6# any later version. 

7# 

8# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

9# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

10# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

11# details. 

12# 

13# You should have received a copy of the GNU Lesser General Public License 

14# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17import bcrypt 

18 

19from cryptography.hazmat.backends import default_backend 

20from cryptography.hazmat.primitives.ciphers import Cipher 

21 

22import nacl.signing 

23 

24from paramiko.message import Message 

25from paramiko.pkey import PKey, OPENSSH_AUTH_MAGIC, _unpad_openssh 

26from paramiko.util import b 

27from paramiko.ssh_exception import SSHException, PasswordRequiredException 

28 

29 

30class Ed25519Key(PKey): 

31 """ 

32 Representation of an `Ed25519 <https://ed25519.cr.yp.to/>`_ key. 

33 

34 .. note:: 

35 Ed25519 key support was added to OpenSSH in version 6.5. 

36 

37 .. versionadded:: 2.2 

38 .. versionchanged:: 2.3 

39 Added a ``file_obj`` parameter to match other key classes. 

40 """ 

41 

42 def __init__( 

43 self, msg=None, data=None, filename=None, password=None, file_obj=None 

44 ): 

45 self.public_blob = None 

46 verifying_key = signing_key = None 

47 if msg is None and data is not None: 

48 msg = Message(data) 

49 if msg is not None: 

50 self._check_type_and_load_cert( 

51 msg=msg, 

52 key_type="ssh-ed25519", 

53 cert_type="ssh-ed25519-cert-v01@openssh.com", 

54 ) 

55 verifying_key = nacl.signing.VerifyKey(msg.get_binary()) 

56 elif filename is not None: 

57 with open(filename, "r") as f: 

58 pkformat, data = self._read_private_key("OPENSSH", f) 

59 elif file_obj is not None: 

60 pkformat, data = self._read_private_key("OPENSSH", file_obj) 

61 

62 if filename or file_obj: 

63 signing_key = self._parse_signing_key_data(data, password) 

64 

65 if signing_key is None and verifying_key is None: 

66 raise ValueError("need a key") 

67 

68 self._signing_key = signing_key 

69 self._verifying_key = verifying_key 

70 

71 def _parse_signing_key_data(self, data, password): 

72 from paramiko.transport import Transport 

73 

74 # We may eventually want this to be usable for other key types, as 

75 # OpenSSH moves to it, but for now this is just for Ed25519 keys. 

76 # This format is described here: 

77 # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key 

78 # The description isn't totally complete, and I had to refer to the 

79 # source for a full implementation. 

80 message = Message(data) 

81 if message.get_bytes(len(OPENSSH_AUTH_MAGIC)) != OPENSSH_AUTH_MAGIC: 

82 raise SSHException("Invalid key") 

83 

84 ciphername = message.get_text() 

85 kdfname = message.get_text() 

86 kdfoptions = message.get_binary() 

87 num_keys = message.get_int() 

88 

89 if kdfname == "none": 

90 # kdfname of "none" must have an empty kdfoptions, the ciphername 

91 # must be "none" 

92 if kdfoptions or ciphername != "none": 

93 raise SSHException("Invalid key") 

94 elif kdfname == "bcrypt": 

95 if not password: 

96 raise PasswordRequiredException( 

97 "Private key file is encrypted" 

98 ) 

99 kdf = Message(kdfoptions) 

100 bcrypt_salt = kdf.get_binary() 

101 bcrypt_rounds = kdf.get_int() 

102 else: 

103 raise SSHException("Invalid key") 

104 

105 if ciphername != "none" and ciphername not in Transport._cipher_info: 

106 raise SSHException("Invalid key") 

107 

108 public_keys = [] 

109 for _ in range(num_keys): 

110 pubkey = Message(message.get_binary()) 

111 if pubkey.get_text() != "ssh-ed25519": 

112 raise SSHException("Invalid key") 

113 public_keys.append(pubkey.get_binary()) 

114 

115 private_ciphertext = message.get_binary() 

116 if ciphername == "none": 

117 private_data = private_ciphertext 

118 else: 

119 cipher = Transport._cipher_info[ciphername] 

120 key = bcrypt.kdf( 

121 password=b(password), 

122 salt=bcrypt_salt, 

123 desired_key_bytes=cipher["key-size"] + cipher["block-size"], 

124 rounds=bcrypt_rounds, 

125 # We can't control how many rounds are on disk, so no sense 

126 # warning about it. 

127 ignore_few_rounds=True, 

128 ) 

129 decryptor = Cipher( 

130 cipher["class"](key[: cipher["key-size"]]), 

131 cipher["mode"](key[cipher["key-size"] :]), 

132 backend=default_backend(), 

133 ).decryptor() 

134 private_data = ( 

135 decryptor.update(private_ciphertext) + decryptor.finalize() 

136 ) 

137 

138 message = Message(_unpad_openssh(private_data)) 

139 if message.get_int() != message.get_int(): 

140 raise SSHException("Invalid key") 

141 

142 signing_keys = [] 

143 for i in range(num_keys): 

144 if message.get_text() != "ssh-ed25519": 

145 raise SSHException("Invalid key") 

146 # A copy of the public key, again, ignore. 

147 public = message.get_binary() 

148 key_data = message.get_binary() 

149 # The second half of the key data is yet another copy of the public 

150 # key... 

151 signing_key = nacl.signing.SigningKey(key_data[:32]) 

152 # Verify that all the public keys are the same... 

153 assert ( 

154 signing_key.verify_key.encode() 

155 == public 

156 == public_keys[i] 

157 == key_data[32:] 

158 ) 

159 signing_keys.append(signing_key) 

160 # Comment, ignore. 

161 message.get_binary() 

162 

163 if len(signing_keys) != 1: 

164 raise SSHException("Invalid key") 

165 return signing_keys[0] 

166 

167 def asbytes(self): 

168 if self.can_sign(): 

169 v = self._signing_key.verify_key 

170 else: 

171 v = self._verifying_key 

172 m = Message() 

173 m.add_string("ssh-ed25519") 

174 m.add_string(v.encode()) 

175 return m.asbytes() 

176 

177 @property 

178 def _fields(self): 

179 if self.can_sign(): 

180 v = self._signing_key.verify_key 

181 else: 

182 v = self._verifying_key 

183 return (self.get_name(), v) 

184 

185 def get_name(self): 

186 return "ssh-ed25519" 

187 

188 def get_bits(self): 

189 return 256 

190 

191 def can_sign(self): 

192 return self._signing_key is not None 

193 

194 def sign_ssh_data(self, data, algorithm=None): 

195 m = Message() 

196 m.add_string("ssh-ed25519") 

197 m.add_string(self._signing_key.sign(data).signature) 

198 return m 

199 

200 def verify_ssh_sig(self, data, msg): 

201 if msg.get_text() != "ssh-ed25519": 

202 return False 

203 

204 try: 

205 self._verifying_key.verify(data, msg.get_binary()) 

206 except nacl.exceptions.BadSignatureError: 

207 return False 

208 else: 

209 return True