Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/hostkeys.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20from base64 import encodebytes, decodebytes
21import binascii
22import os
23import re
25from collections.abc import MutableMapping
26from hashlib import sha1
27from hmac import HMAC
30from paramiko.pkey import PKey, UnknownKeyType
31from paramiko.util import get_logger, constant_time_bytes_eq, b, u
32from paramiko.ssh_exception import SSHException
class HostKeys(MutableMapping):
    """
    Representation of an OpenSSH-style "known hosts" file.  Host keys can be
    read from one or more files, and then individual hosts can be looked up to
    verify server keys during SSH negotiation.

    A `.HostKeys` object can be treated like a dict; any dict lookup is
    equivalent to calling `lookup`.

    Internally the table is a flat list of entry objects, each exposing a
    ``hostnames`` list and a ``key`` (which may be ``None`` for placeholder
    entries created by assigning an empty mapping to a hostname).

    .. versionadded:: 1.5.3
    """

    def __init__(self, filename=None):
        """
        Create a new HostKeys object, optionally loading keys from an OpenSSH
        style host-key file.

        :param str filename: filename to load host keys from, or ``None``
        """
        # emulate a dict of { hostname: { keytype: PKey } }
        self._entries = []
        if filename is not None:
            self.load(filename)

    def add(self, hostname, keytype, key):
        """
        Add a host key entry to the table.  Any existing entry for a
        ``(hostname, keytype)`` pair will be replaced.

        :param str hostname: the hostname (or IP) to add
        :param str keytype: key type (``"ssh-rsa"`` or ``"ssh-dss"``)
        :param .PKey key: the key to add
        """
        for e in self._entries:
            # Placeholder entries carry no key, hence no key type to compare;
            # skipping them avoids calling ``get_name()`` on ``None``.
            if e.key is None:
                continue
            if hostname in e.hostnames and e.key.get_name() == keytype:
                e.key = key
                return
        self._entries.append(HostKeyEntry([hostname], key))

    def load(self, filename):
        """
        Read a file of known SSH host keys, in the format used by OpenSSH.
        This type of file unfortunately doesn't exist on Windows, but on
        posix, it will usually be stored in
        ``os.path.expanduser("~/.ssh/known_hosts")``.

        If this method is called multiple times, the host keys are merged,
        not cleared.  Hostnames whose key is already present in this table
        are dropped from a newly parsed line; the line is stored only if at
        least one hostname remains.

        Blank lines and lines starting with ``#`` are ignored, as are lines
        for which parsing raises ``SSHException`` or yields no entry.  Any
        other exception raised while parsing a line propagates to the caller.

        :param str filename: name of the file to read host keys from

        :raises: ``IOError`` -- if there was an error reading the file
        """
        with open(filename, "r") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                try:
                    entry = HostKeyEntry.from_line(line, lineno)
                except SSHException:
                    continue
                if entry is None:
                    continue
                # Build a fresh list instead of calling ``remove()`` on the
                # list being iterated: in-place removal makes the iterator
                # skip the element after each removed one, which would let
                # already-known hostnames slip through.
                entry.hostnames = [
                    h for h in entry.hostnames if not self.check(h, entry.key)
                ]
                if entry.hostnames:
                    self._entries.append(entry)

    def save(self, filename):
        """
        Save host keys into a file, in the format used by OpenSSH.  The order
        of keys in the file will be preserved when possible (if these keys were
        loaded from a file originally).  The single exception is that combined
        lines will be split into individual key lines, which is arguably a bug.

        Entries whose ``to_line()`` returns nothing are not written.

        :param str filename: name of the file to write

        :raises: ``IOError`` -- if there was an error writing the file

        .. versionadded:: 1.6.1
        """
        with open(filename, "w") as f:
            for e in self._entries:
                line = e.to_line()
                if line:
                    f.write(line)

    def lookup(self, hostname):
        """
        Find a hostkey entry for a given hostname or IP.  If no entry is found,
        ``None`` is returned.  Otherwise a dictionary of keytype to key is
        returned.  The keytype will be either ``"ssh-rsa"`` or ``"ssh-dss"``.

        :param str hostname: the hostname (or IP) to lookup
        :return: dict of `str` -> `.PKey` keys associated with this host
            (or ``None``)
        """

        class SubDict(MutableMapping):
            """
            Mapping of key type -> key over the entries matching one host.

            New key types assigned here are written through to the parent
            table as well.
            """

            def __init__(self, hostname, entries, hostkeys):
                self._hostname = hostname
                self._entries = entries
                self._hostkeys = hostkeys

            def __iter__(self):
                yield from self.keys()

            def __len__(self):
                return len(self.keys())

            def __delitem__(self, key):
                # NOTE(review): this only drops the entry from this view's
                # own list of matches; the parent table is left untouched.
                for e in list(self._entries):
                    if e.key is not None and e.key.get_name() == key:
                        self._entries.remove(e)
                        break
                else:
                    raise KeyError(key)

            def __getitem__(self, key):
                for e in self._entries:
                    # Keyless placeholder entries can never match a key type.
                    if e.key is not None and e.key.get_name() == key:
                        return e.key
                raise KeyError(key)

            def __setitem__(self, key, val):
                for e in self._entries:
                    if e.key is None:
                        continue
                    if e.key.get_name() == key:
                        # replace
                        e.key = val
                        break
                else:
                    # add a new one, both to this view and to the parent
                    e = HostKeyEntry([self._hostname], val)
                    self._entries.append(e)
                    self._hostkeys._entries.append(e)

            def keys(self):
                return [
                    e.key.get_name()
                    for e in self._entries
                    if e.key is not None
                ]

        entries = [
            e for e in self._entries if self._hostname_matches(hostname, e)
        ]
        if not entries:
            return None
        return SubDict(hostname, entries, self)

    def _hostname_matches(self, hostname, entry):
        """
        Tests whether ``hostname`` string matches given SubDict ``entry``.

        A stored name matches when it equals ``hostname`` literally, or when
        the stored name is in hashed form (``|1|`` prefix), ``hostname`` is
        not, and hashing ``hostname`` with the stored name's salt reproduces
        the stored name.

        :returns bool:
        """
        for h in entry.hostnames:
            if h == hostname:
                return True
            if (
                h.startswith("|1|")
                and not hostname.startswith("|1|")
                and constant_time_bytes_eq(self.hash_host(hostname, h), h)
            ):
                return True
        return False

    def check(self, hostname, key):
        """
        Return True if the given key is associated with the given hostname
        in this dictionary.

        :param str hostname: hostname (or IP) of the SSH server
        :param .PKey key: the key to check
        :return:
            ``True`` if the key is associated with the hostname; else ``False``
        """
        k = self.lookup(hostname)
        if k is None:
            return False
        host_key = k.get(key.get_name(), None)
        if host_key is None:
            return False
        return host_key.asbytes() == key.asbytes()

    def clear(self):
        """
        Remove all host keys from the dictionary.
        """
        self._entries = []

    def __iter__(self):
        yield from self.keys()

    def __len__(self):
        return len(self.keys())

    def __getitem__(self, key):
        ret = self.lookup(key)
        if ret is None:
            raise KeyError(key)
        return ret

    def __delitem__(self, key):
        # Only the first entry matching ``key`` is removed.
        for i, entry in enumerate(self._entries):
            if self._hostname_matches(key, entry):
                self._entries.pop(i)
                return
        raise KeyError(key)

    def __setitem__(self, hostname, entry):
        # don't use this please.
        if len(entry) == 0:
            # An empty mapping records the hostname with a keyless placeholder.
            self._entries.append(HostKeyEntry([hostname], None))
            return
        for key_type in entry.keys():
            found = False
            for e in self._entries:
                # Skip keyless placeholders: they have no key type, and
                # calling ``get_name()`` on ``None`` would raise.
                if e.key is None:
                    continue
                if hostname in e.hostnames and e.key.get_name() == key_type:
                    # replace
                    e.key = entry[key_type]
                    found = True
            if not found:
                self._entries.append(HostKeyEntry([hostname], entry[key_type]))

    def keys(self):
        """
        Return every distinct stored hostname, in first-seen order.
        """
        # dict.fromkeys de-duplicates while preserving insertion order.
        return list(
            dict.fromkeys(h for e in self._entries for h in e.hostnames)
        )

    def values(self):
        """
        Return the `lookup` result for each hostname returned by `keys`.
        """
        return [self.lookup(k) for k in self.keys()]

    @staticmethod
    def hash_host(hostname, salt=None):
        """
        Return a "hashed" form of the hostname, as used by OpenSSH when storing
        hashed hostnames in the known_hosts file.

        :param str hostname: the hostname to hash
        :param str salt: optional salt to use when hashing
            (must be 20 bytes long); may also be a full ``|1|salt|hash``
            string, in which case the salt field is extracted from it
        :return: the hashed hostname as a `str`
        """
        if salt is None:
            salt = os.urandom(sha1().digest_size)
        else:
            if salt.startswith("|1|"):
                # "|1|<salt>|<hash>" -> fields ["", "1", "<salt>", "<hash>"]
                salt = salt.split("|")[2]
            salt = decodebytes(b(salt))
        assert len(salt) == sha1().digest_size
        hmac = HMAC(salt, b(hostname), sha1).digest()
        hostkey = "|1|{}|{}".format(u(encodebytes(salt)), u(encodebytes(hmac)))
        # encodebytes appends newlines; the stored form has none.
        return hostkey.replace("\n", "")
