Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/hostkeys.py: 20%

199 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19 

20from base64 import encodebytes, decodebytes 

21import binascii 

22import os 

23import re 

24 

25from collections.abc import MutableMapping 

26from hashlib import sha1 

27from hmac import HMAC 

28 

29 

30from paramiko.dsskey import DSSKey 

31from paramiko.rsakey import RSAKey 

32from paramiko.util import get_logger, constant_time_bytes_eq, b, u 

33from paramiko.ecdsakey import ECDSAKey 

34from paramiko.ed25519key import Ed25519Key 

35from paramiko.ssh_exception import SSHException 

36 

37 

class HostKeys(MutableMapping):
    """
    Representation of an OpenSSH-style "known hosts" file.  Host keys can be
    read from one or more files, and then individual hosts can be looked up to
    verify server keys during SSH negotiation.

    A `.HostKeys` object can be treated like a dict; any dict lookup is
    equivalent to calling `lookup`.

    Internally the table is a flat list of entry objects, each carrying a
    ``hostnames`` list and a ``key`` (which may be ``None`` for a placeholder
    created through ``hostkeys[hostname] = {}``).

    .. versionadded:: 1.5.3
    """

    def __init__(self, filename=None):
        """
        Create a new HostKeys object, optionally loading keys from an OpenSSH
        style host-key file.

        :param str filename: filename to load host keys from, or ``None``
        """
        # emulate a dict of { hostname: { keytype: PKey } }
        self._entries = []
        if filename is not None:
            self.load(filename)

    def add(self, hostname, keytype, key):
        """
        Add a host key entry to the table.  Any existing entry for a
        ``(hostname, keytype)`` pair will be replaced.

        :param str hostname: the hostname (or IP) to add
        :param str keytype:
            key type name, compared against ``key.get_name()`` of the stored
            keys (for example ``"ssh-rsa"``)
        :param .PKey key: the key to add
        """
        for e in self._entries:
            # Placeholder entries have no key; they can never match a keytype.
            if (
                hostname in e.hostnames
                and e.key is not None
                and e.key.get_name() == keytype
            ):
                e.key = key
                return
        self._entries.append(HostKeyEntry([hostname], key))

    def load(self, filename):
        """
        Read a file of known SSH host keys, in the format used by OpenSSH.
        This type of file unfortunately doesn't exist on Windows, but on
        posix, it will usually be stored in
        ``os.path.expanduser("~/.ssh/known_hosts")``.

        If this method is called multiple times, the host keys are merged,
        not cleared.  Hostnames of a line that are already associated with
        the very same key are dropped from the new entry; if none remain the
        line is skipped entirely.

        Blank lines and lines starting with ``#`` are ignored, as are lines
        for which `HostKeyEntry.from_line` returns ``None`` or raises
        ``SSHException``.

        :param str filename: name of the file to read host keys from

        :raises: ``IOError`` -- if there was an error reading the file
        :raises:
            `InvalidHostKey` -- propagated from `HostKeyEntry.from_line`; it
            is not an ``SSHException`` and therefore is not skipped here
        """
        with open(filename, "r") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if not line or line[0] == "#":
                    continue
                try:
                    entry = HostKeyEntry.from_line(line, lineno)
                except SSHException:
                    continue
                if entry is None:
                    continue
                # Iterate over a snapshot: removing from the list that is
                # being iterated would silently skip the following hostname.
                for h in list(entry.hostnames):
                    if self.check(h, entry.key):
                        entry.hostnames.remove(h)
                if entry.hostnames:
                    self._entries.append(entry)

    def save(self, filename):
        """
        Save host keys into a file, in the format used by OpenSSH.  The order
        of keys in the file will be preserved when possible (if these keys were
        loaded from a file originally).  The single exception is that combined
        lines will be split into individual key lines, which is arguably a bug.

        Entries whose ``to_line()`` returns a falsy value are not written.

        :param str filename: name of the file to write

        :raises: ``IOError`` -- if there was an error writing the file

        .. versionadded:: 1.6.1
        """
        with open(filename, "w") as f:
            for e in self._entries:
                line = e.to_line()
                if line:
                    f.write(line)

    def lookup(self, hostname):
        """
        Find a hostkey entry for a given hostname or IP.  If no entry is found,
        ``None`` is returned.  Otherwise a dict-like object mapping key type
        name (``key.get_name()``) to key is returned.

        :param str hostname: the hostname (or IP) to lookup
        :return: dict of `str` -> `.PKey` keys associated with this host
            (or ``None``)
        """

        class SubDict(MutableMapping):
            """Dict-like view over the entries matching a single hostname."""

            def __init__(self, hostname, entries, hostkeys):
                self._hostname = hostname
                self._entries = entries
                self._hostkeys = hostkeys

            def __iter__(self):
                for k in self.keys():
                    yield k

            def __len__(self):
                return len(self.keys())

            def __delitem__(self, key):
                # NOTE: only this view's own list is modified; the parent
                # HostKeys table is left untouched.
                for e in list(self._entries):
                    if e.key is not None and e.key.get_name() == key:
                        self._entries.remove(e)
                        break
                else:
                    raise KeyError(key)

            def __getitem__(self, key):
                for e in self._entries:
                    # Skip key-less placeholder entries instead of crashing.
                    if e.key is not None and e.key.get_name() == key:
                        return e.key
                raise KeyError(key)

            def __setitem__(self, key, val):
                for e in self._entries:
                    if e.key is None:
                        continue
                    if e.key.get_name() == key:
                        # replace
                        e.key = val
                        break
                else:
                    # add a new one, both to this view and to the parent
                    e = HostKeyEntry([self._hostname], val)
                    self._entries.append(e)
                    self._hostkeys._entries.append(e)

            def keys(self):
                return [
                    e.key.get_name()
                    for e in self._entries
                    if e.key is not None
                ]

        entries = [
            e for e in self._entries if self._hostname_matches(hostname, e)
        ]
        if not entries:
            return None
        return SubDict(hostname, entries, self)

    def _hostname_matches(self, hostname, entry):
        """
        Tests whether ``hostname`` string matches any name of ``entry``.

        A name matches when it is equal to ``hostname``, or when it is a
        hashed name (``|1|`` prefix), ``hostname`` itself is not hashed, and
        hashing ``hostname`` with the name's salt reproduces the name.

        :returns bool:
        """
        for h in entry.hostnames:
            if h == hostname or (
                h.startswith("|1|")
                and not hostname.startswith("|1|")
                and constant_time_bytes_eq(self.hash_host(hostname, h), h)
            ):
                return True
        return False

    def check(self, hostname, key):
        """
        Return True if the given key is associated with the given hostname
        in this dictionary.

        :param str hostname: hostname (or IP) of the SSH server
        :param .PKey key: the key to check
        :return:
            ``True`` if the key is associated with the hostname; else ``False``
        """
        k = self.lookup(hostname)
        if k is None:
            return False
        host_key = k.get(key.get_name(), None)
        if host_key is None:
            return False
        return host_key.asbytes() == key.asbytes()

    def clear(self):
        """
        Remove all host keys from the dictionary.
        """
        self._entries = []

    def __iter__(self):
        for k in self.keys():
            yield k

    def __len__(self):
        return len(self.keys())

    def __getitem__(self, key):
        ret = self.lookup(key)
        if ret is None:
            raise KeyError(key)
        return ret

    def __delitem__(self, key):
        # Only the first matching entry is removed.
        for index, entry in enumerate(self._entries):
            if self._hostname_matches(key, entry):
                del self._entries[index]
                return
        raise KeyError(key)

    def __setitem__(self, hostname, entry):
        # don't use this please.
        if len(entry) == 0:
            # Key-less placeholder entry for this hostname.
            self._entries.append(HostKeyEntry([hostname], None))
            return
        for key_type in entry.keys():
            found = False
            for e in self._entries:
                if (
                    hostname in e.hostnames
                    and e.key is not None
                    and e.key.get_name() == key_type
                ):
                    # replace
                    e.key = entry[key_type]
                    found = True
            if not found:
                self._entries.append(HostKeyEntry([hostname], entry[key_type]))

    def keys(self):
        """Return the distinct hostnames, in first-seen order, as a list."""
        ret = []
        for e in self._entries:
            for h in e.hostnames:
                if h not in ret:
                    ret.append(h)
        return ret

    def values(self):
        """Return the `lookup` result for every hostname in `keys` order."""
        return [self.lookup(k) for k in self.keys()]

    @staticmethod
    def hash_host(hostname, salt=None):
        """
        Return a "hashed" form of the hostname, as used by OpenSSH when storing
        hashed hostnames in the known_hosts file.

        :param str hostname: the hostname to hash
        :param str salt:
            optional base64-encoded salt to use when hashing (must decode to
            20 bytes); a complete ``|1|salt|hash`` string is also accepted, in
            which case its salt field is used.  A random salt is generated
            when omitted.
        :return: the hashed hostname as a `str`
        """
        if salt is None:
            salt = os.urandom(sha1().digest_size)
        else:
            if salt.startswith("|1|"):
                # "|1|<salt>|<hash>" splits into ["", "1", salt, hash]
                salt = salt.split("|")[2]
            salt = decodebytes(b(salt))
        assert len(salt) == sha1().digest_size
        hmac = HMAC(salt, b(hostname), sha1).digest()
        hostkey = "|1|{}|{}".format(u(encodebytes(salt)), u(encodebytes(hmac)))
        # encodebytes() appends newlines; the file format wants a single line.
        return hostkey.replace("\n", "")

