Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/auth_strategy.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

87 statements  

1""" 

2Modern, adaptable authentication machinery. 

3 

4Replaces certain parts of `.SSHClient`. For a concrete implementation, see the 

5``OpenSSHAuthStrategy`` class in `Fabric <https://fabfile.org>`_. 

6""" 

7 

8from collections import namedtuple 

9 

10from .agent import AgentKey 

11from .ssh_exception import AuthenticationException 

12from .util import get_logger 

13 

14 

15class AuthSource: 

16 """ 

17 Some SSH authentication source, such as a password, private key, or agent. 

18 

19 See subclasses in this module for concrete implementations. 

20 

21 All implementations must accept at least a ``username`` (``str``) kwarg. 

22 """ 

23 

24 def __init__(self, username): 

25 self.username = username 

26 

27 def _repr(self, **kwargs): 

28 # TODO: are there any good libs for this? maybe some helper from 

29 # structlog? 

30 pairs = [f"{k}={v!r}" for k, v in kwargs.items()] 

31 joined = ", ".join(pairs) 

32 return f"{self.__class__.__name__}({joined})" 

33 

34 def __repr__(self): 

35 return self._repr() 

36 

37 def authenticate(self, transport): 

38 """ 

39 Perform authentication. 

40 """ 

41 raise NotImplementedError 

42 

43 

44class NoneAuth(AuthSource): 

45 """ 

46 Auth type "none", ie https://www.rfc-editor.org/rfc/rfc4252#section-5.2 . 

47 """ 

48 

49 def authenticate(self, transport): 

50 return transport.auth_none(self.username) 

51 

52 

53class Password(AuthSource): 

54 """ 

55 Password authentication. 

56 

57 :param callable password_getter: 

58 A lazy callable that should return a `str` password value at 

59 authentication time, such as a `functools.partial` wrapping 

60 `getpass.getpass`, an API call to a secrets store, or similar. 

61 

62 If you already know the password at instantiation time, you should 

63 simply use something like ``lambda: "my literal"`` (for a literal, but 

64 also, shame on you!) or ``lambda: variable_name`` (for something stored 

65 in a variable). 

66 """ 

67 

68 def __init__(self, username, password_getter): 

69 super().__init__(username=username) 

70 self.password_getter = password_getter 

71 

72 def __repr__(self): 

73 # Password auth is marginally more 'username-caring' than pkeys, so may 

74 # as well log that info here. 

75 return super()._repr(user=self.username) 

76 

77 def authenticate(self, transport): 

78 # Lazily get the password, in case it's prompting a user 

79 # TODO: be nice to log source _of_ the password? 

80 password = self.password_getter() 

81 return transport.auth_password(self.username, password) 

82 

83 

84# TODO (backwards incompat): twiddle this, or PKey, or both, so they're more 

85# obviously distinct. 

86# TODO (backwards incompat): the obvious is to make this more wordy 

87# (PrivateKeyAuth), the minimalist approach might be to rename PKey to just Key 

88# (esp given all the subclasses are WhateverKey and not WhateverPKey) 

89class PrivateKey(AuthSource): 

90 """ 

91 Essentially a mixin for private keys. 

92 

93 Knows how to auth, but leaves key material discovery/loading/decryption to 

94 subclasses. 

95 

96 Subclasses **must** ensure that they've set ``self.pkey`` to a decrypted 

97 `.PKey` instance before calling ``super().authenticate``; typically 

98 either in their ``__init__``, or in an overridden ``authenticate`` prior to 

99 its `super` call. 

100 """ 

101 

102 def authenticate(self, transport): 

103 return transport.auth_publickey(self.username, self.pkey) 

104 

105 

106class InMemoryPrivateKey(PrivateKey): 

107 """ 

108 An in-memory, decrypted `.PKey` object. 

109 """ 

110 

111 def __init__(self, username, pkey): 

112 super().__init__(username=username) 

