Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/kex_gex.py: 15%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19"""
20Variant on `KexGroup1 <paramiko.kex_group1.KexGroup1>` where the prime "p" and
21generator "g" are provided by the server. A bit more work is required on the
22client side, and a **lot** more on the server side.
23"""
25import os
26from hashlib import sha1, sha256
28from paramiko import util
29from paramiko.common import DEBUG, byte_chr, byte_ord, byte_mask
30from paramiko.message import Message
31from paramiko.ssh_exception import SSHException
34(
35 _MSG_KEXDH_GEX_REQUEST_OLD,
36 _MSG_KEXDH_GEX_GROUP,
37 _MSG_KEXDH_GEX_INIT,
38 _MSG_KEXDH_GEX_REPLY,
39 _MSG_KEXDH_GEX_REQUEST,
40) = range(30, 35)
42(
43 c_MSG_KEXDH_GEX_REQUEST_OLD,
44 c_MSG_KEXDH_GEX_GROUP,
45 c_MSG_KEXDH_GEX_INIT,
46 c_MSG_KEXDH_GEX_REPLY,
47 c_MSG_KEXDH_GEX_REQUEST,
48) = [byte_chr(c) for c in range(30, 35)]
51class KexGex:
53 name = "diffie-hellman-group-exchange-sha1"
54 min_bits = 1024
55 max_bits = 8192
56 preferred_bits = 2048
57 hash_algo = sha1
59 def __init__(self, transport):
60 self.transport = transport
61 self.p = None
62 self.q = None
63 self.g = None
64 self.x = None
65 self.e = None
66 self.f = None
67 self.old_style = False
69 def start_kex(self, _test_old_style=False):
70 if self.transport.server_mode:
71 self.transport._expect_packet(
72 _MSG_KEXDH_GEX_REQUEST, _MSG_KEXDH_GEX_REQUEST_OLD
73 )
74 return
75 # request a bit range: we accept (min_bits) to (max_bits), but prefer
76 # (preferred_bits). according to the spec, we shouldn't pull the
77 # minimum up above 1024.
78 m = Message()
79 if _test_old_style:
80 # only used for unit tests: we shouldn't ever send this
81 m.add_byte(c_MSG_KEXDH_GEX_REQUEST_OLD)
82 m.add_int(self.preferred_bits)
83 self.old_style = True
84 else:
85 m.add_byte(c_MSG_KEXDH_GEX_REQUEST)
86 m.add_int(self.min_bits)
87 m.add_int(self.preferred_bits)
88 m.add_int(self.max_bits)
89 self.transport._send_message(m)
90 self.transport._expect_packet(_MSG_KEXDH_GEX_GROUP)
92 def parse_next(self, ptype, m):
93 if ptype == _MSG_KEXDH_GEX_REQUEST:
94 return self._parse_kexdh_gex_request(m)
95 elif ptype == _MSG_KEXDH_GEX_GROUP:
96 return self._parse_kexdh_gex_group(m)
97 elif ptype == _MSG_KEXDH_GEX_INIT:
98 return self._parse_kexdh_gex_init(m)
99 elif ptype == _MSG_KEXDH_GEX_REPLY:
100 return self._parse_kexdh_gex_reply(m)
101 elif ptype == _MSG_KEXDH_GEX_REQUEST_OLD:
102 return self._parse_kexdh_gex_request_old(m)
103 msg = "KexGex {} asked to handle packet type {:d}"
104 raise SSHException(msg.format(self.name, ptype))
106 # ...internals...
108 def _generate_x(self):
109 # generate an "x" (1 < x < (p-1)/2).
110 q = (self.p - 1) // 2
111 qnorm = util.deflate_long(q, 0)
112 qhbyte = byte_ord(qnorm[0])
113 byte_count = len(qnorm)
114 qmask = 0xFF
115 while not (qhbyte & 0x80):
116 qhbyte <<= 1
117 qmask >>= 1
118 while True:
119 x_bytes = os.urandom(byte_count)
120 x_bytes = byte_mask(x_bytes[0], qmask) + x_bytes[1:]
121 x = util.inflate_long(x_bytes, 1)
122 if (x > 1) and (x < q):
123 break
124 self.x = x
126 def _parse_kexdh_gex_request(self, m):
127 minbits = m.get_int()
128 preferredbits = m.get_int()
129 maxbits = m.get_int()
130 # smoosh the user's preferred size into our own limits
131 if preferredbits > self.max_bits:
132 preferredbits = self.max_bits
133 if preferredbits < self.min_bits:
134 preferredbits = self.min_bits
135 # fix min/max if they're inconsistent. technically, we could just pout
136 # and hang up, but there's no harm in giving them the benefit of the
137 # doubt and just picking a bitsize for them.
138 if minbits > preferredbits:
139 minbits = preferredbits
140 if maxbits < preferredbits:
141 maxbits = preferredbits
142 # now save a copy
143 self.min_bits = minbits
144 self.preferred_bits = preferredbits
145 self.max_bits = maxbits
146 # generate prime
147 pack = self.transport._get_modulus_pack()
148 if pack is None:
149 raise SSHException("Can't do server-side gex with no modulus pack")
150 self.transport._log(
151 DEBUG,
152 "Picking p ({} <= {} <= {} bits)".format(
153 minbits, preferredbits, maxbits
154 ),
155 )
156 self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits)
157 m = Message()
158 m.add_byte(c_MSG_KEXDH_GEX_GROUP)
159 m.add_mpint(self.p)
160 m.add_mpint(self.g)
161 self.transport._send_message(m)
162 self.transport._expect_packet(_MSG_KEXDH_GEX_INIT)
164 def _parse_kexdh_gex_request_old(self, m):
165 # same as above, but without min_bits or max_bits (used by older
166 # clients like putty)
167 self.preferred_bits = m.get_int()
168 # smoosh the user's preferred size into our own limits
169 if self.preferred_bits > self.max_bits:
170 self.preferred_bits = self.max_bits
171 if self.preferred_bits < self.min_bits:
172 self.preferred_bits = self.min_bits
173 # generate prime
174 pack = self.transport._get_modulus_pack()
175 if pack is None:
176 raise SSHException("Can't do server-side gex with no modulus pack")
177 self.transport._log(
178 DEBUG, "Picking p (~ {} bits)".format(self.preferred_bits)
179 )
180 self.g, self.p = pack.get_modulus(
181 self.min_bits, self.preferred_bits, self.max_bits
182 )
183 m = Message()
184 m.add_byte(c_MSG_KEXDH_GEX_GROUP)
185 m.add_mpint(self.p)
186 m.add_mpint(self.g)
187 self.transport._send_message(m)
188 self.transport._expect_packet(_MSG_KEXDH_GEX_INIT)
189 self.old_style = True
191 def _parse_kexdh_gex_group(self, m):
192 self.p = m.get_mpint()
193 self.g = m.get_mpint()
194 # reject if p's bit length < 1024 or > 8192
195 bitlen = util.bit_length(self.p)
196 if (bitlen < 1024) or (bitlen > 8192):
197 raise SSHException(
198 "Server-generated gex p (don't ask) is out of range "
199 "({} bits)".format(bitlen)
200 )
201 self.transport._log(DEBUG, "Got server p ({} bits)".format(bitlen))
202 self._generate_x()
203 # now compute e = g^x mod p
204 self.e = pow(self.g, self.x, self.p)
205 m = Message()
206 m.add_byte(c_MSG_KEXDH_GEX_INIT)
207 m.add_mpint(self.e)
208 self.transport._send_message(m)
209 self.transport._expect_packet(_MSG_KEXDH_GEX_REPLY)
211 def _parse_kexdh_gex_init(self, m):
212 self.e = m.get_mpint()
213 if (self.e < 1) or (self.e > self.p - 1):
214 raise SSHException('Client kex "e" is out of range')
215 self._generate_x()
216 self.f = pow(self.g, self.x, self.p)
217 K = pow(self.e, self.x, self.p)
218 key = self.transport.get_server_key().asbytes()
219 # okay, build up the hash H of
220 # (V_C || V_S || I_C || I_S || K_S || min || n || max || p || g || e || f || K) # noqa
221 hm = Message()
222 hm.add(
223 self.transport.remote_version,
224 self.transport.local_version,
225 self.transport.remote_kex_init,
226 self.transport.local_kex_init,
227 key,
228 )
229 if not self.old_style:
230 hm.add_int(self.min_bits)
231 hm.add_int(self.preferred_bits)
232 if not self.old_style:
233 hm.add_int(self.max_bits)
234 hm.add_mpint(self.p)
235 hm.add_mpint(self.g)
236 hm.add_mpint(self.e)
237 hm.add_mpint(self.f)
238 hm.add_mpint(K)
239 H = self.hash_algo(hm.asbytes()).digest()
240 self.transport._set_K_H(K, H)
241 # sign it
242 sig = self.transport.get_server_key().sign_ssh_data(
243 H, self.transport.host_key_type
244 )
245 # send reply
246 m = Message()
247 m.add_byte(c_MSG_KEXDH_GEX_REPLY)
248 m.add_string(key)
249 m.add_mpint(self.f)
250 m.add_string(sig)
251 self.transport._send_message(m)
252 self.transport._activate_outbound()
254 def _parse_kexdh_gex_reply(self, m):
255 host_key = m.get_string()
256 self.f = m.get_mpint()
257 sig = m.get_string()
258 if (self.f < 1) or (self.f > self.p - 1):
259 raise SSHException('Server kex "f" is out of range')
260 K = pow(self.f, self.x, self.p)
261 # okay, build up the hash H of
262 # (V_C || V_S || I_C || I_S || K_S || min || n || max || p || g || e || f || K) # noqa
263 hm = Message()
264 hm.add(
265 self.transport.local_version,
266 self.transport.remote_version,
267 self.transport.local_kex_init,
268 self.transport.remote_kex_init,
269 host_key,
270 )
271 if not self.old_style:
272 hm.add_int(self.min_bits)
273 hm.add_int(self.preferred_bits)
274 if not self.old_style:
275 hm.add_int(self.max_bits)
276 hm.add_mpint(self.p)
277 hm.add_mpint(self.g)
278 hm.add_mpint(self.e)
279 hm.add_mpint(self.f)
280 hm.add_mpint(K)
281 self.transport._set_K_H(K, self.hash_algo(hm.asbytes()).digest())
282 self.transport._verify_key(host_key, sig)
283 self.transport._activate_outbound()
286class KexGexSHA256(KexGex):
287 name = "diffie-hellman-group-exchange-sha256"
288 hash_algo = sha256