Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/ed25519key.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

117 statements  

1# This file is part of paramiko. 

2# 

3# Paramiko is free software; you can redistribute it and/or modify it under the 

4# terms of the GNU Lesser General Public License as published by the Free 

5# Software Foundation; either version 2.1 of the License, or (at your option) 

6# any later version. 

7# 

8# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

9# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

10# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

11# details. 

12# 

13# You should have received a copy of the GNU Lesser General Public License 

14# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

15# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

16 

17from typing import Union 

18 

19import bcrypt 

20import nacl.signing 

21from cryptography.hazmat.backends import default_backend 

22from cryptography.hazmat.primitives.ciphers import Cipher 

23 

24from paramiko.message import Message 

25from paramiko.pkey import OPENSSH_AUTH_MAGIC, PKey, _unpad_openssh 

26from paramiko.ssh_exception import PasswordRequiredException, SSHException 

27from paramiko.util import b 

28 

29 

30class Ed25519Key(PKey): 

31 """ 

32 Representation of an `Ed25519 <https://ed25519.cr.yp.to/>`_ key. 

33 

34 .. note:: 

35 Ed25519 key support was added to OpenSSH in version 6.5. 

36 

37 .. versionadded:: 2.2 

38 .. versionchanged:: 2.3 

39 Added a ``file_obj`` parameter to match other key classes. 

40 """ 

41 

42 name = "ssh-ed25519" 

43 

44 def __init__( 

45 self, msg=None, data=None, filename=None, password=None, file_obj=None 

46 ): 

47 self.public_blob = None 

48 self._verifying_key, self._signing_key = None, None 

49 if msg is None and data is not None: 

50 msg = Message(data) 

51 if msg is not None: 

52 self._check_type_and_load_cert( 

53 msg=msg, 

54 key_type=self.name, 

55 cert_type="ssh-ed25519-cert-v01@openssh.com", 

56 ) 

57 self._verifying_key = nacl.signing.VerifyKey(msg.get_binary()) 

58 elif filename is not None: 

59 with open(filename, "r") as f: 

60 pkformat, data = self._read_private_key("OPENSSH", f) 

61 elif file_obj is not None: 

62 pkformat, data = self._read_private_key("OPENSSH", file_obj) 

63 

64 if filename or file_obj: 

65 self._signing_key = self._parse_signing_key_data(data, password) 

66 

67 if self._signing_key is None and self._verifying_key is None: 

68 raise ValueError("need a key") 

69 

70 def _parse_signing_key_data(self, data, password): 

71 from paramiko.transport import Transport 

72 

73 # We may eventually want this to be usable for other key types, as 

74 # OpenSSH moves to it, but for now this is just for Ed25519 keys. 

75 # This format is described here: 

76 # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key 

77 # The description isn't totally complete, and I had to refer to the 

78 # source for a full implementation. 

79 message = Message(data) 

80 if message.get_bytes(len(OPENSSH_AUTH_MAGIC)) != OPENSSH_AUTH_MAGIC: 

81 raise SSHException("Invalid key") 

82 

83 ciphername = message.get_text() 

84 kdfname = message.get_text() 

85 kdfoptions = message.get_binary() 

86 num_keys = message.get_int() 

87 

88 if kdfname == "none": 

89 # kdfname of "none" must have an empty kdfoptions, the ciphername 

90 # must be "none" 

91 if kdfoptions or ciphername != "none": 

92 raise SSHException("Invalid key") 

93 elif kdfname == "bcrypt": 

94 if not password: 

95 raise PasswordRequiredException( 

96 "Private key file is encrypted" 

97 ) 

98 kdf = Message(kdfoptions) 

99 bcrypt_salt = kdf.get_binary() 

100 bcrypt_rounds = kdf.get_int() 

101 else: 

102 raise SSHException("Invalid key") 

103 

104 if ciphername != "none" and ciphername not in Transport._cipher_info: 

105 raise SSHException("Invalid key") 

106 

107 public_keys = [] 

108 for _ in range(num_keys): 

109 pubkey = Message(message.get_binary()) 

110 if pubkey.get_text() != self.name: 

111 raise SSHException("Invalid key") 

112 public_keys.append(pubkey.get_binary()) 

113 

114 private_ciphertext = message.get_binary() 

115 if ciphername == "none": 

116 private_data = private_ciphertext 

117 else: 

118 cipher = Transport._cipher_info[ciphername] 

119 key = bcrypt.kdf( 

120 password=b(password), 

121 salt=bcrypt_salt, 

122 desired_key_bytes=cipher["key-size"] + cipher["block-size"], 

123 rounds=bcrypt_rounds, 

124 # We can't control how many rounds are on disk, so no sense 

125 # warning about it. 

126 ignore_few_rounds=True, 

127 ) 

128 decryptor = Cipher( 

129 cipher["class"](key[: cipher["key-size"]]), 

130 cipher["mode"](key[cipher["key-size"] :]), 

131 backend=default_backend(), 

132 ).decryptor() 

133 private_data = ( 

134 decryptor.update(private_ciphertext) + decryptor.finalize() 

135 ) 

136 

137 message = Message(_unpad_openssh(private_data)) 

138 if message.get_int() != message.get_int(): 

139 raise SSHException("Invalid key") 

140 

141 signing_keys = [] 

142 for i in range(num_keys): 

143 if message.get_text() != self.name: 

144 raise SSHException("Invalid key") 

145 # A copy of the public key, again, ignore. 

146 public = message.get_binary() 

147 key_data = message.get_binary() 

148 # The second half of the key data is yet another copy of the public 

149 # key... 

150 signing_key = nacl.signing.SigningKey(key_data[:32]) 

151 # Verify that all the public keys are the same... 

152 assert ( 

153 signing_key.verify_key.encode() 

154 == public 

155 == public_keys[i] 

156 == key_data[32:] 

157 ) 

158 signing_keys.append(signing_key) 

159 # Comment, ignore. 

160 message.get_binary() 

161 

162 if len(signing_keys) != 1: 

163 raise SSHException("Invalid key") 

164 return signing_keys[0] 

165 

166 def asbytes(self): 

167 v = self.verifying_key 

168 # Handle not-fully-initialized situations gracefully 

169 my_bytes = v.encode() if v is not None else b"" 

170 m = Message() 

171 m.add_string(self.name) 

172 m.add_string(my_bytes) 

173 return m.asbytes() 

174 

175 @property 

176 def _fields(self): 

177 return (self.get_name(), self.verifying_key) 

178 

179 # TODO (backwards incompat): remove 

180 def get_name(self): 

181 return self.name 

182 

183 def get_bits(self): 

184 return 256 

185 

186 def can_sign(self): 

187 return self._signing_key is not None 

188 

189 def can_verify(self): 

190 return self._verifying_key is not None 

191 

192 @property 

193 def verifying_key(self) -> Union[nacl.signing.VerifyKey, None]: 

194 if self.can_sign(): 

195 return self._signing_key.verify_key 

196 elif self.can_verify(): 

197 return self._verifying_key 

198 return None 

199 

200 def sign_ssh_data(self, data, algorithm=None): 

201 m = Message() 

202 m.add_string(self.name) 

203 m.add_string(self._signing_key.sign(data).signature) 

204 return m 

205 

206 def verify_ssh_sig(self, data, msg): 

207 if msg.get_text() != self.name: 

208 return False 

209 

210 try: 

211 self._verifying_key.verify(data, msg.get_binary()) 

212 except nacl.exceptions.BadSignatureError: 

213 return False 

214 else: 

215 return True