Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/hostkeys.py: 20%
199 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20from base64 import encodebytes, decodebytes
21import binascii
22import os
23import re
25from collections.abc import MutableMapping
26from hashlib import sha1
27from hmac import HMAC
30from paramiko.dsskey import DSSKey
31from paramiko.rsakey import RSAKey
32from paramiko.util import get_logger, constant_time_bytes_eq, b, u
33from paramiko.ecdsakey import ECDSAKey
34from paramiko.ed25519key import Ed25519Key
35from paramiko.ssh_exception import SSHException
class HostKeys(MutableMapping):
    """
    Representation of an OpenSSH-style "known hosts" file. Host keys can be
    read from one or more files, and then individual hosts can be looked up to
    verify server keys during SSH negotiation.

    A `.HostKeys` object can be treated like a dict; any dict lookup is
    equivalent to calling `lookup`.

    .. versionadded:: 1.5.3
    """

    def __init__(self, filename=None):
        """
        Create a new HostKeys object, optionally loading keys from an OpenSSH
        style host-key file.

        :param str filename: filename to load host keys from, or ``None``
        """
        # emulate a dict of { hostname: { keytype: PKey } } on top of a flat
        # list of entries (one entry per key, possibly for several hostnames)
        self._entries = []
        if filename is not None:
            self.load(filename)

    def add(self, hostname, keytype, key):
        """
        Add a host key entry to the table. Any existing entry for a
        ``(hostname, keytype)`` pair will be replaced.

        :param str hostname: the hostname (or IP) to add
        :param str keytype:
            key type name, compared against ``key.get_name()`` of the stored
            keys (e.g. ``"ssh-rsa"``)
        :param .PKey key: the key to add
        """
        for e in self._entries:
            # Placeholder entries created by ``__setitem__`` carry no key;
            # skip them instead of failing on ``None.get_name()``.
            if (
                hostname in e.hostnames
                and e.key is not None
                and e.key.get_name() == keytype
            ):
                e.key = key
                return
        self._entries.append(HostKeyEntry([hostname], key))

    def load(self, filename):
        """
        Read a file of known SSH host keys, in the format used by OpenSSH.
        This type of file unfortunately doesn't exist on Windows, but on
        posix, it will usually be stored in
        ``os.path.expanduser("~/.ssh/known_hosts")``.

        If this method is called multiple times, the host keys are merged,
        not cleared. Hostnames whose key is already present (per `check`) are
        dropped from the newly parsed entry; an entry left without hostnames
        is not stored.

        Blank lines and lines starting with ``#`` are ignored, as are lines
        for which parsing raises ``SSHException``. Other exceptions raised by
        `.HostKeyEntry.from_line` propagate to the caller.

        :param str filename: name of the file to read host keys from

        :raises: ``IOError`` -- if there was an error reading the file
        """
        with open(filename, "r") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if (len(line) == 0) or (line[0] == "#"):
                    continue
                try:
                    e = HostKeyEntry.from_line(line, lineno)
                except SSHException:
                    continue
                if e is None:
                    continue
                # Build the filtered list first and assign it afterwards:
                # removing items from the list while iterating over it skips
                # the element following every removal.
                e.hostnames[:] = [
                    h for h in e.hostnames if not self.check(h, e.key)
                ]
                if len(e.hostnames):
                    self._entries.append(e)

    def save(self, filename):
        """
        Save host keys into a file, in the format used by OpenSSH. The order
        of keys in the file will be preserved when possible (if these keys were
        loaded from a file originally). The single exception is that combined
        lines will be split into individual key lines, which is arguably a bug.

        Entries whose ``to_line()`` yields nothing are skipped.

        :param str filename: name of the file to write

        :raises: ``IOError`` -- if there was an error writing the file

        .. versionadded:: 1.6.1
        """
        with open(filename, "w") as f:
            for e in self._entries:
                line = e.to_line()
                if line:
                    f.write(line)

    def lookup(self, hostname):
        """
        Find a hostkey entry for a given hostname or IP. If no entry is found,
        ``None`` is returned. Otherwise a dict-like object mapping key type
        name (``key.get_name()``) to key is returned.

        :param str hostname: the hostname (or IP) to lookup
        :return: dict of `str` -> `.PKey` keys associated with this host
            (or ``None``)
        """

        class SubDict(MutableMapping):
            """
            Mutable view over the entries matching one hostname.

            Replacing or adding a key is reflected in the owning `.HostKeys`;
            deleting a key only removes it from this view's own entry list.
            """

            def __init__(self, hostname, entries, hostkeys):
                self._hostname = hostname
                self._entries = entries
                self._hostkeys = hostkeys

            def __iter__(self):
                return iter(self.keys())

            def __len__(self):
                return len(self.keys())

            def __delitem__(self, key):
                # iterate over a copy because the list is modified
                for e in list(self._entries):
                    if e.key is not None and e.key.get_name() == key:
                        self._entries.remove(e)
                        return
                raise KeyError(key)

            def __getitem__(self, key):
                for e in self._entries:
                    # key-less placeholder entries never match
                    if e.key is not None and e.key.get_name() == key:
                        return e.key
                raise KeyError(key)

            def __setitem__(self, key, val):
                for e in self._entries:
                    if e.key is None:
                        continue
                    if e.key.get_name() == key:
                        # replace
                        e.key = val
                        return
                # add a new one, both to this view and to the owner
                e = HostKeyEntry([self._hostname], val)
                self._entries.append(e)
                self._hostkeys._entries.append(e)

            def keys(self):
                return [
                    e.key.get_name()
                    for e in self._entries
                    if e.key is not None
                ]

        entries = [
            e for e in self._entries if self._hostname_matches(hostname, e)
        ]
        if len(entries) == 0:
            return None
        return SubDict(hostname, entries, self)

    def _hostname_matches(self, hostname, entry):
        """
        Tests whether ``hostname`` string matches given SubDict ``entry``.

        A match is either a literal match against one of the entry's
        hostnames, or -- for hostnames stored in hashed ``|1|...`` form -- a
        match of ``hostname`` hashed with the stored salt.

        :returns bool:
        """
        for h in entry.hostnames:
            if h == hostname:
                return True
            if (
                h.startswith("|1|")
                and not hostname.startswith("|1|")
                and constant_time_bytes_eq(self.hash_host(hostname, h), h)
            ):
                return True
        return False

    def check(self, hostname, key):
        """
        Return True if the given key is associated with the given hostname
        in this dictionary.

        :param str hostname: hostname (or IP) of the SSH server
        :param .PKey key: the key to check
        :return:
            ``True`` if the key is associated with the hostname; else ``False``
        """
        k = self.lookup(hostname)
        if k is None:
            return False
        host_key = k.get(key.get_name(), None)
        if host_key is None:
            return False
        return host_key.asbytes() == key.asbytes()

    def clear(self):
        """
        Remove all host keys from the dictionary.
        """
        self._entries = []

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

    def __getitem__(self, key):
        ret = self.lookup(key)
        if ret is None:
            raise KeyError(key)
        return ret

    def __delitem__(self, key):
        # Only the first entry matching the hostname is removed.
        for i, entry in enumerate(self._entries):
            if self._hostname_matches(key, entry):
                del self._entries[i]
                return
        raise KeyError(key)

    def __setitem__(self, hostname, entry):
        # don't use this please.
        if len(entry) == 0:
            # placeholder entry: hostname known, but no key attached
            self._entries.append(HostKeyEntry([hostname], None))
            return
        for key_type in entry.keys():
            found = False
            for e in self._entries:
                # skip key-less placeholders rather than crashing on them
                if (
                    hostname in e.hostnames
                    and e.key is not None
                    and e.key.get_name() == key_type
                ):
                    # replace
                    e.key = entry[key_type]
                    found = True
            if not found:
                self._entries.append(HostKeyEntry([hostname], entry[key_type]))

    def keys(self):
        """
        Return every distinct hostname string stored in the entries, in
        first-seen order.
        """
        # dict preserves insertion order, giving ordered de-duplication
        # without rescanning a list for every hostname
        unique = {}
        for e in self._entries:
            unique.update(dict.fromkeys(e.hostnames))
        return list(unique)

    def values(self):
        """
        Return the `lookup` result for each hostname in `keys`.
        """
        return [self.lookup(k) for k in self.keys()]

    @staticmethod
    def hash_host(hostname, salt=None):
        """
        Return a "hashed" form of the hostname, as used by OpenSSH when storing
        hashed hostnames in the known_hosts file.

        :param str hostname: the hostname to hash
        :param str salt: optional salt to use when hashing
            (must be 20 bytes long); may be given in base64 form or as a full
            ``|1|salt|hash`` string, from which the salt field is extracted
        :return: the hashed hostname as a `str`
        """
        if salt is None:
            salt = os.urandom(sha1().digest_size)
        else:
            if salt.startswith("|1|"):
                salt = salt.split("|")[2]
            salt = decodebytes(b(salt))
        assert len(salt) == sha1().digest_size
        hmac = HMAC(salt, b(hostname), sha1).digest()
        hostkey = "|1|{}|{}".format(u(encodebytes(salt)), u(encodebytes(hmac)))
        # encodebytes() inserts newlines; the file format has none
        return hostkey.replace("\n", "")
