Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/dsskey.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

124 statements  

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20DSS keys. 

21""" 

22 

23from cryptography.exceptions import InvalidSignature 

24from cryptography.hazmat.backends import default_backend 

25from cryptography.hazmat.primitives import hashes, serialization 

26from cryptography.hazmat.primitives.asymmetric import dsa 

27from cryptography.hazmat.primitives.asymmetric.utils import ( 

28 decode_dss_signature, 

29 encode_dss_signature, 

30) 

31 

32from paramiko import util 

33from paramiko.common import zero_byte 

34from paramiko.ssh_exception import SSHException 

35from paramiko.message import Message 

36from paramiko.ber import BER, BERException 

37from paramiko.pkey import PKey 

38 

39 

40class DSSKey(PKey): 

41 """ 

42 Representation of a DSS key which can be used to sign an verify SSH2 

43 data. 

44 """ 

45 

46 name = "ssh-dss" 

47 

48 def __init__( 

49 self, 

50 msg=None, 

51 data=None, 

52 filename=None, 

53 password=None, 

54 vals=None, 

55 file_obj=None, 

56 ): 

57 self.p = None 

58 self.q = None 

59 self.g = None 

60 self.y = None 

61 self.x = None 

62 self.public_blob = None 

63 if file_obj is not None: 

64 self._from_private_key(file_obj, password) 

65 return 

66 if filename is not None: 

67 self._from_private_key_file(filename, password) 

68 return 

69 if (msg is None) and (data is not None): 

70 msg = Message(data) 

71 if vals is not None: 

72 self.p, self.q, self.g, self.y = vals 

73 else: 

74 self._check_type_and_load_cert( 

75 msg=msg, 

76 key_type=self.name, 

77 cert_type=f"{self.name}-cert-v01@openssh.com", 

78 ) 

79 self.p = msg.get_mpint() 

80 self.q = msg.get_mpint() 

81 self.g = msg.get_mpint() 

82 self.y = msg.get_mpint() 

83 self.size = util.bit_length(self.p) 

84 

85 def asbytes(self): 

86 m = Message() 

87 m.add_string(self.name) 

88 m.add_mpint(self.p) 

89 m.add_mpint(self.q) 

90 m.add_mpint(self.g) 

91 m.add_mpint(self.y) 

92 return m.asbytes() 

93 

94 def __str__(self): 

95 return self.asbytes() 

96 

97 @property 

98 def _fields(self): 

99 return (self.get_name(), self.p, self.q, self.g, self.y) 

100 

101 # TODO 4.0: remove 

102 def get_name(self): 

103 return self.name 

104 

105 def get_bits(self): 

106 return self.size 

107 

108 def can_sign(self): 

109 return self.x is not None 

110 

111 def sign_ssh_data(self, data, algorithm=None): 

112 key = dsa.DSAPrivateNumbers( 

113 x=self.x, 

114 public_numbers=dsa.DSAPublicNumbers( 

115 y=self.y, 

116 parameter_numbers=dsa.DSAParameterNumbers( 

117 p=self.p, q=self.q, g=self.g 

118 ), 

119 ), 

120 ).private_key(backend=default_backend()) 

121 sig = key.sign(data, hashes.SHA1()) 

122 r, s = decode_dss_signature(sig) 

123 

124 m = Message() 

125 m.add_string(self.name) 

126 # apparently, in rare cases, r or s may be shorter than 20 bytes! 

127 rstr = util.deflate_long(r, 0) 

128 sstr = util.deflate_long(s, 0) 

129 if len(rstr) < 20: 

130 rstr = zero_byte * (20 - len(rstr)) + rstr 

131 if len(sstr) < 20: 

132 sstr = zero_byte * (20 - len(sstr)) + sstr 

133 m.add_string(rstr + sstr) 

134 return m 

135 

136 def verify_ssh_sig(self, data, msg): 

137 if len(msg.asbytes()) == 40: 

138 # spies.com bug: signature has no header 

139 sig = msg.asbytes() 

140 else: 

141 kind = msg.get_text() 

142 if kind != self.name: 

143 return 0 

144 sig = msg.get_binary() 

145 

146 # pull out (r, s) which are NOT encoded as mpints 

147 sigR = util.inflate_long(sig[:20], 1) 

148 sigS = util.inflate_long(sig[20:], 1) 

149 

150 signature = encode_dss_signature(sigR, sigS) 

151 

152 key = dsa.DSAPublicNumbers( 

153 y=self.y, 

154 parameter_numbers=dsa.DSAParameterNumbers( 

155 p=self.p, q=self.q, g=self.g 

156 ), 

157 ).public_key(backend=default_backend()) 

158 try: 

159 key.verify(signature, data, hashes.SHA1()) 

160 except InvalidSignature: 

161 return False 

162 else: 

163 return True 

164 

165 def write_private_key_file(self, filename, password=None): 

166 key = dsa.DSAPrivateNumbers( 

167 x=self.x, 

168 public_numbers=dsa.DSAPublicNumbers( 

169 y=self.y, 

170 parameter_numbers=dsa.DSAParameterNumbers( 

171 p=self.p, q=self.q, g=self.g 

172 ), 

173 ), 

174 ).private_key(backend=default_backend()) 

175 

176 self._write_private_key_file( 

177 filename, 

178 key, 

179 serialization.PrivateFormat.TraditionalOpenSSL, 

180 password=password, 

181 ) 

182 

183 def write_private_key(self, file_obj, password=None): 

184 key = dsa.DSAPrivateNumbers( 

185 x=self.x, 

186 public_numbers=dsa.DSAPublicNumbers( 

187 y=self.y, 

188 parameter_numbers=dsa.DSAParameterNumbers( 

189 p=self.p, q=self.q, g=self.g 

190 ), 

191 ), 

192 ).private_key(backend=default_backend()) 

193 

194 self._write_private_key( 

195 file_obj, 

196 key, 

197 serialization.PrivateFormat.TraditionalOpenSSL, 

198 password=password, 

199 ) 

200 

201 @staticmethod 

202 def generate(bits=1024, progress_func=None): 

203 """ 

