Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/util.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20Useful functions used by the rest of paramiko. 

21""" 

22 

23 

24import sys 

25import struct 

26import traceback 

27import threading 

28import logging 

29 

30from paramiko.common import ( 

31 DEBUG, 

32 zero_byte, 

33 xffffffff, 

34 max_byte, 

35 byte_ord, 

36 byte_chr, 

37) 

38from paramiko.config import SSHConfig 

39 

40 

41def inflate_long(s, always_positive=False): 

42 """turns a normalized byte string into a long-int 

43 (adapted from Crypto.Util.number)""" 

44 out = 0 

45 negative = 0 

46 if not always_positive and (len(s) > 0) and (byte_ord(s[0]) >= 0x80): 

47 negative = 1 

48 if len(s) % 4: 

49 filler = zero_byte 

50 if negative: 

51 filler = max_byte 

52 # never convert this to ``s +=`` because this is a string, not a number 

53 # noinspection PyAugmentAssignment 

54 s = filler * (4 - len(s) % 4) + s 

55 for i in range(0, len(s), 4): 

56 out = (out << 32) + struct.unpack(">I", s[i : i + 4])[0] 

57 if negative: 

58 out -= 1 << (8 * len(s)) 

59 return out 

60 

61 

62def deflate_long(n, add_sign_padding=True): 

63 """turns a long-int into a normalized byte string 

64 (adapted from Crypto.Util.number)""" 

65 # after much testing, this algorithm was deemed to be the fastest 

66 s = bytes() 

67 n = int(n) 

68 while (n != 0) and (n != -1): 

69 s = struct.pack(">I", n & xffffffff) + s 

70 n >>= 32 

71 # strip off leading zeros, FFs 

72 for i in enumerate(s): 

73 if (n == 0) and (i[1] != 0): 

74 break 

75 if (n == -1) and (i[1] != 0xFF): 

76 break 

77 else: 

78 # degenerate case, n was either 0 or -1 

79 i = (0,) 

80 if n == 0: 

81 s = zero_byte 

82 else: 

83 s = max_byte 

84 s = s[i[0] :] 

85 if add_sign_padding: 

86 if (n == 0) and (byte_ord(s[0]) >= 0x80): 

87 s = zero_byte + s 

88 if (n == -1) and (byte_ord(s[0]) < 0x80): 

89 s = max_byte + s 

90 return s 

91 

92 

93def format_binary(data, prefix=""): 

94 x = 0 

95 out = [] 

96 while len(data) > x + 16: 

97 out.append(format_binary_line(data[x : x + 16])) 

98 x += 16 

99 if x < len(data): 

100 out.append(format_binary_line(data[x:])) 

101 return [prefix + line for line in out] 

102 

103 

104def format_binary_line(data): 

105 left = " ".join(["{:02X}".format(byte_ord(c)) for c in data]) 

106 right = "".join( 

107 [".{:c}..".format(byte_ord(c))[(byte_ord(c) + 63) // 95] for c in data] 

108 ) 

109 return "{:50s} {}".format(left, right) 

110 

111 

112def safe_string(s): 

113 out = b"" 

114 for c in s: 

115 i = byte_ord(c) 

116 if 32 <= i <= 127: 

117 out += byte_chr(i) 

118 else: 

119 out += b("%{:02X}".format(i)) 

120 return out 

121 

122 

123def bit_length(n): 

124 try: 

125 return n.bit_length() 

126 except AttributeError: 

127 norm = deflate_long(n, False) 

128 hbyte = byte_ord(norm[0]) 

129 if hbyte == 0: 

130 return 1 

131 bitlen = len(norm) * 8 

132 while not (hbyte & 0x80): 

133 hbyte <<= 1 

134 bitlen -= 1 

135 return bitlen 

136 

137 

138def tb_strings(): 

139 return "".join(traceback.format_exception(*sys.exc_info())).split("\n") 

140 

141 

142def generate_key_bytes(hash_alg, salt, key, nbytes): 

143 """ 

144 Given a password, passphrase, or other human-source key, scramble it 

145 through a secure hash into some keyworthy bytes. This specific algorithm 

146 is used for encrypting/decrypting private key files. 

147 

148 :param function hash_alg: A function which creates a new hash object, such 

149 as ``hashlib.sha256``. 

150 :param salt: data to salt the hash with. 

151 :type bytes salt: Hash salt bytes. 

152 :param str key: human-entered password or passphrase. 

153 :param int nbytes: number of bytes to generate. 

154 :return: Key data, as `bytes`. 

155 """ 

156 keydata = bytes() 

157 digest = bytes() 

158 if len(salt) > 8: 

159 salt = salt[:8] 

160 while nbytes > 0: 

161 hash_obj = hash_alg() 

162 if len(digest) > 0: 

163 hash_obj.update(digest) 

164 hash_obj.update(b(key)) 

165 hash_obj.update(salt) 

166 digest = hash_obj.digest() 

167 size = min(nbytes, len(digest)) 

168 keydata += digest[:size] 

169 nbytes -= size 

170 return keydata 

171 

172 

173def load_host_keys(filename): 

174 """ 

175 Read a file of known SSH host keys, in the format used by openssh, and 

176 return a compound dict of ``hostname -> keytype ->`` `PKey 

177 <paramiko.pkey.PKey>`. The hostname may be an IP address or DNS name. The 

178 keytype will be either ``"ssh-rsa"`` or ``"ssh-dss"``. 

