Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/config.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com>
2# Copyright (C) 2012 Olle Lundberg <geek@nerd.sh>
3#
4# This file is part of paramiko.
5#
6# Paramiko is free software; you can redistribute it and/or modify it under the
7# terms of the GNU Lesser General Public License as published by the Free
8# Software Foundation; either version 2.1 of the License, or (at your option)
9# any later version.
10#
11# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
12# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
13# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14# details.
15#
16# You should have received a copy of the GNU Lesser General Public License
17# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20"""
21Configuration file (aka ``ssh_config``) support.
22"""
24import fnmatch
25import getpass
26import os
27import re
28import shlex
29import socket
30from hashlib import sha1
31from io import StringIO
32from functools import partial
34invoke, invoke_import_error = None, None
35try:
36 import invoke
37except ImportError as e:
38 invoke_import_error = e
40from .ssh_exception import CouldNotCanonicalize, ConfigParseError
43SSH_PORT = 22
46class SSHConfig:
47 """
48 Representation of config information as stored in the format used by
49 OpenSSH. Queries can be made via `lookup`. The format is described in
50 OpenSSH's ``ssh_config`` man page. This class is provided primarily as a
51 convenience to posix users (since the OpenSSH format is a de-facto
52 standard on posix) but should work fine on Windows too.
54 .. versionadded:: 1.6
55 """
57 SETTINGS_REGEX = re.compile(r"(\w+)(?:\s*=\s*|\s+)(.+)")
59 # TODO: do a full scan of ssh.c & friends to make sure we're fully
60 # compatible across the board, e.g. OpenSSH 8.1 added %n to ProxyCommand.
61 TOKENS_BY_CONFIG_KEY = {
62 "controlpath": ["%C", "%h", "%l", "%L", "%n", "%p", "%r", "%u"],
63 "hostname": ["%h"],
64 "identityfile": ["%C", "~", "%d", "%h", "%l", "%u", "%r"],
65 "proxycommand": ["~", "%h", "%p", "%r"],
66 "proxyjump": ["%h", "%p", "%r"],
67 # Doesn't seem worth making this 'special' for now, it will fit well
68 # enough (no actual match-exec config key to be confused with).
69 "match-exec": ["%C", "%d", "%h", "%L", "%l", "%n", "%p", "%r", "%u"],
70 }
72 def __init__(self):
73 """
74 Create a new OpenSSH config object.
76 Note: the newer alternate constructors `from_path`, `from_file` and
77 `from_text` are simpler to use, as they parse on instantiation. For
78 example, instead of::
80 config = SSHConfig()
81 config.parse(open("some-path.config")
83 you could::
85 config = SSHConfig.from_file(open("some-path.config"))
86 # Or more directly:
87 config = SSHConfig.from_path("some-path.config")
88 # Or if you have arbitrary ssh_config text from some other source:
89 config = SSHConfig.from_text("Host foo\\n\\tUser bar")
90 """
91 self._config = []
93 @classmethod
94 def from_text(cls, text):
95 """
96 Create a new, parsed `SSHConfig` from ``text`` string.
98 .. versionadded:: 2.7
99 """
100 return cls.from_file(StringIO(text))
102 @classmethod
103 def from_path(cls, path):
104 """
105 Create a new, parsed `SSHConfig` from the file found at ``path``.
107 .. versionadded:: 2.7
108 """
109 with open(path) as flo:
110 return cls.from_file(flo)
112 @classmethod
113 def from_file(cls, flo):
114 """
115 Create a new, parsed `SSHConfig` from file-like object ``flo``.
117 .. versionadded:: 2.7
118 """
119 obj = cls()
120 obj.parse(flo)
121 return obj
123 def parse(self, file_obj):
124 """
125 Read an OpenSSH config from the given file object.
127 :param file_obj: a file-like object to read the config file from
128 """
129 # Start out w/ implicit/anonymous global host-like block to hold
130 # anything not contained by an explicit one.
131 context = {"host": ["*"], "config": {}}
132 for line in file_obj:
133 # Strip any leading or trailing whitespace from the line.
134 # Refer to https://github.com/paramiko/paramiko/issues/499
135 line = line.strip()
136 # Skip blanks, comments
137 if not line or line.startswith("#"):
138 continue
140 # Parse line into key, value
141 match = re.match(self.SETTINGS_REGEX, line)
142 if not match:
143 raise ConfigParseError("Unparsable line {}".format(line))
144 key = match.group(1).lower()
145 value = match.group(2)
147 # Host keyword triggers switch to new block/context
148 if key in ("host", "match"):
149 self._config.append(context)
150 context = {"config": {}}
151 if key == "host":
152 # TODO 4.0: make these real objects or at least name this
153 # "hosts" to acknowledge it's an iterable. (Doing so prior
154 # to 3.0, despite it being a private API, feels bad -
155 # surely such an old codebase has folks actually relying on
156 # these keys.)
157 context["host"] = self._get_hosts(value)
158 else:
159 context["matches"] = self._get_matches(value)
160 # Special-case for noop ProxyCommands
161 elif key == "proxycommand" and value.lower() == "none":
162 # Store 'none' as None - not as a string implying that the
163 # proxycommand is the literal shell command "none"!
164 context["config"][key] = None
165 # All other keywords get stored, directly or via append
166 else:
167 if value.startswith('"') and value.endswith('"'):
168 value = value[1:-1]
170 # identityfile, localforward, remoteforward keys are special
171 # cases, since they are allowed to be specified multiple times
172 # and they should be tried in order of specification.
173 if key in ["identityfile", "localforward", "remoteforward"]:
174 if key in context["config"]:
175 context["config"][key].append(value)
176 else:
177 context["config"][key] = [value]
178 elif key not in context["config"]:
179 context["config"][key] = value
180 # Store last 'open' block and we're done
181 self._config.append(context)
183 def lookup(self, hostname):
184 """
185 Return a dict (`SSHConfigDict`) of config options for a given hostname.
187 The host-matching rules of OpenSSH's ``ssh_config`` man page are used:
188 For each parameter, the first obtained value will be used. The
189 configuration files contain sections separated by ``Host`` and/or
190 ``Match`` specifications, and that section is only applied for hosts
191 which match the given patterns or keywords
193 Since the first obtained value for each parameter is used, more host-
194 specific declarations should be given near the beginning of the file,
195 and general defaults at the end.
197 The keys in the returned dict are all normalized to lowercase (look for
198 ``"port"``, not ``"Port"``. The values are processed according to the
199 rules for substitution variable expansion in ``ssh_config``.
201 Finally, please see the docs for `SSHConfigDict` for deeper info on
202 features such as optional type conversion methods, e.g.::
204 conf = my_config.lookup('myhost')
205 assert conf['passwordauthentication'] == 'yes'
206 assert conf.as_bool('passwordauthentication') is True
208 .. note::
209 If there is no explicitly configured ``HostName`` value, it will be
210 set to the being-looked-up hostname, which is as close as we can
211 get to OpenSSH's behavior around that particular option.
213 :param str hostname: the hostname to lookup
215 .. versionchanged:: 2.5
216 Returns `SSHConfigDict` objects instead of dict literals.
217 .. versionchanged:: 2.7
218 Added canonicalization support.
219 .. versionchanged:: 2.7
220 Added ``Match`` support.
221 .. versionchanged:: 3.3
222 Added ``Match final`` support.
223 """
224 # First pass
225 options = self._lookup(hostname=hostname)
226 # Inject HostName if it was not set (this used to be done incidentally
227 # during tokenization, for some reason).
228 if "hostname" not in options:
229 options["hostname"] = hostname
230 # Handle canonicalization
231 canon = options.get("canonicalizehostname", None) in ("yes", "always")
232 maxdots = int(options.get("canonicalizemaxdots", 1))
233 if canon and hostname.count(".") <= maxdots:
234 # NOTE: OpenSSH manpage does not explicitly state this, but its
235 # implementation for CanonicalDomains is 'split on any whitespace'.
236 domains = options["canonicaldomains"].split()
237 hostname = self.canonicalize(hostname, options, domains)
238 # Overwrite HostName again here (this is also what OpenSSH does)
239 options["hostname"] = hostname
240 options = self._lookup(
241 hostname, options, canonical=True, final=True
242 )
243 else:
244 options = self._lookup(
245 hostname, options, canonical=False, final=True
246 )
247 return options
249 def _lookup(self, hostname, options=None, canonical=False, final=False):
250 # Init
251 if options is None:
252 options = SSHConfigDict()
253 # Iterate all stanzas, applying any that match, in turn (so that things
254 # like Match can reference currently understood state)
255 for context in self._config:
256 if not (
257 self._pattern_matches(context.get("host", []), hostname)
258 or self._does_match(
259 context.get("matches", []),
260 hostname,
261 canonical,
262 final,
263 options,
264 )
265 ):
266 continue
267 for key, value in context["config"].items():
268 if key not in options:
269 # Create a copy of the original value,
270 # else it will reference the original list
271 # in self._config and update that value too
272 # when the extend() is being called.
273 options[key] = value[:] if value is not None else value
274 elif key == "identityfile":
275 options[key].extend(
276 x for x in value if x not in options[key]
277 )
278 if final:
279 # Expand variables in resulting values
280 # (besides 'Match exec' which was already handled above)
281 options = self._expand_variables(options, hostname)
282 return options
284 def canonicalize(self, hostname, options, domains):
285 """
286 Return canonicalized version of ``hostname``.
288 :param str hostname: Target hostname.
289 :param options: An `SSHConfigDict` from a previous lookup pass.
290 :param domains: List of domains (e.g. ``["paramiko.org"]``).
292 :returns: A canonicalized hostname if one was found, else ``None``.
294 .. versionadded:: 2.7
295 """
296 found = False
297 for domain in domains:
298 candidate = "{}.{}".format(hostname, domain)
299 family_specific = _addressfamily_host_lookup(candidate, options)
300 if family_specific is not None:
301 # TODO: would we want to dig deeper into other results? e.g. to
302 # find something that satisfies PermittedCNAMEs when that is
303 # implemented?
304 found = family_specific[0]
305 else:
306 # TODO: what does ssh use here and is there a reason to use
307 # that instead of gethostbyname?
308 try:
309 found = socket.gethostbyname(candidate)
310 except socket.gaierror:
311 pass
312 if found:
313 # TODO: follow CNAME (implied by found != candidate?) if
314 # CanonicalizePermittedCNAMEs allows it
315 return candidate
316 # If we got here, it means canonicalization failed.
317 # When CanonicalizeFallbackLocal is undefined or 'yes', we just spit
318 # back the original hostname.
319 if options.get("canonicalizefallbacklocal", "yes") == "yes":
320 return hostname
321 # And here, we failed AND fallback was set to a non-yes value, so we
322 # need to get mad.
323 raise CouldNotCanonicalize(hostname)
325 def get_hostnames(self):
326 """
327 Return the set of literal hostnames defined in the SSH config (both
328 explicit hostnames and wildcard entries).
329 """
330 hosts = set()
331 for entry in self._config:
332 hosts.update(entry["host"])
333 return hosts
335 def _pattern_matches(self, patterns, target):
336 # Convenience auto-splitter if not already a list
337 if hasattr(patterns, "split"):
338 patterns = patterns.split(",")
339 match = False
340 for pattern in patterns:
341 # Short-circuit if target matches a negated pattern
342 if pattern.startswith("!") and fnmatch.fnmatch(
343 target, pattern[1:]
344 ):
345 return False
346 # Flag a match, but continue (in case of later negation) if regular
347 # match occurs
348 elif fnmatch.fnmatch(target, pattern):
349 match = True
350 return match
352 def _does_match(
353 self, match_list, target_hostname, canonical, final, options
354 ):
355 matched = []
356 candidates = match_list[:]
357 local_username = getpass.getuser()
358 while candidates:
359 candidate = candidates.pop(0)
360 passed = None
361 # Obtain latest host/user value every loop, so later Match may
362 # reference values assigned within a prior Match.
363 configured_host = options.get("hostname", None)
364 configured_user = options.get("user", None)
365 type_, param = candidate["type"], candidate["param"]
366 # Canonical is a hard pass/fail based on whether this is a
367 # canonicalized re-lookup.
368 if type_ == "canonical":
369 if self._should_fail(canonical, candidate):
370 return False
371 if type_ == "final":
372 passed = final
373 # The parse step ensures we only see this by itself or after
374 # canonical, so it's also an easy hard pass. (No negation here as
375 # that would be uh, pretty weird?)
376 elif type_ == "all":
377 return True
378 # From here, we are testing various non-hard criteria,
379 # short-circuiting only on fail
380 elif type_ == "host":
381 hostval = configured_host or target_hostname
382 passed = self._pattern_matches(param, hostval)
383 elif type_ == "originalhost":
384 passed = self._pattern_matches(param, target_hostname)
385 elif type_ == "user":
386 user = configured_user or local_username
387 passed = self._pattern_matches(param, user)
388 elif type_ == "localuser":
389 passed = self._pattern_matches(param, local_username)
390 elif type_ == "exec":
391 exec_cmd = self._tokenize(
392 options, target_hostname, "match-exec", param
393 )
394 # This is the laziest spot in which we can get mad about an
395 # inability to import Invoke.
396 if invoke is None:
397 raise invoke_import_error
398 # Like OpenSSH, we 'redirect' stdout but let stderr bubble up
399 passed = invoke.run(exec_cmd, hide="stdout", warn=True).ok
400 # Tackle any 'passed, but was negated' results from above
401 if passed is not None and self._should_fail(passed, candidate):
402 return False
403 # Made it all the way here? Everything matched!
404 matched.append(candidate)
405 # Did anything match? (To be treated as bool, usually.)
406 return matched
408 def _should_fail(self, would_pass, candidate):
409 return would_pass if candidate["negate"] else not would_pass
411 def _tokenize(self, config, target_hostname, key, value):
412 """
413 Tokenize a string based on current config/hostname data.
415 :param config: Current config data.
416 :param target_hostname: Original target connection hostname.
417 :param key: Config key being tokenized (used to filter token list).
418 :param value: Config value being tokenized.
420 :returns: The tokenized version of the input ``value`` string.
421 """
422 allowed_tokens = self._allowed_tokens(key)
423 # Short-circuit if no tokenization possible
424 if not allowed_tokens:
425 return value
426 # Obtain potentially configured hostname, for use with %h.
427 # Special-case where we are tokenizing the hostname itself, to avoid
428 # replacing %h with a %h-bearing value, etc.
429 configured_hostname = target_hostname
430 if key != "hostname":
431 configured_hostname = config.get("hostname", configured_hostname)
432 # Ditto the rest of the source values
433 if "port" in config:
434 port = config["port"]
435 else:
436 port = SSH_PORT
437 user = getpass.getuser()
438 if "user" in config:
439 remoteuser = config["user"]
440 else:
441 remoteuser = user
442 local_hostname = socket.gethostname().split(".")[0]
443 local_fqdn = LazyFqdn(config, local_hostname)
444 homedir = os.path.expanduser("~")
445 tohash = local_hostname + target_hostname + repr(port) + remoteuser
446 # The actual tokens!
447 replacements = {
448 # TODO: %%???
449 "%C": sha1(tohash.encode()).hexdigest(),
450 "%d": homedir,
451 "%h": configured_hostname,
452 # TODO: %i?
453 "%L": local_hostname,
454 "%l": local_fqdn,
455 # also this is pseudo buggy when not in Match exec mode so document
456 # that. also WHY is that the case?? don't we do all of this late?
457 "%n": target_hostname,
458 "%p": port,
459 "%r": remoteuser,
460 # TODO: %T? don't believe this is possible however
461 "%u": user,
462 "~": homedir,
463 }
464 # Do the thing with the stuff
465 tokenized = value
466 for find, replace in replacements.items():
467 if find not in allowed_tokens:
468 continue
469 tokenized = tokenized.replace(find, str(replace))
470 # TODO: log? eg that value -> tokenized
471 return tokenized
473 def _allowed_tokens(self, key):
474 """
475 Given config ``key``, return list of token strings to tokenize.
477 .. note::
478 This feels like it wants to eventually go away, but is used to
479 preserve as-strict-as-possible compatibility with OpenSSH, which
480 for whatever reason only applies some tokens to some config keys.
481 """
482 return self.TOKENS_BY_CONFIG_KEY.get(key, [])
484 def _expand_variables(self, config, target_hostname):
485 """
486 Return a dict of config options with expanded substitutions
487 for a given original & current target hostname.
489 Please refer to :doc:`/api/config` for details.
491 :param dict config: the currently parsed config
492 :param str hostname: the hostname whose config is being looked up
493 """
494 for k in config:
495 if config[k] is None:
496 continue
497 tokenizer = partial(self._tokenize, config, target_hostname, k)
498 if isinstance(config[k], list):
499 for i, value in enumerate(config[k]):
500 config[k][i] = tokenizer(value)
501 else:
502 config[k] = tokenizer(config[k])
503 return config
505 def _get_hosts(self, host):
506 """
507 Return a list of host_names from host value.
508 """
509 try:
510 return shlex.split(host)
511 except ValueError:
512 raise ConfigParseError("Unparsable host {}".format(host))
514 def _get_matches(self, match):
515 """
516 Parse a specific Match config line into a list-of-dicts for its values.
518 Performs some parse-time validation as well.
519 """
520 matches = []
521 tokens = shlex.split(match)
522 while tokens:
523 match = {"type": None, "param": None, "negate": False}
524 type_ = tokens.pop(0)
525 # Handle per-keyword negation
526 if type_.startswith("!"):
527 match["negate"] = True
528 type_ = type_[1:]
529 match["type"] = type_
530 # all/canonical have no params (everything else does)
531 if type_ in ("all", "canonical", "final"):
532 matches.append(match)
533 continue
534 if not tokens:
535 raise ConfigParseError(
536 "Missing parameter to Match '{}' keyword".format(type_)
537 )
538 match["param"] = tokens.pop(0)
539 matches.append(match)
540 # Perform some (easier to do now than in the middle) validation that is
541 # better handled here than at lookup time.
542 keywords = [x["type"] for x in matches]
543 if "all" in keywords:
544 allowable = ("all", "canonical")
545 ok, bad = (
546 list(filter(lambda x: x in allowable, keywords)),
547 list(filter(lambda x: x not in allowable, keywords)),
548 )
549 err = None
550 if any(bad):
551 err = "Match does not allow 'all' mixed with anything but 'canonical'" # noqa
552 elif "canonical" in ok and ok.index("canonical") > ok.index("all"):
553 err = "Match does not allow 'all' before 'canonical'"
554 if err is not None:
555 raise ConfigParseError(err)
556 return matches
559def _addressfamily_host_lookup(hostname, options):
560 """
561 Try looking up ``hostname`` in an IPv4 or IPv6 specific manner.
563 This is an odd duck due to needing use in two divergent use cases. It looks
564 up ``AddressFamily`` in ``options`` and if it is ``inet`` or ``inet6``,
565 this function uses `socket.getaddrinfo` to perform a family-specific
566 lookup, returning the result if successful.
568 In any other situation -- lookup failure, or ``AddressFamily`` being
569 unspecified or ``any`` -- ``None`` is returned instead and the caller is
570 expected to do something situation-appropriate like calling
571 `socket.gethostbyname`.
573 :param str hostname: Hostname to look up.
574 :param options: `SSHConfigDict` instance w/ parsed options.
575 :returns: ``getaddrinfo``-style tuples, or ``None``, depending.
576 """
577 address_family = options.get("addressfamily", "any").lower()
578 if address_family == "any":
579 return
580 try:
581 family = socket.AF_INET6
582 if address_family == "inet":
583 family = socket.AF_INET
584 return socket.getaddrinfo(
585 hostname,
586 None,
587 family,
588 socket.SOCK_DGRAM,
589 socket.IPPROTO_IP,
590 socket.AI_CANONNAME,
591 )
592 except socket.gaierror:
593 pass
596class LazyFqdn:
597 """
598 Returns the host's fqdn on request as string.
599 """
601 def __init__(self, config, host=None):
602 self.fqdn = None
603 self.config = config
604 self.host = host
606 def __str__(self):
607 if self.fqdn is None:
608 #
609 # If the SSH config contains AddressFamily, use that when
610 # determining the local host's FQDN. Using socket.getfqdn() from
611 # the standard library is the most general solution, but can
612 # result in noticeable delays on some platforms when IPv6 is
613 # misconfigured or not available, as it calls getaddrinfo with no
614 # address family specified, so both IPv4 and IPv6 are checked.
615 #
617 # Handle specific option
618 fqdn = None
619 results = _addressfamily_host_lookup(self.host, self.config)
620 if results is not None:
621 for res in results:
622 af, socktype, proto, canonname, sa = res
623 if canonname and "." in canonname:
624 fqdn = canonname
625 break
626 # Handle 'any' / unspecified / lookup failure
627 if fqdn is None:
628 fqdn = socket.getfqdn()
629 # Cache
630 self.fqdn = fqdn
631 return self.fqdn
634class SSHConfigDict(dict):
635 """
636 A dictionary wrapper/subclass for per-host configuration structures.
638 This class introduces some usage niceties for consumers of `SSHConfig`,
639 specifically around the issue of variable type conversions: normal value
640 access yields strings, but there are now methods such as `as_bool` and
641 `as_int` that yield casted values instead.
643 For example, given the following ``ssh_config`` file snippet::
645 Host foo.example.com
646 PasswordAuthentication no
647 Compression yes
648 ServerAliveInterval 60
650 the following code highlights how you can access the raw strings as well as
651 usefully Python type-casted versions (recalling that keys are all
652 normalized to lowercase first)::
654 my_config = SSHConfig()
655 my_config.parse(open('~/.ssh/config'))
656 conf = my_config.lookup('foo.example.com')
658 assert conf['passwordauthentication'] == 'no'
659 assert conf.as_bool('passwordauthentication') is False
660 assert conf['compression'] == 'yes'
661 assert conf.as_bool('compression') is True
662 assert conf['serveraliveinterval'] == '60'
663 assert conf.as_int('serveraliveinterval') == 60
665 .. versionadded:: 2.5
666 """
668 def as_bool(self, key):
669 """
670 Express given key's value as a boolean type.
672 Typically, this is used for ``ssh_config``'s pseudo-boolean values
673 which are either ``"yes"`` or ``"no"``. In such cases, ``"yes"`` yields
674 ``True`` and any other value becomes ``False``.
676 .. note::
677 If (for whatever reason) the stored value is already boolean in
678 nature, it's simply returned.
680 .. versionadded:: 2.5
681 """
682 val = self[key]
683 if isinstance(val, bool):
684 return val
685 return val.lower() == "yes"
687 def as_int(self, key):
688 """
689 Express given key's value as an integer, if possible.
691 This method will raise ``ValueError`` or similar if the value is not
692 int-appropriate, same as the builtin `int` type.
694 .. versionadded:: 2.5
695 """
696 return int(self[key])