Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/kex_gex.py: 13%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19"""
Variant on the (now deprecated/removed) ``KexGroup1`` where the prime "p" and
generator "g" are provided by the server. A bit more work is required on the
client side, and a **lot** more on the server side.
23"""
25import os
26from hashlib import sha256
28from paramiko import util
29from paramiko.common import DEBUG, byte_chr, byte_mask, byte_ord
30from paramiko.message import Message
31from paramiko.ssh_exception import SSHException
# Numeric packet types used by the group-exchange messages (30 through 34).
_MSG_KEXDH_GEX_REQUEST_OLD = 30
_MSG_KEXDH_GEX_GROUP = 31
_MSG_KEXDH_GEX_INIT = 32
_MSG_KEXDH_GEX_REPLY = 33
_MSG_KEXDH_GEX_REQUEST = 34
# The same five packet types, each passed through ``byte_chr`` so it can be
# handed straight to ``Message.add_byte``.
c_MSG_KEXDH_GEX_REQUEST_OLD = byte_chr(30)
c_MSG_KEXDH_GEX_GROUP = byte_chr(31)
c_MSG_KEXDH_GEX_INIT = byte_chr(32)
c_MSG_KEXDH_GEX_REPLY = byte_chr(33)
c_MSG_KEXDH_GEX_REQUEST = byte_chr(34)
class KexGexSHA256:
    """
    Diffie-Hellman group-exchange key exchange, hashed with SHA-256.

    One class drives both ends of the exchange; which end it plays is decided
    by ``transport.server_mode`` when `start_kex` runs. Message flow, as
    implemented below:

    - the client sends ``GEX_REQUEST`` (or ``GEX_REQUEST_OLD``) carrying the
      group sizes it will accept;
    - the server answers with ``GEX_GROUP`` carrying ``p`` and ``g``;
    - the client sends ``GEX_INIT`` carrying ``e = g^x mod p``;
    - the server answers with ``GEX_REPLY`` carrying its host key, its own
      ``f = g^x mod p`` and a signature over the exchange hash ``H``.
    """

    name = "diffie-hellman-group-exchange-sha256"
    # Group sizes, in bits. A client sends these in its request. A server
    # uses min_bits/max_bits as the limits it clamps a request against; once
    # a request has been parsed, the instance-level copies hold the sizes the
    # client actually sent, because those are what the exchange hash covers.
    min_bits = 2048
    max_bits = 8192
    preferred_bits = 2048
    hash_algo = sha256

    def __init__(self, transport):
        """
        Create a key-exchange engine bound to ``transport``.

        :param transport:
            object providing the hooks this class calls (``server_mode``,
            ``_send_message``, ``_expect_packet``, ``_set_K_H``, ...) --
            presumably a paramiko ``Transport``.
        """
        self.transport = transport
        self.p = None  # prime modulus
        self.q = None  # never assigned anywhere else in this class
        self.g = None  # generator
        self.x = None  # our private exponent, set by _generate_x
        self.e = None  # client's public value
        self.f = None  # server's public value
        self.old_style = False  # True once GEX_REQUEST_OLD is in play

    def start_kex(self, _test_old_style=False):
        """
        Begin the exchange.

        In server mode this only registers the request packet types to wait
        for. In client mode it sends the group-size request and then waits
        for ``GEX_GROUP``.

        :param bool _test_old_style:
            send the single-value ``GEX_REQUEST_OLD`` instead of the
            three-value request (only meant for unit tests).
        """
        if self.transport.server_mode:
            self.transport._expect_packet(
                _MSG_KEXDH_GEX_REQUEST, _MSG_KEXDH_GEX_REQUEST_OLD
            )
            return
        # request a bit range: we accept (min_bits) to (max_bits), but prefer
        # (preferred_bits).
        m = Message()
        if _test_old_style:
            # only used for unit tests: we shouldn't ever send this
            m.add_byte(c_MSG_KEXDH_GEX_REQUEST_OLD)
            m.add_int(self.preferred_bits)
            self.old_style = True
        else:
            m.add_byte(c_MSG_KEXDH_GEX_REQUEST)
            m.add_int(self.min_bits)
            m.add_int(self.preferred_bits)
            m.add_int(self.max_bits)
        self.transport._send_message(m)
        self.transport._expect_packet(_MSG_KEXDH_GEX_GROUP)

    def parse_next(self, ptype, m):
        """
        Dispatch an incoming key-exchange packet to its handler.

        :param int ptype: packet type number
        :param m: the packet payload, passed through to the handler
        :raises: `.SSHException` -- if ``ptype`` is not a group-exchange
            packet type
        """
        if ptype == _MSG_KEXDH_GEX_REQUEST:
            return self._parse_kexdh_gex_request(m)
        elif ptype == _MSG_KEXDH_GEX_GROUP:
            return self._parse_kexdh_gex_group(m)
        elif ptype == _MSG_KEXDH_GEX_INIT:
            return self._parse_kexdh_gex_init(m)
        elif ptype == _MSG_KEXDH_GEX_REPLY:
            return self._parse_kexdh_gex_reply(m)
        elif ptype == _MSG_KEXDH_GEX_REQUEST_OLD:
            return self._parse_kexdh_gex_request_old(m)
        msg = "KexGex {} asked to handle packet type {:d}"
        raise SSHException(msg.format(self.name, ptype))

    # ...internals...

    def _generate_x(self):
        """
        Pick a random private exponent and store it in ``self.x``.

        Requires ``self.p`` to be set. Candidates are drawn from
        ``os.urandom`` until one satisfies ``1 < x < (p-1)/2``.
        """
        # generate an "x" (1 < x < (p-1)/2).
        q = (self.p - 1) // 2
        qnorm = util.deflate_long(q, 0)
        qhbyte = byte_ord(qnorm[0])
        byte_count = len(qnorm)
        # Build a mask for the leading byte: drop one mask bit for every
        # unused high bit in q's leading byte, so candidates are not
        # needlessly larger than q.
        qmask = 0xFF
        while not (qhbyte & 0x80):
            qhbyte <<= 1
            qmask >>= 1
        while True:
            x_bytes = os.urandom(byte_count)
            x_bytes = byte_mask(x_bytes[0], qmask) + x_bytes[1:]
            x = util.inflate_long(x_bytes, 1)
            if (x > 1) and (x < q):
                break
        self.x = x

    def _send_group(self, minbits, preferredbits, maxbits, log_text):
        """
        Pick a group from the modulus pack and send it as ``GEX_GROUP``.

        Stores the chosen generator and prime in ``self.g`` / ``self.p`` and
        then waits for ``GEX_INIT``.

        :param int minbits: smallest acceptable group size
        :param int preferredbits: group size to aim for
        :param int maxbits: largest acceptable group size
        :param str log_text: debug message logged just before picking
        :raises: `.SSHException` -- if the transport has no modulus pack
        """
        pack = self.transport._get_modulus_pack()
        if pack is None:
            raise SSHException("Can't do server-side gex with no modulus pack")
        self.transport._log(DEBUG, log_text)
        self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits)
        m = Message()
        m.add_byte(c_MSG_KEXDH_GEX_GROUP)
        m.add_mpint(self.p)
        m.add_mpint(self.g)
        self.transport._send_message(m)
        self.transport._expect_packet(_MSG_KEXDH_GEX_INIT)

    def _exchange_hash(
        self, v_c, v_s, i_c, i_s, host_key, K
    ):
        """
        Compute the exchange hash ``H`` over
        (V_C || V_S || I_C || I_S || K_S || min || n || max || p || g || e || f || K).  # noqa

        ``min`` and ``max`` are left out when ``self.old_style`` is set. The
        arguments are always given in client-then-server order, so the caller
        decides which of local/remote is which.

        :returns: the digest bytes produced by ``self.hash_algo``
        """
        hm = Message()
        hm.add(v_c, v_s, i_c, i_s, host_key)
        if not self.old_style:
            hm.add_int(self.min_bits)
        hm.add_int(self.preferred_bits)
        if not self.old_style:
            hm.add_int(self.max_bits)
        hm.add_mpint(self.p)
        hm.add_mpint(self.g)
        hm.add_mpint(self.e)
        hm.add_mpint(self.f)
        hm.add_mpint(K)
        return self.hash_algo(hm.asbytes()).digest()

    def _parse_kexdh_gex_request(self, m):
        """
        Server side: handle ``GEX_REQUEST`` (min, preferred, max) and reply
        with a group.

        :raises: `.SSHException` -- if the transport has no modulus pack
        """
        minbits = m.get_int()
        preferredbits = m.get_int()
        maxbits = m.get_int()
        # The limits we enforce are whatever min_bits/max_bits hold right
        # now; read them before they are replaced by the client's values.
        pick = preferredbits
        # smoosh the user's preferred size into our own limits
        if pick > self.max_bits:
            pick = self.max_bits
        if pick < self.min_bits:
            pick = self.min_bits
        # fix min/max if they're inconsistent. technically, we could just pout
        # and hang up, but there's no harm in giving them the benefit of the
        # doubt and just picking a bitsize for them.
        pick_min = pick if minbits > pick else minbits
        pick_max = pick if maxbits < pick else maxbits
        # Save exactly what the client sent: the exchange hash covers the
        # request as it appeared on the wire, and the client hashes its own
        # unmodified values. Hashing the adjusted sizes instead would make
        # the two sides disagree on H whenever an adjustment happened.
        self.min_bits = minbits
        self.preferred_bits = preferredbits
        self.max_bits = maxbits
        # generate prime
        self._send_group(
            pick_min,
            pick,
            pick_max,
            "Picking p ({} <= {} <= {} bits)".format(
                pick_min, pick, pick_max
            ),
        )

    def _parse_kexdh_gex_request_old(self, m):
        """
        Server side: handle ``GEX_REQUEST_OLD`` and reply with a group.

        :raises: `.SSHException` -- if the transport has no modulus pack
        """
        # same as above, but without min_bits or max_bits (used by older
        # clients like putty)
        requested = m.get_int()
        pick = requested
        # smoosh the user's preferred size into our own limits
        if pick > self.max_bits:
            pick = self.max_bits
        if pick < self.min_bits:
            pick = self.min_bits
        # Keep the size the client sent for the exchange hash; only the
        # modulus selection uses the adjusted size.
        self.preferred_bits = requested
        # generate prime
        self._send_group(
            self.min_bits,
            pick,
            self.max_bits,
            "Picking p (~ {} bits)".format(pick),
        )
        self.old_style = True

    def _parse_kexdh_gex_group(self, m):
        """
        Client side: handle ``GEX_GROUP``, then send ``e`` in ``GEX_INIT``.

        :raises: `.SSHException` -- if the prime is shorter than 1024 bits or
            longer than 8192 bits
        """
        self.p = m.get_mpint()
        self.g = m.get_mpint()
        # reject if p's bit length < 1024 or > 8192
        bitlen = util.bit_length(self.p)
        if (bitlen < 1024) or (bitlen > 8192):
            raise SSHException(
                "Server-generated gex p (don't ask) is out of range "
                "({} bits)".format(bitlen)
            )
        self.transport._log(DEBUG, "Got server p ({} bits)".format(bitlen))
        self._generate_x()
        # now compute e = g^x mod p
        self.e = pow(self.g, self.x, self.p)
        m = Message()
        m.add_byte(c_MSG_KEXDH_GEX_INIT)
        m.add_mpint(self.e)
        self.transport._send_message(m)
        self.transport._expect_packet(_MSG_KEXDH_GEX_REPLY)

    def _parse_kexdh_gex_init(self, m):
        """
        Server side: handle ``GEX_INIT``, derive ``K`` and ``H``, sign ``H``
        and send ``GEX_REPLY``.

        :raises: `.SSHException` -- if ``e`` is outside ``[1, p-1]``
        """
        self.e = m.get_mpint()
        if (self.e < 1) or (self.e > self.p - 1):
            raise SSHException('Client kex "e" is out of range')
        self._generate_x()
        self.f = pow(self.g, self.x, self.p)
        K = pow(self.e, self.x, self.p)
        key = self.transport.get_server_key().asbytes()
        # We are the server, so the remote side supplies the client values.
        H = self._exchange_hash(
            self.transport.remote_version,
            self.transport.local_version,
            self.transport.remote_kex_init,
            self.transport.local_kex_init,
            key,
            K,
        )
        self.transport._set_K_H(K, H)
        # sign it
        sig = self.transport.get_server_key().sign_ssh_data(
            H, self.transport.host_key_type
        )
        # send reply
        m = Message()
        m.add_byte(c_MSG_KEXDH_GEX_REPLY)
        m.add_string(key)
        m.add_mpint(self.f)
        m.add_string(sig)
        self.transport._send_message(m)
        self.transport._activate_outbound()

    def _parse_kexdh_gex_reply(self, m):
        """
        Client side: handle ``GEX_REPLY``, derive ``K`` and ``H`` and hand
        the host key and signature to the transport for verification.

        :raises: `.SSHException` -- if ``f`` is outside ``[1, p-1]``
        """
        host_key = m.get_string()
        self.f = m.get_mpint()
        sig = m.get_string()
        if (self.f < 1) or (self.f > self.p - 1):
            raise SSHException('Server kex "f" is out of range')
        K = pow(self.f, self.x, self.p)
        # We are the client, so the local side supplies the client values.
        H = self._exchange_hash(
            self.transport.local_version,
            self.transport.remote_version,
            self.transport.local_kex_init,
            self.transport.remote_kex_init,
            host_key,
            K,
        )
        self.transport._set_K_H(K, H)
        self.transport._verify_key(host_key, sig)
        self.transport._activate_outbound()