113 # No decryption (presumably) necessary! 

114 self.pkey = pkey 

115 

116 def __repr__(self): 

117 # NOTE: most of interesting repr-bits for private keys is in PKey. 

118 # TODO: tacking on agent-ness like this is a bit awkward, but, eh? 

119 rep = super()._repr(pkey=self.pkey) 

120 if isinstance(self.pkey, AgentKey): 

121 rep += " [agent]" 

122 return rep 

123 

124 

125class OnDiskPrivateKey(PrivateKey): 

126 """ 

127 Some on-disk private key that needs opening and possibly decrypting. 

128 

129 :param str source: 

130 String tracking where this key's path was specified; should be one of 

131 ``"ssh-config"``, ``"python-config"``, or ``"implicit-home"``. 

132 :param Path path: 

133 The filesystem path this key was loaded from. 

134 :param PKey pkey: 

135 The `PKey` object this auth source uses/represents. 

136 """ 

137 

138 def __init__(self, username, source, path, pkey): 

139 super().__init__(username=username) 

140 self.source = source 

141 allowed = ("ssh-config", "python-config", "implicit-home") 

142 if source not in allowed: 

143 raise ValueError(f"source argument must be one of: {allowed!r}") 

144 self.path = path 

145 # Superclass wants .pkey, other two are mostly for display/debugging. 

146 self.pkey = pkey 

147 

148 def __repr__(self): 

149 return self._repr( 

150 key=self.pkey, source=self.source, path=str(self.path) 

151 ) 

152 

153 

154# TODO re sources: is there anything in an OpenSSH config file that doesn't fit 

155# into what Paramiko already had kwargs for? 

156 

157 

158SourceResult = namedtuple("SourceResult", ["source", "result"]) 

159 

160 

161# TODO: tempting to make this an OrderedDict, except the keys essentially want 

162# to be rich objects (AuthSources) which do not make for useful user indexing? 

163# TODO: members being vanilla tuples is pretty old-school/expedient; they 

164# "really" want to be something that's type friendlier (unless the tuple's 2nd 

165# member being a Union of two types is "fine"?), which I assume means yet more 

166# classes, eg an abstract SourceResult with concrete AuthSuccess and 

167# AuthFailure children? 

168# TODO: arguably we want __init__ typechecking of the members (or to leverage 

169# mypy by classifying this literally as list-of-AuthSource?) 

170class AuthResult(list): 

171 """ 

172 Represents a partial or complete SSH authentication attempt. 

173 

174 This class conceptually extends `AuthStrategy` by pairing the former's 

175 authentication **sources** with the **results** of trying to authenticate 

176 with them. 

177 

178 `AuthResult` is a (subclass of) `list` of `namedtuple`, which are of the 

179 form ``namedtuple('SourceResult', 'source', 'result')`` (where the 

180 ``source`` member is an `AuthSource` and the ``result`` member is either a 

181 return value from the relevant `.Transport` method, or an exception 

182 object). 

183 

184 .. note:: 

185 Transport auth method results are always themselves a ``list`` of "next 

186 allowable authentication methods". 

187 

188 In the simple case of "you just authenticated successfully", it's an 

189 empty list; if your auth was rejected but you're allowed to try again, 

190 it will be a list of string method names like ``pubkey`` or 

191 ``password``. 

192 

193 The ``__str__`` of this class represents the empty-list scenario as the 

194 word ``success``, which should make reading the result of an 

195 authentication session more obvious to humans. 

196 

197 Instances also have a `strategy` attribute referencing the `AuthStrategy` 

198 which was attempted. 

199 """ 

200 

201 def __init__(self, strategy, *args, **kwargs): 

202 self.strategy = strategy 

203 super().__init__(*args, **kwargs) 

204 

205 def __str__(self): 

206 # NOTE: meaningfully distinct from __repr__, which still wants to use 

207 # superclass' implementation. 

208 # TODO: go hog wild, use rich.Table? how is that on degraded term's? 

