Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/hostkeys.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

191 statements  

1# Copyright (C) 2006-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19 

20import binascii 

21import os 

22import re 

23from base64 import decodebytes, encodebytes 

24from collections.abc import MutableMapping 

25from hashlib import sha1 

26from hmac import HMAC 

27 

28from paramiko.pkey import PKey, UnknownKeyType 

29from paramiko.ssh_exception import SSHException 

30from paramiko.util import b, constant_time_bytes_eq, get_logger, u 

31 

32 

33class HostKeys(MutableMapping): 

34 """ 

35 Representation of an OpenSSH-style "known hosts" file. Host keys can be 

36 read from one or more files, and then individual hosts can be looked up to 

37 verify server keys during SSH negotiation. 

38 

39 A `.HostKeys` object can be treated like a dict; any dict lookup is 

40 equivalent to calling `lookup`. 

41 

42 .. versionadded:: 1.5.3 

43 """ 

44 

45 def __init__(self, filename=None): 

46 """ 

47 Create a new HostKeys object, optionally loading keys from an OpenSSH 

48 style host-key file. 

49 

50 :param str filename: filename to load host keys from, or ``None`` 

51 """ 

52 # emulate a dict of { hostname: { keytype: PKey } } 

53 self._entries = [] 

54 if filename is not None: 

55 self.load(filename) 

56 

57 def add(self, hostname, keytype, key): 

58 """ 

59 Add a host key entry to the table. Any existing entry for a 

60 ``(hostname, keytype)`` pair will be replaced. 

61 

62 :param str hostname: the hostname (or IP) to add 

63 :param str keytype: key type (in ``"ssh-<type>"`` format) 

64 :param .PKey key: the key to add 

65 """ 

66 for e in self._entries: 

67 if (hostname in e.hostnames) and (e.key.get_name() == keytype): 

68 e.key = key 

69 return 

70 self._entries.append(HostKeyEntry([hostname], key)) 

71 

72 def load(self, filename): 

73 """ 

74 Read a file of known SSH host keys, in the format used by OpenSSH. 

75 This type of file unfortunately doesn't exist on Windows, but on 

76 posix, it will usually be stored in 

77 ``os.path.expanduser("~/.ssh/known_hosts")``. 

78 

79 If this method is called multiple times, the host keys are merged, 

80 not cleared. So multiple calls to `load` will just call `add`, 

81 replacing any existing entries and adding new ones. 

82 

83 :param str filename: name of the file to read host keys from 

84 

85 :raises: ``IOError`` -- if there was an error reading the file 

86 """ 

87 with open(filename, "r") as f: 

88 for lineno, line in enumerate(f, 1): 

89 line = line.strip() 

90 if (len(line) == 0) or (line[0] == "#"): 

91 continue 

92 try: 

93 entry = HostKeyEntry.from_line(line, lineno) 

94 except SSHException: 

95 continue 

96 if entry is not None: 

97 _hostnames = entry.hostnames 

98 for h in _hostnames: 

99 if self.check(h, entry.key): 

100 entry.hostnames.remove(h) 

101 if len(entry.hostnames): 

102 self._entries.append(entry) 

103 

104 def save(self, filename): 

105 """ 

106 Save host keys into a file, in the format used by OpenSSH. The order 

107 of keys in the file will be preserved when possible (if these keys were 

108 loaded from a file originally). The single exception is that combined 

109 lines will be split into individual key lines, which is arguably a bug. 

110 

111 :param str filename: name of the file to write 

112 

113 :raises: ``IOError`` -- if there was an error writing the file 

114 

115 .. versionadded:: 1.6.1 

116 """ 

117 with open(filename, "w") as f: 

118 for e in self._entries: 

119 line = e.to_line() 

120 if line: 

121 f.write(line) 

122 

123 def lookup(self, hostname): 

124 """ 

125 Find a hostkey entry for a given hostname or IP. If no entry is found, 

126 ``None`` is returned. Otherwise a dictionary of keytype to key is 

127 returned. 

128 

129 :param str hostname: the hostname (or IP) to lookup 

130 :return: dict of `str` -> `.PKey` keys associated with this host 

131 (or ``None``) 

132 """ 

