Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/kex_gex.py: 13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

179 statements  

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20Variant on the (now deprecated/removed) ``KexGroup1`` where the prime "p" and 

21generator "g" are provided by the server. A bit more work is required on the 

22client side, and a **lot** more on the server side. 

23""" 

24 

25import os 

26from hashlib import sha256 

27 

28from paramiko import util 

29from paramiko.common import DEBUG, byte_chr, byte_mask, byte_ord 

30from paramiko.message import Message 

31from paramiko.ssh_exception import SSHException 

32 

33( 

34 _MSG_KEXDH_GEX_REQUEST_OLD, 

35 _MSG_KEXDH_GEX_GROUP, 

36 _MSG_KEXDH_GEX_INIT, 

37 _MSG_KEXDH_GEX_REPLY, 

38 _MSG_KEXDH_GEX_REQUEST, 

39) = range(30, 35) 

40 

41( 

42 c_MSG_KEXDH_GEX_REQUEST_OLD, 

43 c_MSG_KEXDH_GEX_GROUP, 

44 c_MSG_KEXDH_GEX_INIT, 

45 c_MSG_KEXDH_GEX_REPLY, 

46 c_MSG_KEXDH_GEX_REQUEST, 

47) = [byte_chr(c) for c in range(30, 35)] 

48 

49 

50class KexGexSHA256: 

51 name = "diffie-hellman-group-exchange-sha256" 

52 min_bits = 2048 

53 max_bits = 8192 

54 preferred_bits = 2048 

55 hash_algo = sha256 

56 

57 def __init__(self, transport): 

58 self.transport = transport 

59 self.p = None 

60 self.q = None 

61 self.g = None 

62 self.x = None 

63 self.e = None 

64 self.f = None 

65 self.old_style = False 

66 

67 def start_kex(self, _test_old_style=False): 

68 if self.transport.server_mode: 

69 self.transport._expect_packet( 

70 _MSG_KEXDH_GEX_REQUEST, _MSG_KEXDH_GEX_REQUEST_OLD 

71 ) 

72 return 

73 # request a bit range: we accept (min_bits) to (max_bits), but prefer 

74 # (preferred_bits). 

75 m = Message() 

76 if _test_old_style: 

77 # only used for unit tests: we shouldn't ever send this 

78 m.add_byte(c_MSG_KEXDH_GEX_REQUEST_OLD) 

79 m.add_int(self.preferred_bits) 

80 self.old_style = True 

81 else: 

82 m.add_byte(c_MSG_KEXDH_GEX_REQUEST) 

83 m.add_int(self.min_bits) 

84 m.add_int(self.preferred_bits) 

85 m.add_int(self.max_bits) 

86 self.transport._send_message(m) 

87 self.transport._expect_packet(_MSG_KEXDH_GEX_GROUP) 

88 

89 def parse_next(self, ptype, m): 

90 if ptype == _MSG_KEXDH_GEX_REQUEST: 

91 return self._parse_kexdh_gex_request(m) 

92 elif ptype == _MSG_KEXDH_GEX_GROUP: 

93 return self._parse_kexdh_gex_group(m) 

94 elif ptype == _MSG_KEXDH_GEX_INIT: 

95 return self._parse_kexdh_gex_init(m) 

96 elif ptype == _MSG_KEXDH_GEX_REPLY: 

97 return self._parse_kexdh_gex_reply(m) 

98 elif ptype == _MSG_KEXDH_GEX_REQUEST_OLD: 

99 return self._parse_kexdh_gex_request_old(m) 

100 msg = "KexGex {} asked to handle packet type {:d}" 

101 raise SSHException(msg.format(self.name, ptype)) 

102 

103 # ...internals... 

104 

105 def _generate_x(self): 

106 # generate an "x" (1 < x < (p-1)/2). 

107 q = (self.p - 1) // 2 

108 qnorm = util.deflate_long(q, 0) 

109 qhbyte = byte_ord(qnorm[0]) 

110 byte_count = len(qnorm) 

111 qmask = 0xFF 

112 while not (qhbyte & 0x80): 

113 qhbyte <<= 1 

114 qmask >>= 1 

115 while True: 

116 x_bytes = os.urandom(byte_count) 

117 x_bytes = byte_mask(x_bytes[0], qmask) + x_bytes[1:] 

118 x = util.inflate_long(x_bytes, 1) 

119 if (x > 1) and (x < q): 

120 break 

121 self.x = x 

122 

123 def _parse_kexdh_gex_request(self, m): 

124 minbits = m.get_int() 

125 preferredbits = m.get_int() 

126 maxbits = m.get_int() 

127 # smoosh the user's preferred size into our own limits 

128 if preferredbits > self.max_bits: 

129 preferredbits = self.max_bits 

130 if preferredbits < self.min_bits: 

131 preferredbits = self.min_bits 

132 # fix min/max if they're inconsistent. technically, we could just pout 

133 # and hang up, but there's no harm in giving them the benefit of the 

134 # doubt and just picking a bitsize for them. 

135 if minbits > preferredbits: 

136 minbits = preferredbits 

137 if maxbits < preferredbits: 

138 maxbits = preferredbits 

139 # now save a copy 

140 self.min_bits = minbits 

141 self.preferred_bits = preferredbits 

142 self.max_bits = maxbits 

143 # generate prime 

144 pack = self.transport._get_modulus_pack() 

145 if pack is None: 

146 raise SSHException("Can't do server-side gex with no modulus pack") 

147 self.transport._log( 

148 DEBUG, 

149 "Picking p ({} <= {} <= {} bits)".format( 

150 minbits, preferredbits, maxbits 

151 ), 

152 ) 

153 self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits) 

154 m = Message() 

155 m.add_byte(c_MSG_KEXDH_GEX_GROUP) 

156 m.add_mpint(self.p) 

157 m.add_mpint(self.g) 

158 self.transport._send_message(m) 

159 self.transport._expect_packet(_MSG_KEXDH_GEX_INIT) 

160 

161 def _parse_kexdh_gex_request_old(self, m): 

162 # same as above, but without min_bits or max_bits (used by older 

163 # clients like putty) 

164 self.preferred_bits = m.get_int() 

165 # smoosh the user's preferred size into our own limits 

166 if self.preferred_bits > self.max_bits: 

167 self.preferred_bits = self.max_bits 

168 if self.preferred_bits < self.min_bits: 

169 self.preferred_bits = self.min_bits 

170 # generate prime 

171 pack = self.transport._get_modulus_pack() 

172 if pack is None: 

173 raise SSHException("Can't do server-side gex with no modulus pack") 

174 self.transport._log( 

175 DEBUG, "Picking p (~ {} bits)".format(self.preferred_bits) 

176 ) 

177 self.g, self.p = pack.get_modulus( 

178 self.min_bits, self.preferred_bits, self.max_bits 

179 ) 

180 m = Message() 

181 m.add_byte(c_MSG_KEXDH_GEX_GROUP) 

182 m.add_mpint(self.p) 

183 m.add_mpint(self.g) 

184 self.transport._send_message(m) 

185 self.transport._expect_packet(_MSG_KEXDH_GEX_INIT) 

186 self.old_style = True 

187 

188 def _parse_kexdh_gex_group(self, m): 

189 self.p = m.get_mpint() 

190 self.g = m.get_mpint() 

191 # reject if p's bit length < 1024 or > 8192 

192 bitlen = util.bit_length(self.p) 

193 if (bitlen < 1024) or (bitlen > 8192): 

194 raise SSHException( 

195 "Server-generated gex p (don't ask) is out of range " 

196 "({} bits)".format(bitlen) 

197 ) 

198 self.transport._log(DEBUG, "Got server p ({} bits)".format(bitlen)) 

199 self._generate_x() 

200 # now compute e = g^x mod p 

201 self.e = pow(self.g, self.x, self.p) 

202 m = Message() 

203 m.add_byte(c_MSG_KEXDH_GEX_INIT) 

204 m.add_mpint(self.e) 

205 self.transport._send_message(m) 

206 self.transport._expect_packet(_MSG_KEXDH_GEX_REPLY) 

207 

208 def _parse_kexdh_gex_init(self, m): 

209 self.e = m.get_mpint() 

210 if (self.e < 1) or (self.e > self.p - 1): 

211 raise SSHException('Client kex "e" is out of range') 

212 self._generate_x() 

213 self.f = pow(self.g, self.x, self.p) 

214 K = pow(self.e, self.x, self.p) 

215 key = self.transport.get_server_key().asbytes() 

216 # okay, build up the hash H of 

217 # (V_C || V_S || I_C || I_S || K_S || min || n || max || p || g || e || f || K) # noqa 

218 hm = Message() 

219 hm.add( 

220 self.transport.remote_version, 

221 self.transport.local_version, 

222 self.transport.remote_kex_init, 

223 self.transport.local_kex_init, 

224 key, 

225 ) 

226 if not self.old_style: 

227 hm.add_int(self.min_bits) 

228 hm.add_int(self.preferred_bits) 

229 if not self.old_style: 

230 hm.add_int(self.max_bits) 

231 hm.add_mpint(self.p) 

232 hm.add_mpint(self.g) 

233 hm.add_mpint(self.e) 

234 hm.add_mpint(self.f) 

235 hm.add_mpint(K) 

236 H = self.hash_algo(hm.asbytes()).digest() 

237 self.transport._set_K_H(K, H) 

238 # sign it 

239 sig = self.transport.get_server_key().sign_ssh_data( 

240 H, self.transport.host_key_type 

241 ) 

242 # send reply 

243 m = Message() 

244 m.add_byte(c_MSG_KEXDH_GEX_REPLY) 

245 m.add_string(key) 

246 m.add_mpint(self.f) 

247 m.add_string(sig) 

248 self.transport._send_message(m) 

249 self.transport._activate_outbound() 

250 

251 def _parse_kexdh_gex_reply(self, m): 

252 host_key = m.get_string() 

253 self.f = m.get_mpint() 

254 sig = m.get_string() 

255 if (self.f < 1) or (self.f > self.p - 1): 

256 raise SSHException('Server kex "f" is out of range') 

257 K = pow(self.f, self.x, self.p) 

258 # okay, build up the hash H of 

259 # (V_C || V_S || I_C || I_S || K_S || min || n || max || p || g || e || f || K) # noqa 

260 hm = Message() 

261 hm.add( 

262 self.transport.local_version, 

263 self.transport.remote_version, 

264 self.transport.local_kex_init, 

265 self.transport.remote_kex_init, 

266 host_key, 

267 ) 

268 if not self.old_style: 

269 hm.add_int(self.min_bits) 

270 hm.add_int(self.preferred_bits) 

271 if not self.old_style: 

272 hm.add_int(self.max_bits) 

273 hm.add_mpint(self.p) 

274 hm.add_mpint(self.g) 

275 hm.add_mpint(self.e) 

276 hm.add_mpint(self.f) 

277 hm.add_mpint(K) 

278 self.transport._set_K_H(K, self.hash_algo(hm.asbytes()).digest()) 

279 self.transport._verify_key(host_key, sig) 

280 self.transport._activate_outbound()