Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/dsskey.py: 25%

122 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20DSS keys. 

21""" 

22 

23from cryptography.exceptions import InvalidSignature 

24from cryptography.hazmat.backends import default_backend 

25from cryptography.hazmat.primitives import hashes, serialization 

26from cryptography.hazmat.primitives.asymmetric import dsa 

27from cryptography.hazmat.primitives.asymmetric.utils import ( 

28 decode_dss_signature, 

29 encode_dss_signature, 

30) 

31 

32from paramiko import util 

33from paramiko.common import zero_byte 

34from paramiko.ssh_exception import SSHException 

35from paramiko.message import Message 

36from paramiko.ber import BER, BERException 

37from paramiko.pkey import PKey 

38 

39 

40class DSSKey(PKey): 

41 """ 

42 Representation of a DSS key which can be used to sign an verify SSH2 

43 data. 

44 """ 

45 

46 def __init__( 

47 self, 

48 msg=None, 

49 data=None, 

50 filename=None, 

51 password=None, 

52 vals=None, 

53 file_obj=None, 

54 ): 

55 self.p = None 

56 self.q = None 

57 self.g = None 

58 self.y = None 

59 self.x = None 

60 self.public_blob = None 

61 if file_obj is not None: 

62 self._from_private_key(file_obj, password) 

63 return 

64 if filename is not None: 

65 self._from_private_key_file(filename, password) 

66 return 

67 if (msg is None) and (data is not None): 

68 msg = Message(data) 

69 if vals is not None: 

70 self.p, self.q, self.g, self.y = vals 

71 else: 

72 self._check_type_and_load_cert( 

73 msg=msg, 

74 key_type="ssh-dss", 

75 cert_type="ssh-dss-cert-v01@openssh.com", 

76 ) 

77 self.p = msg.get_mpint() 

78 self.q = msg.get_mpint() 

79 self.g = msg.get_mpint() 

80 self.y = msg.get_mpint() 

81 self.size = util.bit_length(self.p) 

82 

83 def asbytes(self): 

84 m = Message() 

85 m.add_string("ssh-dss") 

86 m.add_mpint(self.p) 

87 m.add_mpint(self.q) 

88 m.add_mpint(self.g) 

89 m.add_mpint(self.y) 

90 return m.asbytes() 

91 

92 def __str__(self): 

93 return self.asbytes() 

94 

95 @property 

96 def _fields(self): 

97 return (self.get_name(), self.p, self.q, self.g, self.y) 

98 

99 def get_name(self): 

100 return "ssh-dss" 

101 

102 def get_bits(self): 

103 return self.size 

104 

105 def can_sign(self): 

106 return self.x is not None 

107 

108 def sign_ssh_data(self, data, algorithm=None): 

109 key = dsa.DSAPrivateNumbers( 

110 x=self.x, 

111 public_numbers=dsa.DSAPublicNumbers( 

112 y=self.y, 

113 parameter_numbers=dsa.DSAParameterNumbers( 

114 p=self.p, q=self.q, g=self.g 

115 ), 

116 ), 

117 ).private_key(backend=default_backend()) 

118 sig = key.sign(data, hashes.SHA1()) 

119 r, s = decode_dss_signature(sig) 

120 

121 m = Message() 

122 m.add_string("ssh-dss") 

123 # apparently, in rare cases, r or s may be shorter than 20 bytes! 

124 rstr = util.deflate_long(r, 0) 

125 sstr = util.deflate_long(s, 0) 

126 if len(rstr) < 20: 

127 rstr = zero_byte * (20 - len(rstr)) + rstr 

128 if len(sstr) < 20: 

129 sstr = zero_byte * (20 - len(sstr)) + sstr 

130 m.add_string(rstr + sstr) 

131 return m 

132 

133 def verify_ssh_sig(self, data, msg): 

134 if len(msg.asbytes()) == 40: 

135 # spies.com bug: signature has no header 

136 sig = msg.asbytes() 

137 else: 

138 kind = msg.get_text() 

139 if kind != "ssh-dss": 

140 return 0 

141 sig = msg.get_binary() 

142 

143 # pull out (r, s) which are NOT encoded as mpints 

144 sigR = util.inflate_long(sig[:20], 1) 

145 sigS = util.inflate_long(sig[20:], 1) 

146 

147 signature = encode_dss_signature(sigR, sigS) 

148 

149 key = dsa.DSAPublicNumbers( 

150 y=self.y, 

151 parameter_numbers=dsa.DSAParameterNumbers( 

152 p=self.p, q=self.q, g=self.g 

153 ), 

154 ).public_key(backend=default_backend()) 

155 try: 

156 key.verify(signature, data, hashes.SHA1()) 

157 except InvalidSignature: 

158 return False 

159 else: 

160 return True 

161 

162 def write_private_key_file(self, filename, password=None): 

163 key = dsa.DSAPrivateNumbers( 

164 x=self.x, 

165 public_numbers=dsa.DSAPublicNumbers( 

166 y=self.y, 

167 parameter_numbers=dsa.DSAParameterNumbers( 

168 p=self.p, q=self.q, g=self.g 

169 ), 

170 ), 

171 ).private_key(backend=default_backend()) 

172 

173 self._write_private_key_file( 

174 filename, 

175 key, 

176 serialization.PrivateFormat.TraditionalOpenSSL, 

177 password=password, 

178 ) 

179 

180 def write_private_key(self, file_obj, password=None): 

181 key = dsa.DSAPrivateNumbers( 

182 x=self.x, 

183 public_numbers=dsa.DSAPublicNumbers( 

184 y=self.y, 

185 parameter_numbers=dsa.DSAParameterNumbers( 

186 p=self.p, q=self.q, g=self.g 

187 ), 

188 ), 

189 ).private_key(backend=default_backend()) 

190 

191 self._write_private_key( 

192 file_obj, 

193 key, 

194 serialization.PrivateFormat.TraditionalOpenSSL, 

195 password=password, 

196 ) 

197 

198 @staticmethod 

199 def generate(bits=1024, progress_func=None): 

200 """ 