class InvalidHostKey(Exception):
    """
    Raised for a known_hosts line whose key data cannot be decoded.

    The offending line and the underlying exception are available as the
    ``line`` and ``exc`` attributes, and together form ``args``.
    """

    def __init__(self, line, exc):
        # Let the base class record ``args`` as ``(line, exc)``.
        super().__init__(line, exc)
        self.line, self.exc = line, exc
class HostKeyEntry:
    """
    One record of an OpenSSH-style "known hosts" file: a list of host names
    together with the key they share.
    """

    def __init__(self, hostnames=None, key=None):
        self.hostnames = hostnames
        self.key = key
        # Computed once here: usable only when both parts were supplied.
        self.valid = not (hostnames is None or key is None)

    @classmethod
    def from_line(cls, line, lineno=None):
        """
        Build an entry from one line of an OpenSSH known_hosts file.

        The line is split on single spaces or tabs; the first three fields
        are taken as the comma-separated host names, the key type, and the
        base64 key data.  Any further fields are ignored.

        The caller is expected to have stripped surrounding whitespace and
        to have filtered out comments and empty lines already.

        :param str line: a line from an OpenSSH known_hosts file
        :param lineno: line number, used only in the log message
        :return:
            a new entry, or ``None`` when the line has fewer than three
            fields or the key type is not recognized
        :raises: ``InvalidHostKey`` -- if the key data is not valid base64
        """
        log = get_logger("paramiko.hostkeys")
        fields = re.split(" |\t", line)
        if len(fields) < 3:
            # Bad number of fields
            msg = "Not enough fields found in known_hosts in line {} ({!r})"
            log.info(msg.format(lineno, line))
            return None

        raw_names, key_type, encoded_key = fields[:3]
        names = raw_names.split(",")

        # text from file -> bytes -> base64-decoded key blob
        try:
            key_bytes = decodebytes(b(encoded_key))
        except binascii.Error as e:
            raise InvalidHostKey(line, e)

        # Let the key type decide which key object gets created.
        try:
            entry = cls(names, PKey.from_type_string(key_type, key_bytes))
        except UnknownKeyType:
            log.info("Unable to handle key of type {}".format(key_type))
            return None
        return entry

    def to_line(self):
        """
        Render this entry as a known_hosts line with a trailing newline.

        Returns ``None`` instead when the entry is not in a valid state.
        """
        if not self.valid:
            return None
        host_field = ",".join(self.hostnames)
        return "{} {} {}\n".format(
            host_field,
            self.key.get_name(),
            self.key.get_base64(),
        )

    def __repr__(self):
        template = "<HostKeyEntry {!r}: {!r}>"
        return template.format(self.hostnames, self.key)