Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/config.py: 18%
263 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com>
2# Copyright (C) 2012 Olle Lundberg <geek@nerd.sh>
3#
4# This file is part of paramiko.
5#
6# Paramiko is free software; you can redistribute it and/or modify it under the
7# terms of the GNU Lesser General Public License as published by the Free
8# Software Foundation; either version 2.1 of the License, or (at your option)
9# any later version.
10#
11# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
12# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
13# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14# details.
15#
16# You should have received a copy of the GNU Lesser General Public License
17# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
18# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20"""
21Configuration file (aka ``ssh_config``) support.
22"""
24import fnmatch
25import getpass
26import os
27import re
28import shlex
29import socket
30from hashlib import sha1
31from io import StringIO
32from functools import partial
# Invoke is an optional dependency: it is only needed to run ``Match exec``
# commands. Remember the import failure (if any) so it can be re-raised
# lazily, at the point where Invoke is actually required.
invoke = None
invoke_import_error = None
try:
    import invoke
except ImportError as err:
    invoke_import_error = err
40from .ssh_exception import CouldNotCanonicalize, ConfigParseError
# Port used for token expansion (``%p``, ``%C``) when the config sets no Port.
SSH_PORT = 22
class SSHConfig:
    """
    Representation of config information as stored in the format used by
    OpenSSH. Queries can be made via `lookup`. The format is described in
    OpenSSH's ``ssh_config`` man page. This class is provided primarily as a
    convenience to posix users (since the OpenSSH format is a de-facto
    standard on posix) but should work fine on Windows too.

    .. versionadded:: 1.6
    """

    # ``Key value`` or ``Key = value``; group 1 is the key, group 2 the value.
    SETTINGS_REGEX = re.compile(r"(\w+)(?:\s*=\s*|\s+)(.+)")

    # TODO: do a full scan of ssh.c & friends to make sure we're fully
    # compatible across the board, e.g. OpenSSH 8.1 added %n to ProxyCommand.
    TOKENS_BY_CONFIG_KEY = {
        "controlpath": ["%C", "%h", "%l", "%L", "%n", "%p", "%r", "%u"],
        "hostname": ["%h"],
        "identityfile": ["%C", "~", "%d", "%h", "%l", "%u", "%r"],
        "proxycommand": ["~", "%h", "%p", "%r"],
        "proxyjump": ["%h", "%p", "%r"],
        # Doesn't seem worth making this 'special' for now, it will fit well
        # enough (no actual match-exec config key to be confused with).
        "match-exec": ["%C", "%d", "%h", "%L", "%l", "%n", "%p", "%r", "%u"],
    }

    def __init__(self):
        """
        Create a new OpenSSH config object.

        Note: the newer alternate constructors `from_path`, `from_file` and
        `from_text` are simpler to use, as they parse on instantiation. For
        example, instead of::

            config = SSHConfig()
            config.parse(open("some-path.config"))

        you could::

            config = SSHConfig.from_file(open("some-path.config"))
            # Or more directly:
            config = SSHConfig.from_path("some-path.config")
            # Or if you have arbitrary ssh_config text from some other source:
            config = SSHConfig.from_text("Host foo\\n\\tUser bar")
        """
        # Ordered list of parsed blocks. Each is a dict with a "config" dict
        # plus either a "host" list (Host blocks and the implicit global
        # block) or a "matches" list (Match blocks).
        self._config = []

    @classmethod
    def from_text(cls, text):
        """
        Create a new, parsed `SSHConfig` from ``text`` string.

        .. versionadded:: 2.7
        """
        return cls.from_file(StringIO(text))

    @classmethod
    def from_path(cls, path):
        """
        Create a new, parsed `SSHConfig` from the file found at ``path``.

        .. versionadded:: 2.7
        """
        with open(path) as flo:
            return cls.from_file(flo)

    @classmethod
    def from_file(cls, flo):
        """
        Create a new, parsed `SSHConfig` from file-like object ``flo``.

        .. versionadded:: 2.7
        """
        obj = cls()
        obj.parse(flo)
        return obj

    def parse(self, file_obj):
        """
        Read an OpenSSH config from the given file object.

        Parsed blocks are appended to this object's internal block list, so
        calling `parse` repeatedly accumulates configuration.

        :param file_obj: a file-like object to read the config file from
        :raises ConfigParseError: if a line cannot be split into key/value.
        """
        # Start out w/ implicit/anonymous global host-like block to hold
        # anything not contained by an explicit one.
        context = {"host": ["*"], "config": {}}
        for line in file_obj:
            # Strip any leading or trailing whitespace from the line.
            # Refer to https://github.com/paramiko/paramiko/issues/499
            line = line.strip()
            # Skip blanks, comments
            if not line or line.startswith("#"):
                continue

            # Parse line into key, value
            match = self.SETTINGS_REGEX.match(line)
            if not match:
                raise ConfigParseError("Unparsable line {}".format(line))
            key = match.group(1).lower()
            value = match.group(2)

            # Host/Match keywords trigger a switch to a new block/context
            if key in ("host", "match"):
                self._config.append(context)
                context = {"config": {}}
                if key == "host":
                    # TODO 4.0: make these real objects or at least name this
                    # "hosts" to acknowledge it's an iterable. (Doing so prior
                    # to 3.0, despite it being a private API, feels bad -
                    # surely such an old codebase has folks actually relying on
                    # these keys.)
                    context["host"] = self._get_hosts(value)
                else:
                    context["matches"] = self._get_matches(value)
                continue

            settings = context["config"]

            # Special-case for noop ProxyCommands
            if key == "proxycommand" and value.lower() == "none":
                # Store 'none' as None - not as a string implying that the
                # proxycommand is the literal shell command "none"! Like every
                # other single-valued key, the first value in a block wins, so
                # never clobber a ProxyCommand that was already recorded.
                settings.setdefault(key, None)
                continue

            # All other keywords get stored, directly or via append
            if value.startswith('"') and value.endswith('"'):
                value = value[1:-1]

            # identityfile, localforward, remoteforward keys are special
            # cases, since they are allowed to be specified multiple times
            # and they should be tried in order of specification.
            if key in ("identityfile", "localforward", "remoteforward"):
                settings.setdefault(key, []).append(value)
            elif key not in settings:
                settings[key] = value
        # Store last 'open' block and we're done
        self._config.append(context)

    def lookup(self, hostname):
        """
        Return a dict (`SSHConfigDict`) of config options for a given hostname.

        The host-matching rules of OpenSSH's ``ssh_config`` man page are used:
        For each parameter, the first obtained value will be used. The
        configuration files contain sections separated by ``Host`` and/or
        ``Match`` specifications, and that section is only applied for hosts
        which match the given patterns or keywords.

        Since the first obtained value for each parameter is used, more host-
        specific declarations should be given near the beginning of the file,
        and general defaults at the end.

        The keys in the returned dict are all normalized to lowercase (look for
        ``"port"``, not ``"Port"``). The values are processed according to the
        rules for substitution variable expansion in ``ssh_config``.

        Finally, please see the docs for `SSHConfigDict` for deeper info on
        features such as optional type conversion methods, e.g.::

            conf = my_config.lookup('myhost')
            assert conf['passwordauthentication'] == 'yes'
            assert conf.as_bool('passwordauthentication') is True

        .. note::
            If there is no explicitly configured ``HostName`` value, it will be
            set to the being-looked-up hostname, which is as close as we can
            get to OpenSSH's behavior around that particular option.

        :param str hostname: the hostname to lookup
        :raises CouldNotCanonicalize:
            if canonicalization is enabled, fails, and
            ``CanonicalizeFallbackLocal`` is set to something other than
            ``yes``.

        .. versionchanged:: 2.5
            Returns `SSHConfigDict` objects instead of dict literals.
        .. versionchanged:: 2.7
            Added canonicalization support.
        .. versionchanged:: 2.7
            Added ``Match`` support.
        """
        # First pass
        options = self._lookup(hostname=hostname)
        # Inject HostName if it was not set (this used to be done incidentally
        # during tokenization, for some reason).
        if "hostname" not in options:
            options["hostname"] = hostname
        # Handle canonicalization
        canon = options.get("canonicalizehostname", None) in ("yes", "always")
        maxdots = int(options.get("canonicalizemaxdots", 1))
        if canon and hostname.count(".") <= maxdots:
            # NOTE: OpenSSH manpage does not explicitly state this, but its
            # implementation for CanonicalDomains is 'split on any whitespace'.
            # CanonicalDomains may legitimately be absent; treat that as 'no
            # candidate domains' (so the fallback rules apply) rather than
            # blowing up with a KeyError.
            domains = options.get("canonicaldomains", "").split()
            hostname = self.canonicalize(hostname, options, domains)
            # Overwrite HostName again here (this is also what OpenSSH does)
            options["hostname"] = hostname
            options = self._lookup(hostname, options, canonical=True)
        return options

    def _lookup(self, hostname, options=None, canonical=False):
        """
        Apply every matching block to ``options`` and expand tokens.

        :param str hostname: hostname to match blocks against.
        :param options:
            existing `SSHConfigDict` to extend (a fresh one is made if None).
        :param bool canonical:
            whether this is the post-canonicalization re-lookup pass.
        :returns: the (mutated) ``options`` mapping.
        """
        # Init
        if options is None:
            options = SSHConfigDict()
        # Iterate all stanzas, applying any that match, in turn (so that things
        # like Match can reference currently understood state)
        for context in self._config:
            if not (
                self._pattern_matches(context.get("host", []), hostname)
                or self._does_match(
                    context.get("matches", []), hostname, canonical, options
                )
            ):
                continue
            for key, value in context["config"].items():
                if key not in options:
                    # Create a copy of the original value,
                    # else it will reference the original list
                    # in self._config and update that value too
                    # when the extend() is being called.
                    options[key] = value[:] if value is not None else value
                elif key == "identityfile":
                    options[key].extend(
                        x for x in value if x not in options[key]
                    )
        # Expand variables in resulting values (besides 'Match exec' which was
        # already handled above)
        options = self._expand_variables(options, hostname)
        return options

    def canonicalize(self, hostname, options, domains):
        """
        Return canonicalized version of ``hostname``.

        :param str hostname: Target hostname.
        :param options: An `SSHConfigDict` from a previous lookup pass.
        :param domains: List of domains (e.g. ``["paramiko.org"]``).

        :returns:
            The first ``<hostname>.<domain>`` candidate that resolves; if none
            does and ``CanonicalizeFallbackLocal`` is unset or ``yes``, the
            original ``hostname``.
        :raises CouldNotCanonicalize:
            if no candidate resolves and fallback is disabled.

        .. versionadded:: 2.7
        """
        for domain in domains:
            found = False
            candidate = "{}.{}".format(hostname, domain)
            family_specific = _addressfamily_host_lookup(candidate, options)
            if family_specific is not None:
                # TODO: would we want to dig deeper into other results? e.g. to
                # find something that satisfies PermittedCNAMEs when that is
                # implemented?
                found = family_specific[0]
            else:
                # TODO: what does ssh use here and is there a reason to use
                # that instead of gethostbyname?
                try:
                    found = socket.gethostbyname(candidate)
                except socket.gaierror:
                    pass
            if found:
                # TODO: follow CNAME (implied by found != candidate?) if
                # CanonicalizePermittedCNAMEs allows it
                return candidate
        # If we got here, it means canonicalization failed.
        # When CanonicalizeFallbackLocal is undefined or 'yes', we just spit
        # back the original hostname.
        if options.get("canonicalizefallbacklocal", "yes") == "yes":
            return hostname
        # And here, we failed AND fallback was set to a non-yes value, so we
        # need to get mad.
        raise CouldNotCanonicalize(hostname)

    def get_hostnames(self):
        """
        Return the set of literal hostnames defined in the SSH config (both
        explicit hostnames and wildcard entries).

        ``Match`` blocks carry no host patterns and contribute nothing.
        """
        hosts = set()
        for entry in self._config:
            # Match blocks have a "matches" key instead of "host".
            hosts.update(entry.get("host", ()))
        return hosts

    def _pattern_matches(self, patterns, target):
        """
        Tell whether ``target`` matches ``patterns`` (fnmatch-style globs).

        :param patterns: list of patterns, or a comma-separated string.
        :param str target: the value being tested.
        :returns:
            ``True`` if at least one pattern matches and no ``!``-negated
            pattern does; ``False`` otherwise.
        """
        # Convenience auto-splitter if not already a list
        if hasattr(patterns, "split"):
            patterns = patterns.split(",")
        match = False
        for pattern in patterns:
            # Short-circuit if target matches a negated pattern
            if pattern.startswith("!") and fnmatch.fnmatch(
                target, pattern[1:]
            ):
                return False
            # Flag a match, but continue (in case of later negation) if regular
            # match occurs
            elif fnmatch.fnmatch(target, pattern):
                match = True
        return match

    def _does_match(self, match_list, target_hostname, canonical, options):
        """
        Evaluate the criteria of a single ``Match`` line.

        :param match_list: list of criterion dicts from `_get_matches`.
        :param str target_hostname: hostname handed to `lookup`.
        :param bool canonical: whether this is the canonicalized re-lookup.
        :param options: config gathered so far (read for hostname/user).
        :returns:
            ``False`` as soon as a criterion fails, ``True`` for ``all``,
            else the list of criteria that passed (empty, thus falsy, when
            ``match_list`` is empty).
        """
        matched = []
        local_username = getpass.getuser()
        for candidate in match_list:
            passed = None
            # Obtain latest host/user value every loop, so later Match may
            # reference values assigned within a prior Match.
            configured_host = options.get("hostname", None)
            configured_user = options.get("user", None)
            type_, param = candidate["type"], candidate["param"]
            # Canonical is a hard pass/fail based on whether this is a
            # canonicalized re-lookup.
            if type_ == "canonical":
                if self._should_fail(canonical, candidate):
                    return False
            # The parse step ensures we only see this by itself or after
            # canonical, so it's also an easy hard pass. (No negation here as
            # that would be uh, pretty weird?)
            elif type_ == "all":
                return True
            # From here, we are testing various non-hard criteria,
            # short-circuiting only on fail
            elif type_ == "host":
                hostval = configured_host or target_hostname
                passed = self._pattern_matches(param, hostval)
            elif type_ == "originalhost":
                passed = self._pattern_matches(param, target_hostname)
            elif type_ == "user":
                user = configured_user or local_username
                passed = self._pattern_matches(param, user)
            elif type_ == "localuser":
                passed = self._pattern_matches(param, local_username)
            elif type_ == "exec":
                exec_cmd = self._tokenize(
                    options, target_hostname, "match-exec", param
                )
                # This is the laziest spot in which we can get mad about an
                # inability to import Invoke.
                if invoke is None:
                    raise invoke_import_error
                # Like OpenSSH, we 'redirect' stdout but let stderr bubble up
                passed = invoke.run(exec_cmd, hide="stdout", warn=True).ok
            # Tackle any 'passed, but was negated' results from above
            if passed is not None and self._should_fail(passed, candidate):
                return False
            # Made it all the way here? Everything matched!
            matched.append(candidate)
        # Did anything match? (To be treated as bool, usually.)
        return matched

    def _should_fail(self, would_pass, candidate):
        """
        Combine a raw test result with the criterion's ``negate`` flag.

        A negated criterion fails when the test passed; a plain one fails
        when it did not.
        """
        return would_pass if candidate["negate"] else not would_pass

    def _tokenize(self, config, target_hostname, key, value):
        """
        Tokenize a string based on current config/hostname data.

        :param config: Current config data.
        :param target_hostname: Original target connection hostname.
        :param key: Config key being tokenized (used to filter token list).
        :param value: Config value being tokenized.

        :returns: The tokenized version of the input ``value`` string.
        """
        allowed_tokens = self._allowed_tokens(key)
        # Short-circuit if no tokenization possible
        if not allowed_tokens:
            return value
        # Obtain potentially configured hostname, for use with %h.
        # Special-case where we are tokenizing the hostname itself, to avoid
        # replacing %h with a %h-bearing value, etc.
        configured_hostname = target_hostname
        if key != "hostname":
            configured_hostname = config.get("hostname", configured_hostname)
        # Ditto the rest of the source values
        port = config.get("port", SSH_PORT)
        user = getpass.getuser()
        remoteuser = config.get("user", user)
        local_hostname = socket.gethostname().split(".")[0]
        local_fqdn = LazyFqdn(config, local_hostname)
        homedir = os.path.expanduser("~")
        # str(), not repr(): a configured port arrives as a string and repr()
        # would embed its quote characters in the hash input, making e.g. an
        # explicit 'Port 22' hash differently from the default port.
        tohash = local_hostname + target_hostname + str(port) + remoteuser
        # The actual tokens!
        replacements = {
            # TODO: %%???
            "%C": sha1(tohash.encode()).hexdigest(),
            "%d": homedir,
            "%h": configured_hostname,
            # TODO: %i?
            "%L": local_hostname,
            "%l": local_fqdn,
            # also this is pseudo buggy when not in Match exec mode so document
            # that. also WHY is that the case?? don't we do all of this late?
            "%n": target_hostname,
            "%p": port,
            "%r": remoteuser,
            # TODO: %T? don't believe this is possible however
            "%u": user,
            "~": homedir,
        }
        # Do the thing with the stuff
        tokenized = value
        for find, replace in replacements.items():
            if find not in allowed_tokens:
                continue
            # str() also triggers the lazy FQDN lookup, only when %l is used.
            tokenized = tokenized.replace(find, str(replace))
        # TODO: log? eg that value -> tokenized
        return tokenized

    def _allowed_tokens(self, key):
        """
        Given config ``key``, return list of token strings to tokenize.

        .. note::
            This feels like it wants to eventually go away, but is used to
            preserve as-strict-as-possible compatibility with OpenSSH, which
            for whatever reason only applies some tokens to some config keys.
        """
        return self.TOKENS_BY_CONFIG_KEY.get(key, [])

    def _expand_variables(self, config, target_hostname):
        """
        Return a dict of config options with expanded substitutions
        for a given original & current target hostname.

        Please refer to :doc:`/api/config` for details.

        :param dict config: the currently parsed config (modified in place)
        :param str target_hostname:
            the hostname whose config is being looked up
        :returns: the same ``config`` object, with values expanded.
        """
        for k in config:
            if config[k] is None:
                continue
            tokenizer = partial(self._tokenize, config, target_hostname, k)
            if isinstance(config[k], list):
                for i, value in enumerate(config[k]):
                    config[k][i] = tokenizer(value)
            else:
                config[k] = tokenizer(config[k])
        return config

    def _get_hosts(self, host):
        """
        Return a list of host_names from host value.

        :raises ConfigParseError: if ``host`` cannot be shell-split.
        """
        try:
            return shlex.split(host)
        except ValueError:
            raise ConfigParseError("Unparsable host {}".format(host))

    def _get_matches(self, match):
        """
        Parse a specific Match config line into a list-of-dicts for its values.

        Performs some parse-time validation as well.

        :param str match: everything after the ``Match`` keyword.
        :returns:
            list of ``{"type": ..., "param": ..., "negate": ...}`` dicts, in
            the order given.
        :raises ConfigParseError:
            if a keyword lacks its parameter, or ``all`` is combined
            illegally with other keywords.
        """
        matches = []
        # NOTE: unlike `_get_hosts`, a shlex failure here (e.g. unbalanced
        # quotes) propagates as a plain ValueError.
        tokens = shlex.split(match)
        while tokens:
            entry = {"type": None, "param": None, "negate": False}
            type_ = tokens.pop(0)
            # Handle per-keyword negation
            if type_.startswith("!"):
                entry["negate"] = True
                type_ = type_[1:]
            entry["type"] = type_
            # all/canonical have no params (everything else does)
            if type_ in ("all", "canonical"):
                matches.append(entry)
                continue
            if not tokens:
                raise ConfigParseError(
                    "Missing parameter to Match '{}' keyword".format(type_)
                )
            entry["param"] = tokens.pop(0)
            matches.append(entry)
        # Perform some (easier to do now than in the middle) validation that is
        # better handled here than at lookup time.
        keywords = [x["type"] for x in matches]
        if "all" in keywords:
            allowable = ("all", "canonical")
            ok = [x for x in keywords if x in allowable]
            bad = [x for x in keywords if x not in allowable]
            err = None
            if any(bad):
                err = "Match does not allow 'all' mixed with anything but 'canonical'"  # noqa
            elif "canonical" in ok and ok.index("canonical") > ok.index("all"):
                err = "Match does not allow 'all' before 'canonical'"
            if err is not None:
                raise ConfigParseError(err)
        return matches
542def _addressfamily_host_lookup(hostname, options):
543 """
544 Try looking up ``hostname`` in an IPv4 or IPv6 specific manner.
546 This is an odd duck due to needing use in two divergent use cases. It looks
547 up ``AddressFamily`` in ``options`` and if it is ``inet`` or ``inet6``,
548 this function uses `socket.getaddrinfo` to perform a family-specific
549 lookup, returning the result if successful.
551 In any other situation -- lookup failure, or ``AddressFamily`` being
552 unspecified or ``any`` -- ``None`` is returned instead and the caller is
553 expected to do something situation-appropriate like calling
554 `socket.gethostbyname`.
556 :param str hostname: Hostname to look up.
557 :param options: `SSHConfigDict` instance w/ parsed options.
558 :returns: ``getaddrinfo``-style tuples, or ``None``, depending.
559 """
560 address_family = options.get("addressfamily", "any").lower()
561 if address_family == "any":
562 return
563 try:
564 family = socket.AF_INET6
565 if address_family == "inet":
566 family = socket.AF_INET
567 return socket.getaddrinfo(
568 hostname,
569 None,
570 family,
571 socket.SOCK_DGRAM,
572 socket.IPPROTO_IP,
573 socket.AI_CANONNAME,
574 )
575 except socket.gaierror:
576 pass
class LazyFqdn:
    """
    Lazily computed local FQDN: the lookup happens on the first ``str()``
    call and the result is cached on the instance afterwards.
    """

    def __init__(self, config, host=None):
        # Cached answer; None means 'not resolved yet'.
        self.fqdn = None
        self.config = config
        self.host = host

    def __str__(self):
        # Already resolved once? Reuse the cached value.
        if self.fqdn is not None:
            return self.fqdn

        # If the SSH config contains AddressFamily, use that when determining
        # the local host's FQDN. Using socket.getfqdn() from the standard
        # library is the most general solution, but can result in noticeable
        # delays on some platforms when IPv6 is misconfigured or not
        # available, as it calls getaddrinfo with no address family specified,
        # so both IPv4 and IPv6 are checked.
        resolved = None
        records = _addressfamily_host_lookup(self.host, self.config)
        if records is not None:
            # First canonical name that actually looks fully qualified.
            resolved = next(
                (
                    canonname
                    for _af, _socktype, _proto, canonname, _sa in records
                    if canonname and "." in canonname
                ),
                None,
            )
        # Handle 'any' / unspecified / lookup failure
        if resolved is None:
            resolved = socket.getfqdn()
        # Cache
        self.fqdn = resolved
        return self.fqdn
class SSHConfigDict(dict):
    """
    A dictionary wrapper/subclass for per-host configuration structures.

    Plain item access returns the stored values as-is (strings, for values
    parsed from a config file); the `as_bool` and `as_int` helpers return
    converted values instead.

    For example, given the following ``ssh_config`` file snippet::

        Host foo.example.com
            PasswordAuthentication no
            Compression yes
            ServerAliveInterval 60

    the following code shows both the raw strings and the Python-typed
    versions (recalling that keys are all normalized to lowercase first)::

        my_config = SSHConfig.from_path(os.path.expanduser('~/.ssh/config'))
        conf = my_config.lookup('foo.example.com')

        assert conf['passwordauthentication'] == 'no'
        assert conf.as_bool('passwordauthentication') is False
        assert conf['compression'] == 'yes'
        assert conf.as_bool('compression') is True
        assert conf['serveraliveinterval'] == '60'
        assert conf.as_int('serveraliveinterval') == 60

    .. versionadded:: 2.5
    """

    def as_bool(self, key):
        """
        Express given key's value as a boolean type.

        Intended for ``ssh_config``'s pseudo-boolean ``"yes"``/``"no"``
        values: ``"yes"`` (compared case-insensitively) yields ``True`` and
        any other string yields ``False``.

        .. note::
            If (for whatever reason) the stored value is already boolean in
            nature, it's simply returned.

        :raises KeyError: if ``key`` is not present.

        .. versionadded:: 2.5
        """
        raw = self[key]
        return raw if isinstance(raw, bool) else raw.lower() == "yes"

    def as_int(self, key):
        """
        Express given key's value as an integer, if possible.

        The stored value is handed to the builtin `int`, so this raises
        ``ValueError`` or similar when the value is not int-appropriate.

        .. versionadded:: 2.5
        """
        return int(self[key])