class InvalidHostKey(Exception):
    """
    Raised when the key data of a known_hosts line cannot be decoded.

    :param str line: the offending known_hosts line
    :param Exception exc: the underlying exception that was caught
    """

    def __init__(self, line, exc):
        # Let the base class populate ``args`` with ``(line, exc)``.
        super().__init__(line, exc)
        self.line = line
        self.exc = exc
class HostKeyEntry:
    """
    Representation of a line in an OpenSSH-style "known hosts" file.
    """

    def __init__(self, hostnames=None, key=None):
        """
        :param list hostnames: hostname strings this entry applies to
        :param key: the key object for those hostnames, or ``None``
        """
        # ``valid`` is computed once here and is what ``to_line`` consults.
        self.valid = (hostnames is not None) and (key is not None)
        self.hostnames = hostnames
        self.key = key

    @classmethod
    def from_line(cls, line, lineno=None):
        """
        Parses the given line of text to find the names for the host,
        the type of key, and the key data. The line is expected to be in the
        format used by the OpenSSH known_hosts file. Fields are separated by a
        single space or tab.

        Lines are expected to not have leading or trailing whitespace.
        We don't bother to check for comments or empty lines. All of
        that should be taken care of before sending the line to us.

        :param str line: a line from an OpenSSH known_hosts file
        :param int lineno: line number, used only in the log message
        :return:
            a new entry, or ``None`` when the line has fewer than three
            fields or names a key type that is not handled here
        :raises: `InvalidHostKey` -- if the key field is not valid base64
        """
        log = get_logger("paramiko.hostkeys")
        fields = re.split(" |\t", line)
        if len(fields) < 3:
            # Bad number of fields
            msg = "Not enough fields found in known_hosts in line {} ({!r})"
            log.info(msg.format(lineno, line))
            return None

        # Anything after the third field (e.g. a trailing comment) is ignored.
        names, keytype, key = fields[:3]
        names = names.split(",")

        # Decide what kind of key we're looking at and create an object
        # to hold it accordingly.
        try:
            key = b(key)
            if keytype == "ssh-rsa":
                key = RSAKey(data=decodebytes(key))
            elif keytype == "ssh-dss":
                key = DSSKey(data=decodebytes(key))
            elif keytype in ECDSAKey.supported_key_format_identifiers():
                key = ECDSAKey(data=decodebytes(key), validate_point=False)
            elif keytype == "ssh-ed25519":
                key = Ed25519Key(data=decodebytes(key))
            else:
                log.info("Unable to handle key of type {}".format(keytype))
                return None
        except binascii.Error as e:
            # chain explicitly so the decoding error is kept as the cause
            raise InvalidHostKey(line, e) from e

        return cls(names, key)

    def to_line(self):
        """
        Returns a string in OpenSSH known_hosts file format, or None if
        the object is not in a valid state. A trailing newline is
        included.
        """
        if not self.valid:
            return None
        return "{} {} {}\n".format(
            ",".join(self.hostnames),
            self.key.get_name(),
            self.key.get_base64(),
        )

    def __repr__(self):
        return "<HostKeyEntry {!r}: {!r}>".format(self.hostnames, self.key)