209 # TODO: test this lol 

210 return "\n".join( 

211 f"{x.source} -> {x.result or 'success'}" for x in self 

212 ) 

213 

214 

215# TODO (backwards incompat): descend from SSHException or even just Exception 

216class AuthFailure(AuthenticationException): 

217 """ 

218 Basic exception wrapping an `AuthResult` indicating overall auth failure. 

219 

220 Note that `AuthFailure` descends from `AuthenticationException` but is 

221 generally "higher level"; the latter is now only raised by individual 

222 `AuthSource` attempts and should typically only be seen by users when 

223 encapsulated in this class. It subclasses `AuthenticationException` 

224 primarily for backwards compatibility reasons. 

225 """ 

226 

227 def __init__(self, result): 

228 self.result = result 

229 

230 def __str__(self): 

231 return "\n" + str(self.result) 

232 

233 

234class AuthStrategy: 

235 """ 

236 This class represents one or more attempts to auth with an SSH server. 

237 

238 By default, subclasses must at least accept an ``ssh_config`` 

239 (`.SSHConfig`) keyword argument, but may opt to accept more as needed for 

240 their particular strategy. 

241 """ 

242 

243 def __init__( 

244 self, 

245 ssh_config, 

246 ): 

247 self.ssh_config = ssh_config 

248 self.log = get_logger(__name__) 

249 

250 def get_sources(self): 

251 """ 

252 Generator yielding `AuthSource` instances, in the order to try. 

253 

254 This is the primary override point for subclasses: you figure out what 

255 sources you need, and ``yield`` them. 

256 

257 Subclasses _of_ subclasses may find themselves wanting to do things 

258 like filtering or discarding around a call to `super`. 

259 """ 

260 raise NotImplementedError 

261 

262 def authenticate(self, transport): 

263 """ 

264 Handles attempting `AuthSource` instances yielded from `get_sources`. 

265 

266 You *normally* won't need to override this, but it's an option for 

267 advanced users. 

268 """ 

269 succeeded = False 

270 overall_result = AuthResult(strategy=self) 

271 # TODO: arguably we could fit in a "send none auth, record allowed auth 

272 # types sent back" thing here as OpenSSH-client does, but that likely 

273 # wants to live in fabric.OpenSSHAuthStrategy as not all target servers 

274 # will implement it! 

275 # TODO: needs better "server told us too many attempts" checking! 

276 for source in self.get_sources(): 

277 self.log.debug(f"Trying {source}") 

278 try: # NOTE: this really wants to _only_ wrap the authenticate()! 

279 result = source.authenticate(transport) 

280 succeeded = True 

281 # TODO: 'except PartialAuthentication' is needed for 2FA and 

282 # similar, as per old SSHClient.connect - it is the only way 

283 # AuthHandler supplies access to the 'name-list' field from 

284 # MSG_USERAUTH_FAILURE, at present. 

285 except Exception as e: 

286 result = e 

287 # TODO: look at what this could possibly raise, we don't really 

288 # want Exception here, right? just SSHException subclasses? or 

289 # do we truly want to capture anything at all with assumption 

290 # it's easy enough for users to look afterwards? 

291 # NOTE: showing type, not message, for tersity & also most of 

292 # the time it's basically just "Authentication failed." 

293 source_class = e.__class__.__name__ 

294 self.log.info( 

295 f"Authentication via {source} failed with {source_class}" 

296 ) 

297 overall_result.append(SourceResult(source, result)) 

298 if succeeded: 

299 break 

300 # Gotta die here if nothing worked, otherwise Transport's main loop 

301 # just kinda hangs out until something times out! 

302 if not succeeded: 

303 raise AuthFailure(result=overall_result) 

304 # Success: give back what was done, in case they care. 

305 return overall_result 

306 

307 # TODO: is there anything OpenSSH client does which _can't_ cleanly map to 

308 # iterating a generator?