204 Generate a new private DSS key. This factory function can be used to 

205 generate a new host key or authentication key. 

206 

207 :param int bits: number of bits the generated key should be. 

208 :param progress_func: Unused 

209 :return: new `.DSSKey` private key 

210 """ 

211 numbers = dsa.generate_private_key( 

212 bits, backend=default_backend() 

213 ).private_numbers() 

214 key = DSSKey( 

215 vals=( 

216 numbers.public_numbers.parameter_numbers.p, 

217 numbers.public_numbers.parameter_numbers.q, 

218 numbers.public_numbers.parameter_numbers.g, 

219 numbers.public_numbers.y, 

220 ) 

221 ) 

222 key.x = numbers.x 

223 return key 

224 

225 # ...internals... 

226 

227 def _from_private_key_file(self, filename, password): 

228 data = self._read_private_key_file("DSA", filename, password) 

229 self._decode_key(data) 

230 

231 def _from_private_key(self, file_obj, password): 

232 data = self._read_private_key("DSA", file_obj, password) 

233 self._decode_key(data) 

234 

235 def _decode_key(self, data): 

236 pkformat, data = data 

237 # private key file contains: 

238 # DSAPrivateKey = { version = 0, p, q, g, y, x } 

239 if pkformat == self._PRIVATE_KEY_FORMAT_ORIGINAL: 

240 try: 

241 keylist = BER(data).decode() 

242 except BERException as e: 

243 raise SSHException("Unable to parse key file: {}".format(e)) 

244 elif pkformat == self._PRIVATE_KEY_FORMAT_OPENSSH: 

245 keylist = self._uint32_cstruct_unpack(data, "iiiii") 

246 keylist = [0] + list(keylist) 

247 else: 

248 self._got_bad_key_format_id(pkformat) 

249 if type(keylist) is not list or len(keylist) < 6 or keylist[0] != 0: 

250 raise SSHException( 

251 "not a valid DSA private key file (bad ber encoding)" 

252 ) 

253 self.p = keylist[1] 

254 self.q = keylist[2] 

255 self.g = keylist[3] 

256 self.y = keylist[4] 

257 self.x = keylist[5] 

258 self.size = util.bit_length(self.p)