133 

134 class SubDict(MutableMapping): 

135 def __init__(self, hostname, entries, hostkeys): 

136 self._hostname = hostname 

137 self._entries = entries 

138 self._hostkeys = hostkeys 

139 

140 def __iter__(self): 

141 for k in self.keys(): 

142 yield k 

143 

144 def __len__(self): 

145 return len(self.keys()) 

146 

147 def __delitem__(self, key): 

148 for e in list(self._entries): 

149 if e.key.get_name() == key: 

150 self._entries.remove(e) 

151 break 

152 else: 

153 raise KeyError(key) 

154 

155 def __getitem__(self, key): 

156 for e in self._entries: 

157 if e.key.get_name() == key: 

158 return e.key 

159 raise KeyError(key) 

160 

161 def __setitem__(self, key, val): 

162 for e in self._entries: 

163 if e.key is None: 

164 continue 

165 if e.key.get_name() == key: 

166 # replace 

167 e.key = val 

168 break 

169 else: 

170 # add a new one 

171 e = HostKeyEntry([hostname], val) 

172 self._entries.append(e) 

173 self._hostkeys._entries.append(e) 

174 

175 def keys(self): 

176 return [ 

177 e.key.get_name() 

178 for e in self._entries 

179 if e.key is not None 

180 ] 

181 

182 entries = [] 

183 for e in self._entries: 

184 if self._hostname_matches(hostname, e): 

185 entries.append(e) 

186 if len(entries) == 0: 

187 return None 

188 return SubDict(hostname, entries, self) 

189 

190 def _hostname_matches(self, hostname, entry): 

191 """ 

192 Tests whether ``hostname`` string matches given SubDict ``entry``. 

193 

194 :returns bool: 

195 """ 

196 for h in entry.hostnames: 

197 if ( 

198 h == hostname 

199 or h.startswith("|1|") 

200 and not hostname.startswith("|1|") 

201 and constant_time_bytes_eq(self.hash_host(hostname, h), h) 

202 ): 

203 return True 

204 return False 

205 

206 def check(self, hostname, key): 

207 """ 

208 Return True if the given key is associated with the given hostname 

209 in this dictionary. 

210 

211 :param str hostname: hostname (or IP) of the SSH server 

212 :param .PKey key: the key to check 

213 :return: 

214 ``True`` if the key is associated with the hostname; else ``False`` 

215 """ 

216 k = self.lookup(hostname) 

217 if k is None: 

218 return False 

219 host_key = k.get(key.get_name(), None) 

220 if host_key is None: 

221 return False 

222 return host_key.asbytes() == key.asbytes() 

223 

224 def clear(self): 

225 """ 

226 Remove all host keys from the dictionary. 

227 """ 

228 self._entries = [] 

229 

230 def __iter__(self): 

231 for k in self.keys(): 

232 yield k 

233 

234 def __len__(self): 

235 return len(self.keys()) 

236 

237 def __getitem__(self, key): 

238 ret = self.lookup(key) 

239 if ret is None: 

240 raise KeyError(key) 

241 return ret 

242 

243 def __delitem__(self, key): 

244 index = None 

245 for i, entry in enumerate(self._entries): 

246 if self._hostname_matches(key, entry): 

247 index = i 

248 break 

249 if index is None: 

250 raise KeyError(key) 

251 self._entries.pop(index) 

252 

253 def __setitem__(self, hostname, entry): 

254 # don't use this please. 

255 if len(entry) == 0: 

256 self._entries.append(HostKeyEntry([hostname], None)) 

257 return 

258 for key_type in entry.keys(): 

259 found = False 

260 for e in self._entries: 

261 if (hostname in e.hostnames) and e.key.get_name() == key_type: 

262 # replace 

263 e.key = entry[key_type] 

264 found = True 

265 if not found: 

266 self._entries.append(HostKeyEntry([hostname], entry[key_type])) 

267 

268 def keys(self): 

269 ret = [] 

270 for e in self._entries: 

271 for h in e.hostnames: 