179 

180 This type of file unfortunately doesn't exist on Windows, but on posix, 

181 it will usually be stored in ``os.path.expanduser("~/.ssh/known_hosts")``. 

182 

183 Since 1.5.3, this is just a wrapper around `.HostKeys`. 

184 

185 :param str filename: name of the file to read host keys from 

186 :return: 

187 nested dict of `.PKey` objects, indexed by hostname and then keytype 

188 """ 

189 from paramiko.hostkeys import HostKeys 

190 

191 return HostKeys(filename) 

192 

193 

194def parse_ssh_config(file_obj): 

195 """ 

196 Provided only as a backward-compatible wrapper around `.SSHConfig`. 

197 

198 .. deprecated:: 2.7 

199 Use `SSHConfig.from_file` instead. 

200 """ 

201 config = SSHConfig() 

202 config.parse(file_obj) 

203 return config 

204 

205 

206def lookup_ssh_host_config(hostname, config): 

207 """ 

208 Provided only as a backward-compatible wrapper around `.SSHConfig`. 

209 """ 

210 return config.lookup(hostname) 

211 

212 

213def mod_inverse(x, m): 

214 # it's crazy how small Python can make this function. 

215 u1, u2, u3 = 1, 0, m 

216 v1, v2, v3 = 0, 1, x 

217 

218 while v3 > 0: 

219 q = u3 // v3 

220 u1, v1 = v1, u1 - v1 * q 

221 u2, v2 = v2, u2 - v2 * q 

222 u3, v3 = v3, u3 - v3 * q 

223 if u2 < 0: 

224 u2 += m 

225 return u2 

226 

227 

228_g_thread_data = threading.local() 

229_g_thread_counter = 0 

230_g_thread_lock = threading.Lock() 

231 

232 

233def get_thread_id(): 

234 global _g_thread_data, _g_thread_counter, _g_thread_lock 

235 try: 

236 return _g_thread_data.id 

237 except AttributeError: 

238 with _g_thread_lock: 

239 _g_thread_counter += 1 

240 _g_thread_data.id = _g_thread_counter 

241 return _g_thread_data.id 

242 

243 

244def log_to_file(filename, level=DEBUG): 

245 """send paramiko logs to a logfile, 

246 if they're not already going somewhere""" 

247 logger = logging.getLogger("paramiko") 

248 if len(logger.handlers) > 0: 

249 return 

250 logger.setLevel(level) 

251 f = open(filename, "a") 

252 handler = logging.StreamHandler(f) 

253 frm = "%(levelname)-.3s [%(asctime)s.%(msecs)03d] thr=%(_threadid)-3d" 

254 frm += " %(name)s: %(message)s" 

255 handler.setFormatter(logging.Formatter(frm, "%Y%m%d-%H:%M:%S")) 

256 logger.addHandler(handler) 

257 

258 

259# make only one filter object, so it doesn't get applied more than once 

260class PFilter: 

261 def filter(self, record): 

262 record._threadid = get_thread_id() 

263 return True 

264 

265 

266_pfilter = PFilter() 

267 

268 

269def get_logger(name): 

270 logger = logging.getLogger(name) 

271 logger.addFilter(_pfilter) 

272 return logger 

273 

274 

275def constant_time_bytes_eq(a, b): 

276 if len(a) != len(b): 

277 return False 

278 res = 0 

279 # noinspection PyUnresolvedReferences 

280 for i in range(len(a)): # noqa: F821 

281 res |= byte_ord(a[i]) ^ byte_ord(b[i]) 

282 return res == 0 

283 

284 

285class ClosingContextManager: 

286 def __enter__(self): 

287 return self 

288 

289 def __exit__(self, type, value, traceback): 

290 self.close() 

291 

292 

293def clamp_value(minimum, val, maximum): 

294 return max(minimum, min(val, maximum)) 

295 

296 

297def asbytes(s): 

298 """ 

299 Coerce to bytes if possible or return unchanged. 

300 """ 

301 try: 

302 # Attempt to run through our version of b(), which does the Right Thing 

303 # for unicode strings vs bytestrings, and raises TypeError if it's not 

304 # one of those types. 

305 return b(s) 

306 except TypeError: 

307 try: 

308 # If it wasn't a string/byte/buffer-ish object, try calling an 

309 # asbytes() method, which many of our internal classes implement. 

310 return s.asbytes() 

311 except AttributeError: 

312 # Finally, just do nothing & assume this object is sufficiently 

313 # byte-y or buffer-y that everything will work out (or that callers 

314 # are capable of handling whatever it is.) 

315 return s 

316 

317 

318# TODO: clean this up / force callers to assume bytes OR unicode 

319def b(s, encoding="utf8"): 

320 """cast unicode or bytes to bytes""" 

321 if isinstance(s, bytes): 

322 return s 

323 elif isinstance(s, str): 

324 return s.encode(encoding) 

325 else: 

326 raise TypeError(f"Expected unicode or bytes, got {type(s)}") 

327 

328 

329# TODO: clean this up / force callers to assume bytes OR unicode 

330def u(s, encoding="utf8"): 

331 """cast bytes or unicode to unicode""" 

332 if isinstance(s, bytes): 

333 return s.decode(encoding) 

334 elif isinstance(s, str): 

335 return s 

336 else: 

337 raise TypeError(f"Expected unicode or bytes, got {type(s)}")