Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/ed25519key.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

113 statements  

1# This file is part of paramiko. 

2# 

3# Paramiko is free software; you can redistribute it and/or modify it under the 

4# terms of the GNU Lesser General Public License as published by the Free 

5# Software Foundation; either version 2.1 of the License, or (at your option) 

6# any later version. 

7# 

8# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

9# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

10# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

11# details. 

12# 

13# You should have received a copy of the GNU Lesser General Public License 

14# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17import bcrypt 

18 

19from cryptography.hazmat.backends import default_backend 

20from cryptography.hazmat.primitives.ciphers import Cipher 

21 

22import nacl.signing 

23 

24from paramiko.message import Message 

25from paramiko.pkey import PKey, OPENSSH_AUTH_MAGIC, _unpad_openssh 

26from paramiko.util import b 

27from paramiko.ssh_exception import SSHException, PasswordRequiredException 

28 

29 

30class Ed25519Key(PKey): 

31 """ 

32 Representation of an `Ed25519 <https://ed25519.cr.yp.to/>`_ key. 

33 

34 .. note:: 

35 Ed25519 key support was added to OpenSSH in version 6.5. 

36 

37 .. versionadded:: 2.2 

38 .. versionchanged:: 2.3 

39 Added a ``file_obj`` parameter to match other key classes. 

40 """ 

41 

42 name = "ssh-ed25519" 

43 

44 def __init__( 

45 self, msg=None, data=None, filename=None, password=None, file_obj=None 

46 ): 

47 self.public_blob = None 

48 verifying_key = signing_key = None 

49 if msg is None and data is not None: 

50 msg = Message(data) 

51 if msg is not None: 

52 self._check_type_and_load_cert( 

53 msg=msg, 

54 key_type=self.name, 

55 cert_type="ssh-ed25519-cert-v01@openssh.com", 

56 ) 

57 verifying_key = nacl.signing.VerifyKey(msg.get_binary()) 

58 elif filename is not None: 

59 with open(filename, "r") as f: 

60 pkformat, data = self._read_private_key("OPENSSH", f) 

61 elif file_obj is not None: 

62 pkformat, data = self._read_private_key("OPENSSH", file_obj) 

63 

64 if filename or file_obj: 

65 signing_key = self._parse_signing_key_data(data, password) 

66 

67 if signing_key is None and verifying_key is None: 

68 raise ValueError("need a key") 

69 

70 self._signing_key = signing_key 

71 self._verifying_key = verifying_key 

72 

73 def _parse_signing_key_data(self, data, password): 

74 from paramiko.transport import Transport 

75 

76 # We may eventually want this to be usable for other key types, as 

77 # OpenSSH moves to it, but for now this is just for Ed25519 keys. 

78 # This format is described here: 

79 # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key 

80 # The description isn't totally complete, and I had to refer to the 

81 # source for a full implementation. 

82 message = Message(data) 

83 if message.get_bytes(len(OPENSSH_AUTH_MAGIC)) != OPENSSH_AUTH_MAGIC: 

84 raise SSHException("Invalid key") 

85 

86 ciphername = message.get_text() 

87 kdfname = message.get_text() 

88 kdfoptions = message.get_binary() 

89 num_keys = message.get_int() 

90 

91 if kdfname == "none": 

92 # kdfname of "none" must have an empty kdfoptions, the ciphername 

93 # must be "none" 

94 if kdfoptions or ciphername != "none": 

95 raise SSHException("Invalid key") 

96 elif kdfname == "bcrypt": 

97 if not password: 

98 raise PasswordRequiredException( 

99 "Private key file is encrypted" 

100 ) 

101 kdf = Message(kdfoptions) 

102 bcrypt_salt = kdf.get_binary() 

103 bcrypt_rounds = kdf.get_int() 

104 else: 

105 raise SSHException("Invalid key") 

106 

107 if ciphername != "none" and ciphername not in Transport._cipher_info: 

108 raise SSHException("Invalid key") 

109 

110 public_keys = [] 

111 for _ in range(num_keys): 

112 pubkey = Message(message.get_binary()) 

113 if pubkey.get_text() != self.name: 

114 raise SSHException("Invalid key") 

115 public_keys.append(pubkey.get_binary()) 

116 

117 private_ciphertext = message.get_binary() 

118 if ciphername == "none": 

119 private_data = private_ciphertext 

120 else: 

121 cipher = Transport._cipher_info[ciphername] 

122 key = bcrypt.kdf( 

123 password=b(password), 

124 salt=bcrypt_salt, 

125 desired_key_bytes=cipher["key-size"] + cipher["block-size"], 

126 rounds=bcrypt_rounds, 

127 # We can't control how many rounds are on disk, so no sense 

128 # warning about it. 

129 ignore_few_rounds=True, 

130 ) 

131 decryptor = Cipher( 

132 cipher["class"](key[: cipher["key-size"]]), 

133 cipher["mode"](key[cipher["key-size"] :]), 

134 backend=default_backend(), 

135 ).decryptor() 

136 private_data = ( 

137 decryptor.update(private_ciphertext) + decryptor.finalize() 

138 ) 

139 

140 message = Message(_unpad_openssh(private_data)) 

141 if message.get_int() != message.get_int(): 

142 raise SSHException("Invalid key") 

143 

144 signing_keys = [] 

145 for i in range(num_keys): 

146 if message.get_text() != self.name: 

147 raise SSHException("Invalid key") 

148 # A copy of the public key, again, ignore. 

149 public = message.get_binary() 

150 key_data = message.get_binary() 

151 # The second half of the key data is yet another copy of the public 

152 # key... 

153 signing_key = nacl.signing.SigningKey(key_data[:32]) 

154 # Verify that all the public keys are the same... 

155 assert ( 

156 signing_key.verify_key.encode() 

157 == public 

158 == public_keys[i] 

159 == key_data[32:] 

160 ) 

161 signing_keys.append(signing_key) 

162 # Comment, ignore. 

163 message.get_binary() 

164 

165 if len(signing_keys) != 1: 

166 raise SSHException("Invalid key") 

167 return signing_keys[0] 

168 

169 def asbytes(self): 

170 if self.can_sign(): 

171 v = self._signing_key.verify_key 

172 else: 

173 v = self._verifying_key 

174 m = Message() 

175 m.add_string(self.name) 

176 m.add_string(v.encode()) 

177 return m.asbytes() 

178 

179 @property 

180 def _fields(self): 

181 if self.can_sign(): 

182 v = self._signing_key.verify_key 

183 else: 

184 v = self._verifying_key 

185 return (self.get_name(), v) 

186 

187 # TODO 4.0: remove 

188 def get_name(self): 

189 return self.name 

190 

191 def get_bits(self): 

192 return 256 

193 

194 def can_sign(self): 

195 return self._signing_key is not None 

196 

197 def sign_ssh_data(self, data, algorithm=None): 

198 m = Message() 

199 m.add_string(self.name) 

200 m.add_string(self._signing_key.sign(data).signature) 

201 return m 

202 

203 def verify_ssh_sig(self, data, msg): 

204 if msg.get_text() != self.name: 

205 return False 

206 

207 try: 

208 self._verifying_key.verify(data, msg.get_binary()) 

209 except nacl.exceptions.BadSignatureError: 

210 return False 

211 else: 

212 return True