Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/auth_strategy.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Modern, adaptable authentication machinery.
4Replaces certain parts of `.SSHClient`. For a concrete implementation, see the
5``OpenSSHAuthStrategy`` class in `Fabric <https://fabfile.org>`_.
6"""
8from collections import namedtuple
10from .agent import AgentKey
11from .ssh_exception import AuthenticationException
12from .util import get_logger
class AuthSource:
    """
    Base class for one SSH authentication source.

    The password, private-key and "none" sources in this module all build on
    this class; look at those subclasses for concrete behavior.

    Every implementation must accept at least a ``username`` (``str``) kwarg.
    """

    def __init__(self, username):
        self.username = username

    def _repr(self, **kwargs):
        """
        Render ``ClassName(key=value, ...)`` from the given keyword arguments.

        Values are shown via their ``repr``; pairs appear in the order given.
        """
        # TODO: are there any good libs for this? maybe some helper from
        # structlog?
        class_name = self.__class__.__name__
        body = ", ".join(
            f"{name}={value!r}" for name, value in kwargs.items()
        )
        return f"{class_name}({body})"

    def __repr__(self):
        # The base class exposes no fields; subclasses choose what to show.
        return self._repr()

    def authenticate(self, transport):
        """
        Perform authentication.

        Subclasses must override this; the base implementation always raises
        `NotImplementedError`.
        """
        raise NotImplementedError
class NoneAuth(AuthSource):
    """
    The "none" auth type; see
    https://www.rfc-editor.org/rfc/rfc4252#section-5.2 .
    """

    def authenticate(self, transport):
        """
        Call ``transport.auth_none`` for ``self.username``; return its result.
        """
        attempt = transport.auth_none
        return attempt(self.username)
class Password(AuthSource):
    """
    Password authentication.

    :param callable password_getter:
        A zero-argument callable, invoked only at authentication time, which
        should return the `str` password. Typical choices are a
        `functools.partial` around `getpass.getpass`, a call into a secrets
        store, or similar.

        When the password is already known at instantiation time, wrap it:
        ``lambda: "my literal"`` (for a literal, but also, shame on you!) or
        ``lambda: variable_name`` (for a value held in a variable).
    """

    def __init__(self, username, password_getter):
        super().__init__(username=username)
        self.password_getter = password_getter

    def __repr__(self):
        # Password auth is marginally more 'username-caring' than pkeys, so
        # the username is worth showing here.
        return super()._repr(user=self.username)

    def authenticate(self, transport):
        """
        Obtain the password from the getter, then try password auth.
        """
        # Deferred until now because the getter may be prompting a user.
        # TODO: be nice to log source _of_ the password?
        obtain = self.password_getter
        secret = obtain()
        return transport.auth_password(self.username, secret)
84# TODO (backwards incompat): twiddle this, or PKey, or both, so they're more
85# obviously distinct.
86# TODO (backwards incompat): the obvious is to make this more wordy
87# (PrivateKeyAuth), the minimalist approach might be to rename PKey to just Key
88# (esp given all the subclasses are WhateverKey and not WhateverPKey)
class PrivateKey(AuthSource):
    """
    Essentially a mixin for private keys.

    Handles the auth call itself; finding, loading and decrypting the key
    material is the job of subclasses.

    Subclasses **must** have assigned a decrypted `.PKey` instance to
    ``self.pkey`` before ``super().authenticate`` runs - typically in their
    ``__init__``, or in an overridden ``authenticate`` ahead of its `super`
    call.
    """

    def authenticate(self, transport):
        """
        Try publickey auth using ``self.username`` and ``self.pkey``.
        """
        attempt = transport.auth_publickey
        return attempt(self.username, self.pkey)
class InMemoryPrivateKey(PrivateKey):
    """
    An in-memory, decrypted `.PKey` object.
    """

    def __init__(self, username, pkey):
        super().__init__(username=username)
        # No decryption (presumably) necessary; store the key as given.
        self.pkey = pkey

    def __repr__(self):
        # NOTE: most of the interesting repr-bits for private keys live in
        # PKey itself.
        # TODO: tacking on agent-ness like this is a bit awkward, but, eh?
        base = super()._repr(pkey=self.pkey)
        suffix = " [agent]" if isinstance(self.pkey, AgentKey) else ""
        return base + suffix
class OnDiskPrivateKey(PrivateKey):
    """
    Some on-disk private key that needs opening and possibly decrypting.

    :param str source:
        Where this key's path was specified; must be one of
        ``"ssh-config"``, ``"python-config"``, or ``"implicit-home"``.
    :param Path path:
        The filesystem path this key was loaded from.
    :param PKey pkey:
        The `PKey` object this auth source uses/represents.

    :raises ValueError: if ``source`` is not one of the accepted values.
    """

    def __init__(self, username, source, path, pkey):
        super().__init__(username=username)
        self.source = source
        valid_sources = ("ssh-config", "python-config", "implicit-home")
        if source not in valid_sources:
            raise ValueError(
                f"source argument must be one of: {valid_sources!r}"
            )
        self.path = path
        # .pkey is what the superclass authenticates with; .source and .path
        # are mostly for display/debugging.
        self.pkey = pkey

    def __repr__(self):
        shown = {
            "key": self.pkey,
            "source": self.source,
            "path": str(self.path),
        }
        return self._repr(**shown)
154# TODO re sources: is there anything in an OpenSSH config file that doesn't fit
155# into what Paramiko already had kwargs for?
# Pairs one attempted auth source (``source``) with what came of the attempt
# (``result``).
SourceResult = namedtuple("SourceResult", "source result")
161# TODO: tempting to make this an OrderedDict, except the keys essentially want
162# to be rich objects (AuthSources) which do not make for useful user indexing?
163# TODO: members being vanilla tuples is pretty old-school/expedient; they
164# "really" want to be something that's type friendlier (unless the tuple's 2nd
165# member being a Union of two types is "fine"?), which I assume means yet more
166# classes, eg an abstract SourceResult with concrete AuthSuccess and
167# AuthFailure children?
168# TODO: arguably we want __init__ typechecking of the members (or to leverage
169# mypy by classifying this literally as list-of-AuthSource?)
class AuthResult(list):
    """
    Represents a partial or complete SSH authentication attempt.

    Conceptually this extends `AuthStrategy`: it pairs that strategy's
    authentication **sources** with the **results** of trying each of them.

    `AuthResult` is a (subclass of) `list` whose members are `namedtuple`
    instances of the form ``namedtuple('SourceResult', 'source', 'result')``.
    The ``source`` member is an `AuthSource`; the ``result`` member is either
    the return value of the relevant `.Transport` method, or an exception
    object.

    .. note::
        Transport auth method results are always themselves a ``list`` of
        "next allowable authentication methods".

        After a plain successful authentication that list is empty; if the
        attempt was rejected but further tries are allowed, it holds string
        method names such as ``pubkey`` or ``password``.

        ``__str__`` renders the empty-list case as the word ``success`` so a
        finished authentication session is easier for humans to read.

    Instances also carry a `strategy` attribute referencing the
    `AuthStrategy` which was attempted.
    """

    def __init__(self, strategy, *args, **kwargs):
        self.strategy = strategy
        super().__init__(*args, **kwargs)

    def __str__(self):
        # NOTE: deliberately different from __repr__, which keeps the plain
        # list implementation.
        # TODO: go hog wild, use rich.Table? how is that on degraded term's?
        # TODO: test this lol
        rendered = []
        for entry in self:
            # Any falsy result (e.g. the empty list) is shown as 'success'.
            rendered.append(f"{entry.source} -> {entry.result or 'success'}")
        return "\n".join(rendered)
215# TODO (backwards incompat): descend from SSHException or even just Exception
class AuthFailure(AuthenticationException):
    """
    Basic exception wrapping an `AuthResult` indicating overall auth failure.

    `AuthFailure` descends from `AuthenticationException` but is generally
    "higher level": the parent is now only raised by individual `AuthSource`
    attempts and should typically reach users only when encapsulated in this
    class. The inheritance exists primarily for backwards compatibility.

    :param result:
        The `AuthResult` describing every attempt that was made. Available
        afterwards as the ``result`` attribute and as ``args[0]``.
    """

    def __init__(self, result):
        # Forward ``result`` to the base initializer so ``self.args`` is
        # populated even when the caller passes it by keyword. With empty
        # ``args``, copying or unpickling the exception re-invokes the class
        # with no arguments and fails on the required ``result`` parameter.
        super().__init__(result)
        self.result = result

    def __str__(self):
        # Leading newline puts the multi-line result on its own lines.
        return "\n" + str(self.result)
class AuthStrategy:
    """
    This class represents one or more attempts to auth with an SSH server.

    By default, subclasses must at least accept an ``ssh_config``
    (`.SSHConfig`) keyword argument; they may accept more if their particular
    strategy needs it.
    """

    def __init__(
        self,
        ssh_config,
    ):
        self.ssh_config = ssh_config
        self.log = get_logger(__name__)

    def get_sources(self):
        """
        Generator yielding `AuthSource` instances, in the order to try.

        Subclasses override this first and foremost: work out which sources
        are needed and ``yield`` them.

        Subclasses _of_ subclasses may want to filter or discard around a
        call to `super`.

        The base implementation always raises `NotImplementedError`.
        """
        raise NotImplementedError

    def authenticate(self, transport):
        """
        Handles attempting `AuthSource` instances yielded from `get_sources`.

        Sources are tried in order until one of them returns without raising.
        Every attempt is recorded as a `SourceResult` in the returned
        `AuthResult`.

        You *normally* won't need to override this, but it's an option for
        advanced users.

        :returns: the `AuthResult` of all attempts, ending with the success.
        :raises AuthFailure: if no source authenticated without raising.
        """
        attempts = AuthResult(strategy=self)
        # TODO: arguably we could fit in a "send none auth, record allowed auth
        # types sent back" thing here as OpenSSH-client does, but that likely
        # wants to live in fabric.OpenSSHAuthStrategy as not all target servers
        # will implement it!
        # TODO: needs better "server told us too many attempts" checking!
        for source in self.get_sources():
            self.log.debug(f"Trying {source}")
            try:
                outcome = source.authenticate(transport)
            # TODO: 'except PartialAuthentication' is needed for 2FA and
            # similar, as per old SSHClient.connect - it is the only way
            # AuthHandler supplies access to the 'name-list' field from
            # MSG_USERAUTH_FAILURE, at present.
            except Exception as e:
                # TODO: look at what this could possibly raise, we don't really
                # want Exception here, right? just SSHException subclasses? or
                # do we truly want to capture anything at all with assumption
                # it's easy enough for users to look afterwards?
                outcome = e
                authenticated = False
                # NOTE: showing type, not message, for tersity & also most of
                # the time it's basically just "Authentication failed."
                source_class = e.__class__.__name__
                self.log.info(
                    f"Authentication via {source} failed with {source_class}"
                )
            else:
                authenticated = True
            attempts.append(SourceResult(source, outcome))
            if authenticated:
                break
        else:
            # Loop ran dry without a success. Gotta die here, otherwise
            # Transport's main loop just kinda hangs out until something
            # times out!
            raise AuthFailure(result=attempts)
        # Success: give back what was done, in case they care.
        return attempts

    # TODO: is there anything OpenSSH client does which _can't_ cleanly map to
    # iterating a generator?