201 Generate a new private DSS key. This factory function can be used to 

202 generate a new host key or authentication key. 

203 

204 :param int bits: number of bits the generated key should be. 

205 :param progress_func: Unused 

206 :return: new `.DSSKey` private key 

207 """ 

208 numbers = dsa.generate_private_key( 

209 bits, backend=default_backend() 

210 ).private_numbers() 

211 key = DSSKey( 

212 vals=( 

213 numbers.public_numbers.parameter_numbers.p, 

214 numbers.public_numbers.parameter_numbers.q, 

215 numbers.public_numbers.parameter_numbers.g, 

216 numbers.public_numbers.y, 

217 ) 

218 ) 

219 key.x = numbers.x 

220 return key 

221 

222 # ...internals... 

223 

224 def _from_private_key_file(self, filename, password): 

225 data = self._read_private_key_file("DSA", filename, password) 

226 self._decode_key(data) 

227 

228 def _from_private_key(self, file_obj, password): 

229 data = self._read_private_key("DSA", file_obj, password) 

230 self._decode_key(data) 

231 

232 def _decode_key(self, data): 

233 pkformat, data = data 

234 # private key file contains: 

235 # DSAPrivateKey = { version = 0, p, q, g, y, x } 

236 if pkformat == self._PRIVATE_KEY_FORMAT_ORIGINAL: 

237 try: 

238 keylist = BER(data).decode() 

239 except BERException as e: 

240 raise SSHException("Unable to parse key file: {}".format(e)) 

241 elif pkformat == self._PRIVATE_KEY_FORMAT_OPENSSH: 

242 keylist = self._uint32_cstruct_unpack(data, "iiiii") 

243 keylist = [0] + list(keylist) 

244 else: 

245 self._got_bad_key_format_id(pkformat) 

246 if type(keylist) is not list or len(keylist) < 6 or keylist[0] != 0: 

247 raise SSHException( 

248 "not a valid DSA private key file (bad ber encoding)" 

249 ) 

250 self.p = keylist[1] 

251 self.q = keylist[2] 

252 self.g = keylist[3] 

253 self.y = keylist[4] 

254 self.x = keylist[5] 

255 self.size = util.bit_length(self.p)