308 

309 

class InvalidHostKey(Exception):
    """
    Error describing a known_hosts line whose key data could not be used.

    :param line: the offending line
    :param exc: the underlying exception that was caught

    Both values are kept as the ``line`` and ``exc`` attributes and also form
    the exception's ``args`` tuple.
    """

    def __init__(self, line, exc):
        # Exception.__init__ stores its positional arguments as ``args``.
        super().__init__(line, exc)
        self.line, self.exc = line, exc

315 

316 

class HostKeyEntry:
    """
    One line of an OpenSSH-style "known hosts" file: a list of hostnames
    sharing a single key.
    """

    def __init__(self, hostnames=None, key=None):
        self.hostnames = hostnames
        self.key = key
        # Computed once here; later changes to hostnames/key do not update it.
        self.valid = not (hostnames is None or key is None)

    @classmethod
    def from_line(cls, line, lineno=None):
        """
        Build an entry from one line of an OpenSSH known_hosts file.

        The line is split on single spaces or tabs into host names, key type
        and base64 key data; anything after the third field is ignored.  The
        host names field is itself a comma-separated list.

        The line must already be stripped of leading and trailing whitespace,
        and comments or empty lines must be filtered out by the caller.

        :param str line: a line from an OpenSSH known_hosts file
        :param lineno: line number, used only in the log message
        :return:
            a new entry, or ``None`` when the line has fewer than three
            fields or names a key type that is not handled here
        :raises: `InvalidHostKey` -- if decoding the key data raises
            ``binascii.Error``
        """
        logger = get_logger("paramiko.hostkeys")
        parts = re.split(" |\t", line)
        if len(parts) < 3:
            # Bad number of fields
            fmt = "Not enough fields found in known_hosts in line {} ({!r})"
            logger.info(fmt.format(lineno, line))
            return None

        hostnames = parts[0].split(",")
        key_type = parts[1]
        encoded = parts[2]

        # Pick the key class from the declared type and let it parse the
        # decoded blob.  Decoding happens only for recognised types.
        try:
            encoded = b(encoded)
            if key_type == "ssh-rsa":
                parsed = RSAKey(data=decodebytes(encoded))
            elif key_type == "ssh-dss":
                parsed = DSSKey(data=decodebytes(encoded))
            elif key_type in ECDSAKey.supported_key_format_identifiers():
                parsed = ECDSAKey(
                    data=decodebytes(encoded), validate_point=False
                )
            elif key_type == "ssh-ed25519":
                parsed = Ed25519Key(data=decodebytes(encoded))
            else:
                logger.info("Unable to handle key of type {}".format(key_type))
                return None
        except binascii.Error as err:
            raise InvalidHostKey(line, err)

        return cls(hostnames, parsed)

    def to_line(self):
        """
        Render the entry as a known_hosts line, trailing newline included.

        :return: the line as a `str`, or ``None`` if the entry is not valid
        """
        if not self.valid:
            return None
        names = ",".join(self.hostnames)
        key_name = self.key.get_name()
        key_data = self.key.get_base64()
        return "{} {} {}\n".format(names, key_name, key_data)

    def __repr__(self):
        template = "<HostKeyEntry {!r}: {!r}>"
        return template.format(self.hostnames, self.key)