Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/config.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com>
2# Copyright (C) 2012 Olle Lundberg <geek@nerd.sh>
3#
4# This file is part of paramiko.
5#
6# Paramiko is free software; you can redistribute it and/or modify it under the
7# terms of the GNU Lesser General Public License as published by the Free
8# Software Foundation; either version 2.1 of the License, or (at your option)
9# any later version.
10#
11# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
12# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
13# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14# details.
15#
16# You should have received a copy of the GNU Lesser General Public License
17# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20"""
21Configuration file (aka ``ssh_config``) support.
22"""
24import fnmatch
25import getpass
26import os
27import re
28import shlex
29import socket
30from functools import partial
31from hashlib import sha1
32from io import StringIO
34invoke, invoke_import_error = None, None
35try:
36 import invoke
37except ImportError as e:
38 invoke_import_error = e
40from .ssh_exception import ConfigParseError, CouldNotCanonicalize
# Port number substituted for the ``%p`` token (and fed into the ``%C`` hash)
# when the looked-up configuration contains no ``port`` entry.
SSH_PORT = 22
class SSHConfig:
    """
    Representation of config information as stored in the format used by
    OpenSSH. Queries can be made via `lookup`. The format is described in
    OpenSSH's ``ssh_config`` man page. This class is provided primarily as a
    convenience to posix users (since the OpenSSH format is a de-facto
    standard on posix) but should work fine on Windows too.

    .. versionadded:: 1.6
    """

    # Splits a config line into keyword and value; the separator is either
    # whitespace or an ``=`` sign optionally surrounded by whitespace.
    SETTINGS_REGEX = re.compile(r"(\w+)(?:\s*=\s*|\s+)(.+)")

    # TODO: do a full scan of ssh.c & friends to make sure we're fully
    # compatible across the board, e.g. OpenSSH 8.1 added %n to ProxyCommand.
    TOKENS_BY_CONFIG_KEY = {
        "controlpath": ["%C", "%h", "%l", "%L", "%n", "%p", "%r", "%u"],
        "hostname": ["%h"],
        "identityfile": ["%C", "~", "%d", "%h", "%l", "%u", "%r"],
        "proxycommand": ["~", "%h", "%p", "%r"],
        "proxyjump": ["%h", "%p", "%r"],
        # Doesn't seem worth making this 'special' for now, it will fit well
        # enough (no actual match-exec config key to be confused with).
        "match-exec": ["%C", "%d", "%h", "%L", "%l", "%n", "%p", "%r", "%u"],
    }

    def __init__(self):
        """
        Create a new OpenSSH config object.

        Note: the newer alternate constructors `from_path`, `from_file` and
        `from_text` are simpler to use, as they parse on instantiation. For
        example, instead of::

            config = SSHConfig()
            config.parse(open("some-path.config"))

        you could::

            config = SSHConfig.from_file(open("some-path.config"))
            # Or more directly:
            config = SSHConfig.from_path("some-path.config")
            # Or if you have arbitrary ssh_config text from some other source:
            config = SSHConfig.from_text("Host foo\\n\\tUser bar")
        """
        # Ordered list of parsed blocks. Each block is a dict with a
        # "config" mapping plus either a "host" list (Host blocks and the
        # implicit global block) or a "matches" list (Match blocks).
        self._config = []

    @classmethod
    def from_text(cls, text):
        """
        Create a new, parsed `SSHConfig` from ``text`` string.

        .. versionadded:: 2.7
        """
        return cls.from_file(StringIO(text))

    @classmethod
    def from_path(cls, path):
        """
        Create a new, parsed `SSHConfig` from the file found at ``path``.

        .. versionadded:: 2.7
        """
        with open(path) as flo:
            return cls.from_file(flo)

    @classmethod
    def from_file(cls, flo):
        """
        Create a new, parsed `SSHConfig` from file-like object ``flo``.

        .. versionadded:: 2.7
        """
        obj = cls()
        obj.parse(flo)
        return obj

    def parse(self, file_obj):
        """
        Read an OpenSSH config from the given file object.

        :param file_obj: a file-like object to read the config file from

        :raises:
            `.ConfigParseError` -- if a line cannot be split into a keyword
            and value, or a ``Host``/``Match`` value cannot be parsed.
        """
        # Start out w/ implicit/anonymous global host-like block to hold
        # anything not contained by an explicit one.
        context = {"host": ["*"], "config": {}}
        for line in file_obj:
            # Strip any leading or trailing whitespace from the line.
            # Refer to https://github.com/paramiko/paramiko/issues/499
            line = line.strip()
            # Skip blanks, comments
            if not line or line.startswith("#"):
                continue

            # Parse line into key, value
            match = re.match(self.SETTINGS_REGEX, line)
            if not match:
                raise ConfigParseError("Unparsable line {}".format(line))
            key = match.group(1).lower()
            value = match.group(2)

            # Host keyword triggers switch to new block/context
            if key in ("host", "match"):
                self._config.append(context)
                context = {"config": {}}
                if key == "host":
                    # TODO (backwards incompat): make these real objects or at
                    # least name this "hosts" to acknowledge it's an iterable.
                    # (Doing so prior to 3.0, despite it being a private API,
                    # feels bad - surely such an old codebase has folks
                    # actually relying on these keys.)
                    context["host"] = self._get_hosts(value)
                else:
                    context["matches"] = self._get_matches(value)
            # Special-case for noop ProxyCommands
            elif key == "proxycommand" and value.lower() == "none":
                # Store 'none' as None - not as a string implying that the
                # proxycommand is the literal shell command "none"!
                context["config"][key] = None
            # All other keywords get stored, directly or via append
            else:
                if value.startswith('"') and value.endswith('"'):
                    value = value[1:-1]

                # identityfile, localforward, remoteforward keys are special
                # cases, since they are allowed to be specified multiple times
                # and they should be tried in order of specification.
                if key in ["identityfile", "localforward", "remoteforward"]:
                    context["config"].setdefault(key, []).append(value)
                else:
                    # First value seen within a block wins; later duplicates
                    # (including after a 'none' ProxyCommand) are ignored.
                    context["config"].setdefault(key, value)
        # Store last 'open' block and we're done
        self._config.append(context)

    def lookup(self, hostname):
        """
        Return a dict (`SSHConfigDict`) of config options for a given hostname.

        The host-matching rules of OpenSSH's ``ssh_config`` man page are used:
        For each parameter, the first obtained value will be used. The
        configuration files contain sections separated by ``Host`` and/or
        ``Match`` specifications, and that section is only applied for hosts
        which match the given patterns or keywords

        Since the first obtained value for each parameter is used, more host-
        specific declarations should be given near the beginning of the file,
        and general defaults at the end.

        The keys in the returned dict are all normalized to lowercase (look for
        ``"port"``, not ``"Port"``). The values are processed according to the
        rules for substitution variable expansion in ``ssh_config``.

        Finally, please see the docs for `SSHConfigDict` for deeper info on
        features such as optional type conversion methods, e.g.::

            conf = my_config.lookup('myhost')
            assert conf['passwordauthentication'] == 'yes'
            assert conf.as_bool('passwordauthentication') is True

        .. note::
            If there is no explicitly configured ``HostName`` value, it will be
            set to the being-looked-up hostname, which is as close as we can
            get to OpenSSH's behavior around that particular option.

        :param str hostname: the hostname to lookup

        :raises:
            `.CouldNotCanonicalize` -- if canonicalization is enabled, no
            candidate resolves, and ``CanonicalizeFallbackLocal`` is not
            ``yes``.

        .. versionchanged:: 2.5
            Returns `SSHConfigDict` objects instead of dict literals.
        .. versionchanged:: 2.7
            Added canonicalization support.
        .. versionchanged:: 2.7
            Added ``Match`` support.
        .. versionchanged:: 3.3
            Added ``Match final`` support.
        """
        # First pass
        options = self._lookup(hostname=hostname)
        # Inject HostName if it was not set (this used to be done incidentally
        # during tokenization, for some reason).
        if "hostname" not in options:
            options["hostname"] = hostname
        # Handle canonicalization
        canon = options.get("canonicalizehostname", None) in ("yes", "always")
        maxdots = int(options.get("canonicalizemaxdots", 1))
        if canon and hostname.count(".") <= maxdots:
            # NOTE: OpenSSH manpage does not explicitly state this, but its
            # implementation for CanonicalDomains is 'split on any whitespace'.
            # A missing CanonicalDomains is treated as an empty list (rather
            # than blowing up with KeyError), leaving the outcome to the
            # CanonicalizeFallbackLocal handling in `canonicalize`.
            domains = options.get("canonicaldomains", "").split()
            hostname = self.canonicalize(hostname, options, domains)
            # Overwrite HostName again here (this is also what OpenSSH does)
            options["hostname"] = hostname
            options = self._lookup(
                hostname, options, canonical=True, final=True
            )
        else:
            options = self._lookup(
                hostname, options, canonical=False, final=True
            )
        return options

    def _lookup(self, hostname, options=None, canonical=False, final=False):
        """
        Run one matching pass over all parsed blocks for ``hostname``.

        :param str hostname: Hostname to match ``Host``/``Match`` blocks with.
        :param options:
            Dict of already-obtained options to build upon (mutated in place);
            a fresh `SSHConfigDict` is created when ``None``.
        :param bool canonical: Whether this is a post-canonicalization pass.
        :param bool final:
            Whether this is the final pass; if so, ``Match final`` criteria
            pass and token expansion is applied to the result.

        :returns: The (possibly newly created) options dict.
        """
        # Init
        if options is None:
            options = SSHConfigDict()
        # Iterate all stanzas, applying any that match, in turn (so that things
        # like Match can reference currently understood state)
        for context in self._config:
            if not (
                self._pattern_matches(context.get("host", []), hostname)
                or self._does_match(
                    context.get("matches", []),
                    hostname,
                    canonical,
                    final,
                    options,
                )
            ):
                continue
            for key, value in context["config"].items():
                if key not in options:
                    # Create a copy of the original value,
                    # else it will reference the original list
                    # in self._config and update that value too
                    # when the extend() is being called.
                    options[key] = value[:] if value is not None else value
                elif key == "identityfile":
                    options[key].extend(
                        x for x in value if x not in options[key]
                    )
        if final:
            # Expand variables in resulting values
            # (besides 'Match exec' which was already handled above)
            options = self._expand_variables(options, hostname)
        return options

    def canonicalize(self, hostname, options, domains):
        """
        Return canonicalized version of ``hostname``.

        :param str hostname: Target hostname.
        :param options: An `SSHConfigDict` from a previous lookup pass.
        :param domains: List of domains (e.g. ``["paramiko.org"]``).

        :returns:
            The first ``hostname.domain`` candidate that resolves; or the
            unmodified ``hostname`` when none resolves and
            ``CanonicalizeFallbackLocal`` is unset or ``yes``.

        :raises:
            `.CouldNotCanonicalize` -- when no candidate resolves and
            ``CanonicalizeFallbackLocal`` is set to a non-``yes`` value.

        .. versionadded:: 2.7
        """
        found = False
        for domain in domains:
            candidate = "{}.{}".format(hostname, domain)
            family_specific = _addressfamily_host_lookup(candidate, options)
            if family_specific is not None:
                # TODO: would we want to dig deeper into other results? e.g. to
                # find something that satisfies PermittedCNAMEs when that is
                # implemented?
                found = family_specific[0]
            else:
                # TODO: what does ssh use here and is there a reason to use
                # that instead of gethostbyname?
                try:
                    found = socket.gethostbyname(candidate)
                except socket.gaierror:
                    pass
            if found:
                # TODO: follow CNAME (implied by found != candidate?) if
                # CanonicalizePermittedCNAMEs allows it
                return candidate
        # If we got here, it means canonicalization failed.
        # When CanonicalizeFallbackLocal is undefined or 'yes', we just spit
        # back the original hostname.
        if options.get("canonicalizefallbacklocal", "yes") == "yes":
            return hostname
        # And here, we failed AND fallback was set to a non-yes value, so we
        # need to get mad.
        raise CouldNotCanonicalize(hostname)

    def get_hostnames(self):
        """
        Return the set of literal hostnames defined in the SSH config (both
        explicit hostnames and wildcard entries).

        Only ``Host`` blocks (and the implicit global ``*`` block)
        contribute; ``Match`` blocks carry no host patterns and are skipped.
        """
        hosts = set()
        for entry in self._config:
            # Match blocks have a "matches" key instead of "host".
            hosts.update(entry.get("host", []))
        return hosts

    def _pattern_matches(self, patterns, target):
        """
        Test ``target`` against one or more (possibly negated) glob patterns.

        :param patterns:
            A list of patterns, or a comma-separated string of patterns.
        :param str target: The value being tested.

        :returns:
            ``False`` if any ``!``-negated pattern matches ``target``;
            otherwise whether at least one regular pattern matched.
        """
        # Convenience auto-splitter if not already a list
        if hasattr(patterns, "split"):
            patterns = patterns.split(",")
        match = False
        for pattern in patterns:
            # Short-circuit if target matches a negated pattern
            if pattern.startswith("!") and fnmatch.fnmatch(
                target, pattern[1:]
            ):
                return False
            # Flag a match, but continue (in case of later negation) if regular
            # match occurs
            elif fnmatch.fnmatch(target, pattern):
                match = True
        return match

    def _does_match(
        self, match_list, target_hostname, canonical, final, options
    ):
        """
        Evaluate the criteria of one ``Match`` block.

        :param match_list:
            List of criterion dicts as produced by `_get_matches`.
        :param str target_hostname: Hostname being looked up.
        :param bool canonical: Whether this is a canonicalized re-lookup.
        :param bool final: Whether this is the final lookup pass.
        :param options: Options obtained so far (read for host/user values).

        :returns:
            ``False`` as soon as any criterion fails, ``True`` on an ``all``
            criterion, otherwise the list of criteria that passed (empty, and
            thus falsy, when ``match_list`` is empty).

        :raises:
            The stored ``ImportError`` if an ``exec`` criterion is
            encountered and Invoke could not be imported.
        """
        matched = []
        for candidate in match_list:
            passed = None
            # Obtain latest host/user value every loop, so later Match may
            # reference values assigned within a prior Match.
            configured_host = options.get("hostname", None)
            configured_user = options.get("user", None)
            type_, param = candidate["type"], candidate["param"]
            # Canonical is a hard pass/fail based on whether this is a
            # canonicalized re-lookup.
            if type_ == "canonical":
                if self._should_fail(canonical, candidate):
                    return False
            elif type_ == "final":
                passed = final
            # The parse step ensures we only see this by itself or after
            # canonical, so it's also an easy hard pass. (No negation here as
            # that would be uh, pretty weird?)
            elif type_ == "all":
                return True
            # From here, we are testing various non-hard criteria,
            # short-circuiting only on fail
            elif type_ == "host":
                hostval = configured_host or target_hostname
                passed = self._pattern_matches(param, hostval)
            elif type_ == "originalhost":
                passed = self._pattern_matches(param, target_hostname)
            elif type_ == "user":
                # The local username is only looked up when actually needed,
                # so blocks without user criteria never touch getpass.
                user = configured_user or getpass.getuser()
                passed = self._pattern_matches(param, user)
            elif type_ == "localuser":
                passed = self._pattern_matches(param, getpass.getuser())
            elif type_ == "exec":
                exec_cmd = self._tokenize(
                    options, target_hostname, "match-exec", param
                )
                # This is the laziest spot in which we can get mad about an
                # inability to import Invoke.
                if invoke is None:
                    raise invoke_import_error
                # Like OpenSSH, we 'redirect' stdout but let stderr bubble up
                passed = invoke.run(exec_cmd, hide="stdout", warn=True).ok
            # Tackle any 'passed, but was negated' results from above
            if passed is not None and self._should_fail(passed, candidate):
                return False
            # Made it all the way here? Everything matched!
            matched.append(candidate)
        # Did anything match? (To be treated as bool, usually.)
        return matched

    def _should_fail(self, would_pass, candidate):
        """
        Apply a criterion's negation flag to its raw result.

        :returns:
            ``True`` when the criterion counts as failed: it passed but is
            negated, or it did not pass and is not negated.
        """
        return would_pass if candidate["negate"] else not would_pass

    def _tokenize(self, config, target_hostname, key, value):
        """
        Tokenize a string based on current config/hostname data.

        :param config: Current config data.
        :param target_hostname: Original target connection hostname.
        :param key: Config key being tokenized (used to filter token list).
        :param value: Config value being tokenized.

        :returns: The tokenized version of the input ``value`` string.
        """
        allowed_tokens = self._allowed_tokens(key)
        # Short-circuit if no tokenization possible
        if not allowed_tokens:
            return value
        # Obtain potentially configured hostname, for use with %h.
        # Special-case where we are tokenizing the hostname itself, to avoid
        # replacing %h with a %h-bearing value, etc.
        configured_hostname = target_hostname
        if key != "hostname":
            configured_hostname = config.get("hostname", configured_hostname)
        # Ditto the rest of the source values
        port = config.get("port", SSH_PORT)
        user = getpass.getuser()
        remoteuser = config.get("user", user)
        local_hostname = socket.gethostname().split(".")[0]
        # Resolved only if/when %l is actually substituted (via str()).
        local_fqdn = LazyFqdn(config, local_hostname)
        homedir = os.path.expanduser("~")
        tohash = local_hostname + target_hostname + repr(port) + remoteuser
        # The actual tokens!
        replacements = {
            # TODO: %%???
            # TODO: sha1 bad / this is offspec from rfc/openssh
            "%C": sha1(tohash.encode()).hexdigest(),
            "%d": homedir,
            "%h": configured_hostname,
            # TODO: %i?
            "%L": local_hostname,
            "%l": local_fqdn,
            # also this is pseudo buggy when not in Match exec mode so document
            # that. also WHY is that the case?? don't we do all of this late?
            "%n": target_hostname,
            "%p": port,
            "%r": remoteuser,
            # TODO: %T? don't believe this is possible however
            "%u": user,
            "~": homedir,
        }
        # Do the thing with the stuff
        tokenized = value
        for find, replace in replacements.items():
            if find not in allowed_tokens:
                continue
            tokenized = tokenized.replace(find, str(replace))
        # TODO: log? eg that value -> tokenized
        return tokenized

    def _allowed_tokens(self, key):
        """
        Given config ``key``, return list of token strings to tokenize.

        .. note::
            This feels like it wants to eventually go away, but is used to
            preserve as-strict-as-possible compatibility with OpenSSH, which
            for whatever reason only applies some tokens to some config keys.
        """
        return self.TOKENS_BY_CONFIG_KEY.get(key, [])

    def _expand_variables(self, config, target_hostname):
        """
        Return a dict of config options with expanded substitutions
        for a given original & current target hostname.

        Please refer to :doc:`/api/config` for details.

        :param dict config: the currently parsed config (updated in place)
        :param str target_hostname:
            the hostname whose config is being looked up
        """
        for k in config:
            if config[k] is None:
                continue
            tokenizer = partial(self._tokenize, config, target_hostname, k)
            if isinstance(config[k], list):
                for i, value in enumerate(config[k]):
                    config[k][i] = tokenizer(value)
            else:
                config[k] = tokenizer(config[k])
        return config

    def _get_hosts(self, host):
        """
        Return a list of host_names from host value.

        :raises: `.ConfigParseError` -- if ``host`` cannot be shell-split.
        """
        try:
            return shlex.split(host)
        except ValueError:
            raise ConfigParseError("Unparsable host {}".format(host))

    def _get_matches(self, match):
        """
        Parse a specific Match config line into a list-of-dicts for its values.

        Performs some parse-time validation as well.

        :param str match: The value portion of a ``Match`` line.

        :returns:
            A list of dicts, each with ``type``, ``param`` and ``negate``
            keys.

        :raises:
            `.ConfigParseError` -- if the value cannot be shell-split, a
            keyword lacks its parameter, or ``all`` is combined illegally.
        """
        matches = []
        # Mirror _get_hosts: malformed quoting is a config parse problem, not
        # a bare ValueError.
        try:
            tokens = shlex.split(match)
        except ValueError:
            raise ConfigParseError("Unparsable match {}".format(match))
        while tokens:
            criterion = {"type": None, "param": None, "negate": False}
            type_ = tokens.pop(0)
            # Handle per-keyword negation
            if type_.startswith("!"):
                criterion["negate"] = True
                type_ = type_[1:]
            criterion["type"] = type_
            # all/canonical/final have no params (everything else does)
            if type_ in ("all", "canonical", "final"):
                matches.append(criterion)
                continue
            if not tokens:
                raise ConfigParseError(
                    "Missing parameter to Match '{}' keyword".format(type_)
                )
            criterion["param"] = tokens.pop(0)
            matches.append(criterion)
        # Perform some (easier to do now than in the middle) validation that is
        # better handled here than at lookup time.
        keywords = [x["type"] for x in matches]
        if "all" in keywords:
            allowable = ("all", "canonical")
            ok = [x for x in keywords if x in allowable]
            bad = [x for x in keywords if x not in allowable]
            err = None
            # Plain emptiness test: an illegal keyword that happens to be an
            # empty string (from a bare '!') must still count as bad.
            if bad:
                err = "Match does not allow 'all' mixed with anything but 'canonical'"  # noqa
            elif "canonical" in ok and ok.index("canonical") > ok.index("all"):
                err = "Match does not allow 'all' before 'canonical'"
            if err is not None:
                raise ConfigParseError(err)
        return matches
559def _addressfamily_host_lookup(hostname, options):
560 """
561 Try looking up ``hostname`` in an IPv4 or IPv6 specific manner.
563 This is an odd duck due to needing use in two divergent use cases. It looks
564 up ``AddressFamily`` in ``options`` and if it is ``inet`` or ``inet6``,
565 this function uses `socket.getaddrinfo` to perform a family-specific
566 lookup, returning the result if successful.
568 In any other situation -- lookup failure, or ``AddressFamily`` being
569 unspecified or ``any`` -- ``None`` is returned instead and the caller is
570 expected to do something situation-appropriate like calling
571 `socket.gethostbyname`.
573 :param str hostname: Hostname to look up.
574 :param options: `SSHConfigDict` instance w/ parsed options.
575 :returns: ``getaddrinfo``-style tuples, or ``None``, depending.
576 """
577 address_family = options.get("addressfamily", "any").lower()
578 if address_family == "any":
579 return
580 try:
581 family = socket.AF_INET6
582 if address_family == "inet":
583 family = socket.AF_INET
584 return socket.getaddrinfo(
585 hostname,
586 None,
587 family,
588 socket.SOCK_DGRAM,
589 socket.IPPROTO_IP,
590 socket.AI_CANONNAME,
591 )
592 except socket.gaierror:
593 pass
class LazyFqdn:
    """
    Deferred lookup of the local host's FQDN, performed on ``str()``.

    The lookup result is cached on the instance, so the resolution work
    happens at most once per object.
    """

    def __init__(self, config, host=None):
        # Cached result; None until the first str() call.
        self.fqdn = None
        self.config = config
        self.host = host

    def __str__(self):
        # Already resolved earlier: hand back the cached value.
        if self.fqdn is not None:
            return self.fqdn

        # If the SSH config contains AddressFamily, use that when determining
        # the local host's FQDN. socket.getfqdn() from the standard library
        # is the most general solution, but it calls getaddrinfo with no
        # address family specified, so both IPv4 and IPv6 are checked, which
        # can cause noticeable delays on some platforms when IPv6 is
        # misconfigured or not available.
        resolved = None
        lookup = _addressfamily_host_lookup(self.host, self.config)
        if lookup is not None:
            # First canonical name that looks fully qualified wins.
            resolved = next(
                (
                    canonname
                    for _af, _socktype, _proto, canonname, _sa in lookup
                    if canonname and "." in canonname
                ),
                None,
            )

        # 'any' / unspecified AddressFamily, failed lookup, or no usable
        # canonical name: fall back to the general-purpose resolver.
        if resolved is None:
            resolved = socket.getfqdn()

        self.fqdn = resolved
        return self.fqdn
class SSHConfigDict(dict):
    """
    A ``dict`` subclass holding the configuration obtained for one host.

    Plain item access behaves exactly like a regular dict and yields the
    stored values untouched (typically strings). On top of that, the helper
    methods `as_bool` and `as_int` return values converted to Python types.

    For example, given the following ``ssh_config`` file snippet::

        Host foo.example.com
            PasswordAuthentication no
            Compression yes
            ServerAliveInterval 60

    raw strings and converted values can be obtained like so (keys are all
    normalized to lowercase during lookup)::

        my_config = SSHConfig()
        my_config.parse(open('~/.ssh/config'))
        conf = my_config.lookup('foo.example.com')

        assert conf['passwordauthentication'] == 'no'
        assert conf.as_bool('passwordauthentication') is False
        assert conf['compression'] == 'yes'
        assert conf.as_bool('compression') is True
        assert conf['serveraliveinterval'] == '60'
        assert conf.as_int('serveraliveinterval') == 60

    .. versionadded:: 2.5
    """

    def as_bool(self, key):
        """
        Return the value stored under ``key`` as a boolean.

        Intended for ``ssh_config``'s pseudo-boolean ``"yes"``/``"no"``
        values: ``"yes"`` (compared case-insensitively) gives ``True`` and
        every other string gives ``False``.

        .. note::
            A stored value that already is a ``bool`` is returned unchanged.

        .. versionadded:: 2.5
        """
        raw = self[key]
        return raw if isinstance(raw, bool) else raw.lower() == "yes"

    def as_int(self, key):
        """
        Return the value stored under ``key`` as an integer.

        The conversion is delegated to the builtin `int`, so values that are
        not int-appropriate raise ``ValueError`` (or similar) just like
        `int` itself would.

        .. versionadded:: 2.5
        """
        raw = self[key]
        return int(raw)