272 if h not in ret: 

273 ret.append(h) 

274 return ret 

275 

276 def values(self): 

277 ret = [] 

278 for k in self.keys(): 

279 ret.append(self.lookup(k)) 

280 return ret 

281 

282 @staticmethod 

283 def hash_host(hostname, salt=None): 

284 """ 

285 Return a "hashed" form of the hostname, as used by OpenSSH when storing 

286 hashed hostnames in the known_hosts file. 

287 

288 :param str hostname: the hostname to hash 

289 :param str salt: optional salt to use when hashing 

290 (must be 20 bytes long) 

291 :return: the hashed hostname as a `str` 

292 """ 

293 if salt is None: 

294 salt = os.urandom(sha1().digest_size) 

295 else: 

296 if salt.startswith("|1|"): 

297 salt = salt.split("|")[2] 

298 salt = decodebytes(b(salt)) 

299 assert len(salt) == sha1().digest_size 

300 hmac = HMAC(salt, b(hostname), sha1).digest() 

301 hostkey = "|1|{}|{}".format(u(encodebytes(salt)), u(encodebytes(hmac))) 

302 return hostkey.replace("\n", "") 

303 

304 

305class InvalidHostKey(Exception): 

306 def __init__(self, line, exc): 

307 self.line = line 

308 self.exc = exc 

309 self.args = (line, exc) 

310 

311 

312class HostKeyEntry: 

313 """ 

314 Representation of a line in an OpenSSH-style "known hosts" file. 

315 """ 

316 

317 def __init__(self, hostnames=None, key=None): 

318 self.valid = (hostnames is not None) and (key is not None) 

319 self.hostnames = hostnames 

320 self.key = key 

321 

322 @classmethod 

323 def from_line(cls, line, lineno=None): 

324 """ 

325 Parses the given line of text to find the names for the host, 

326 the type of key, and the key data. The line is expected to be in the 

327 format used by the OpenSSH known_hosts file. Fields are separated by a 

328 single space or tab. 

329 

330 Lines are expected to not have leading or trailing whitespace. 

331 We don't bother to check for comments or empty lines. All of 

332 that should be taken care of before sending the line to us. 

333 

334 :param str line: a line from an OpenSSH known_hosts file 

335 """ 

336 log = get_logger("paramiko.hostkeys") 

337 fields = re.split(" |\t", line) 

338 if len(fields) < 3: 

339 # Bad number of fields 

340 msg = "Not enough fields found in known_hosts in line {} ({!r})" 

341 log.info(msg.format(lineno, line)) 

342 return None 

343 fields = fields[:3] 

344 

345 names, key_type, key = fields 

346 names = names.split(",") 

347 

348 # Decide what kind of key we're looking at and create an object 

349 # to hold it accordingly. 

350 try: 

351 # TODO: this grew organically and doesn't seem /wrong/ per se (file 

352 # read -> unicode str -> bytes for base64 decode -> decoded bytes); 

353 # but in Python 3 forever land, can we simply use 

354 # `base64.b64decode(str-from-file)` here? 

355 key_bytes = decodebytes(b(key)) 

356 except binascii.Error as e: 

357 raise InvalidHostKey(line, e) 

358 

359 try: 

360 return cls(names, PKey.from_type_string(key_type, key_bytes)) 

361 except UnknownKeyType: 

362 # TODO (backwards incompat): consider changing HostKeys API so this 

363 # just raises naturally and the exception is muted higher up in the 

364 # stack? 

365 log.info("Unable to handle key of type {}".format(key_type)) 

366 return None 

367 

368 def to_line(self): 

369 """ 

370 Returns a string in OpenSSH known_hosts file format, or None if 

371 the object is not in a valid state. A trailing newline is 

372 included. 

373 """ 

374 if self.valid: 

375 return "{} {} {}\n".format( 

376 ",".join(self.hostnames), 

377 self.key.get_name(), 

378 self.key.get_base64(), 

379 ) 

380 return None 

381 

382 def __repr__(self): 

383 return "<HostKeyEntry {!r}: {!r}>".format(self.hostnames, self.key)