Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/hostkeys.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20import binascii
21import os
22import re
23from base64 import decodebytes, encodebytes
24from collections.abc import MutableMapping
25from hashlib import sha1
26from hmac import HMAC
28from paramiko.pkey import PKey, UnknownKeyType
29from paramiko.ssh_exception import SSHException
30from paramiko.util import b, constant_time_bytes_eq, get_logger, u
33class HostKeys(MutableMapping):
34 """
35 Representation of an OpenSSH-style "known hosts" file. Host keys can be
36 read from one or more files, and then individual hosts can be looked up to
37 verify server keys during SSH negotiation.
39 A `.HostKeys` object can be treated like a dict; any dict lookup is
40 equivalent to calling `lookup`.
42 .. versionadded:: 1.5.3
43 """
45 def __init__(self, filename=None):
46 """
47 Create a new HostKeys object, optionally loading keys from an OpenSSH
48 style host-key file.
50 :param str filename: filename to load host keys from, or ``None``
51 """
52 # emulate a dict of { hostname: { keytype: PKey } }
53 self._entries = []
54 if filename is not None:
55 self.load(filename)
57 def add(self, hostname, keytype, key):
58 """
59 Add a host key entry to the table. Any existing entry for a
60 ``(hostname, keytype)`` pair will be replaced.
62 :param str hostname: the hostname (or IP) to add
63 :param str keytype: key type (in ``"ssh-<type>"`` format)
64 :param .PKey key: the key to add
65 """
66 for e in self._entries:
67 if (hostname in e.hostnames) and (e.key.get_name() == keytype):
68 e.key = key
69 return
70 self._entries.append(HostKeyEntry([hostname], key))
72 def load(self, filename):
73 """
74 Read a file of known SSH host keys, in the format used by OpenSSH.
75 This type of file unfortunately doesn't exist on Windows, but on
76 posix, it will usually be stored in
77 ``os.path.expanduser("~/.ssh/known_hosts")``.
79 If this method is called multiple times, the host keys are merged,
80 not cleared. So multiple calls to `load` will just call `add`,
81 replacing any existing entries and adding new ones.
83 :param str filename: name of the file to read host keys from
85 :raises: ``IOError`` -- if there was an error reading the file
86 """
87 with open(filename, "r") as f:
88 for lineno, line in enumerate(f, 1):
89 line = line.strip()
90 if (len(line) == 0) or (line[0] == "#"):
91 continue
92 try:
93 entry = HostKeyEntry.from_line(line, lineno)
94 except SSHException:
95 continue
96 if entry is not None:
97 _hostnames = entry.hostnames
98 for h in _hostnames:
99 if self.check(h, entry.key):
100 entry.hostnames.remove(h)
101 if len(entry.hostnames):
102 self._entries.append(entry)
104 def save(self, filename):
105 """
106 Save host keys into a file, in the format used by OpenSSH. The order
107 of keys in the file will be preserved when possible (if these keys were
108 loaded from a file originally). The single exception is that combined
109 lines will be split into individual key lines, which is arguably a bug.
111 :param str filename: name of the file to write
113 :raises: ``IOError`` -- if there was an error writing the file
115 .. versionadded:: 1.6.1
116 """
117 with open(filename, "w") as f:
118 for e in self._entries:
119 line = e.to_line()
120 if line:
121 f.write(line)
123 def lookup(self, hostname):
124 """
125 Find a hostkey entry for a given hostname or IP. If no entry is found,
126 ``None`` is returned. Otherwise a dictionary of keytype to key is
127 returned.
129 :param str hostname: the hostname (or IP) to lookup
130 :return: dict of `str` -> `.PKey` keys associated with this host
131 (or ``None``)
132 """
134 class SubDict(MutableMapping):
135 def __init__(self, hostname, entries, hostkeys):
136 self._hostname = hostname
137 self._entries = entries
138 self._hostkeys = hostkeys
140 def __iter__(self):
141 for k in self.keys():
142 yield k
144 def __len__(self):
145 return len(self.keys())
147 def __delitem__(self, key):
148 for e in list(self._entries):
149 if e.key.get_name() == key:
150 self._entries.remove(e)
151 break
152 else:
153 raise KeyError(key)
155 def __getitem__(self, key):
156 for e in self._entries:
157 if e.key.get_name() == key:
158 return e.key
159 raise KeyError(key)
161 def __setitem__(self, key, val):
162 for e in self._entries:
163 if e.key is None:
164 continue
165 if e.key.get_name() == key:
166 # replace
167 e.key = val
168 break
169 else:
170 # add a new one
171 e = HostKeyEntry([hostname], val)
172 self._entries.append(e)
173 self._hostkeys._entries.append(e)
175 def keys(self):
176 return [
177 e.key.get_name()
178 for e in self._entries
179 if e.key is not None
180 ]
182 entries = []
183 for e in self._entries:
184 if self._hostname_matches(hostname, e):
185 entries.append(e)
186 if len(entries) == 0:
187 return None
188 return SubDict(hostname, entries, self)
190 def _hostname_matches(self, hostname, entry):
191 """
192 Tests whether ``hostname`` string matches given SubDict ``entry``.
194 :returns bool:
195 """
196 for h in entry.hostnames:
197 if (
198 h == hostname
199 or h.startswith("|1|")
200 and not hostname.startswith("|1|")
201 and constant_time_bytes_eq(self.hash_host(hostname, h), h)
202 ):
203 return True
204 return False
206 def check(self, hostname, key):
207 """
208 Return True if the given key is associated with the given hostname
209 in this dictionary.
211 :param str hostname: hostname (or IP) of the SSH server
212 :param .PKey key: the key to check
213 :return:
214 ``True`` if the key is associated with the hostname; else ``False``
215 """
216 k = self.lookup(hostname)
217 if k is None:
218 return False
219 host_key = k.get(key.get_name(), None)
220 if host_key is None:
221 return False
222 return host_key.asbytes() == key.asbytes()
224 def clear(self):
225 """
226 Remove all host keys from the dictionary.
227 """
228 self._entries = []
230 def __iter__(self):
231 for k in self.keys():
232 yield k
234 def __len__(self):
235 return len(self.keys())
237 def __getitem__(self, key):
238 ret = self.lookup(key)
239 if ret is None:
240 raise KeyError(key)
241 return ret
243 def __delitem__(self, key):
244 index = None
245 for i, entry in enumerate(self._entries):
246 if self._hostname_matches(key, entry):
247 index = i
248 break
249 if index is None:
250 raise KeyError(key)
251 self._entries.pop(index)
253 def __setitem__(self, hostname, entry):
254 # don't use this please.
255 if len(entry) == 0:
256 self._entries.append(HostKeyEntry([hostname], None))
257 return
258 for key_type in entry.keys():
259 found = False
260 for e in self._entries:
261 if (hostname in e.hostnames) and e.key.get_name() == key_type:
262 # replace
263 e.key = entry[key_type]
264 found = True
265 if not found:
266 self._entries.append(HostKeyEntry([hostname], entry[key_type]))
268 def keys(self):
269 ret = []
270 for e in self._entries:
271 for h in e.hostnames:
272 if h not in ret:
273 ret.append(h)
274 return ret
276 def values(self):
277 ret = []
278 for k in self.keys():
279 ret.append(self.lookup(k))
280 return ret
282 @staticmethod
283 def hash_host(hostname, salt=None):
284 """
285 Return a "hashed" form of the hostname, as used by OpenSSH when storing
286 hashed hostnames in the known_hosts file.
288 :param str hostname: the hostname to hash
289 :param str salt: optional salt to use when hashing
290 (must be 20 bytes long)
291 :return: the hashed hostname as a `str`
292 """
293 if salt is None:
294 salt = os.urandom(sha1().digest_size)
295 else:
296 if salt.startswith("|1|"):
297 salt = salt.split("|")[2]
298 salt = decodebytes(b(salt))
299 assert len(salt) == sha1().digest_size
300 hmac = HMAC(salt, b(hostname), sha1).digest()
301 hostkey = "|1|{}|{}".format(u(encodebytes(salt)), u(encodebytes(hmac)))
302 return hostkey.replace("\n", "")
305class InvalidHostKey(Exception):
306 def __init__(self, line, exc):
307 self.line = line
308 self.exc = exc
309 self.args = (line, exc)
312class HostKeyEntry:
313 """
314 Representation of a line in an OpenSSH-style "known hosts" file.
315 """
317 def __init__(self, hostnames=None, key=None):
318 self.valid = (hostnames is not None) and (key is not None)
319 self.hostnames = hostnames
320 self.key = key
322 @classmethod
323 def from_line(cls, line, lineno=None):
324 """
325 Parses the given line of text to find the names for the host,
326 the type of key, and the key data. The line is expected to be in the
327 format used by the OpenSSH known_hosts file. Fields are separated by a
328 single space or tab.
330 Lines are expected to not have leading or trailing whitespace.
331 We don't bother to check for comments or empty lines. All of
332 that should be taken care of before sending the line to us.
334 :param str line: a line from an OpenSSH known_hosts file
335 """
336 log = get_logger("paramiko.hostkeys")
337 fields = re.split(" |\t", line)
338 if len(fields) < 3:
339 # Bad number of fields
340 msg = "Not enough fields found in known_hosts in line {} ({!r})"
341 log.info(msg.format(lineno, line))
342 return None
343 fields = fields[:3]
345 names, key_type, key = fields
346 names = names.split(",")
348 # Decide what kind of key we're looking at and create an object
349 # to hold it accordingly.
350 try:
351 # TODO: this grew organically and doesn't seem /wrong/ per se (file
352 # read -> unicode str -> bytes for base64 decode -> decoded bytes);
353 # but in Python 3 forever land, can we simply use
354 # `base64.b64decode(str-from-file)` here?
355 key_bytes = decodebytes(b(key))
356 except binascii.Error as e:
357 raise InvalidHostKey(line, e)
359 try:
360 return cls(names, PKey.from_type_string(key_type, key_bytes))
361 except UnknownKeyType:
362 # TODO (backwards incompat): consider changing HostKeys API so this
363 # just raises naturally and the exception is muted higher up in the
364 # stack?
365 log.info("Unable to handle key of type {}".format(key_type))
366 return None
368 def to_line(self):
369 """
370 Returns a string in OpenSSH known_hosts file format, or None if
371 the object is not in a valid state. A trailing newline is
372 included.
373 """
374 if self.valid:
375 return "{} {} {}\n".format(
376 ",".join(self.hostnames),
377 self.key.get_name(),
378 self.key.get_base64(),
379 )
380 return None
382 def __repr__(self):
383 return "<HostKeyEntry {!r}: {!r}>".format(self.hostnames, self.key)