Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/util.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18 

19""" 

20Useful functions used by the rest of paramiko. 

21""" 

22 

23 

24import sys 

25import struct 

26import traceback 

27import threading 

28import logging 

29 

30from paramiko.common import ( 

31 DEBUG, 

32 zero_byte, 

33 xffffffff, 

34 max_byte, 

35 byte_ord, 

36 byte_chr, 

37) 

38from paramiko.config import SSHConfig 

39 

40 

41def inflate_long(s, always_positive=False): 

42 """turns a normalized byte string into a long-int 

43 (adapted from Crypto.Util.number)""" 

44 out = 0 

45 negative = 0 

46 if not always_positive and (len(s) > 0) and (byte_ord(s[0]) >= 0x80): 

47 negative = 1 

48 if len(s) % 4: 

49 filler = zero_byte 

50 if negative: 

51 filler = max_byte 

52 # never convert this to ``s +=`` because this is a string, not a number 

53 # noinspection PyAugmentAssignment 

54 s = filler * (4 - len(s) % 4) + s 

55 for i in range(0, len(s), 4): 

56 out = (out << 32) + struct.unpack(">I", s[i : i + 4])[0] 

57 if negative: 

58 out -= 1 << (8 * len(s)) 

59 return out 

60 

61 

62def deflate_long(n, add_sign_padding=True): 

63 """turns a long-int into a normalized byte string 

64 (adapted from Crypto.Util.number)""" 

65 # after much testing, this algorithm was deemed to be the fastest 

66 s = bytes() 

67 n = int(n) 

68 while (n != 0) and (n != -1): 

69 s = struct.pack(">I", n & xffffffff) + s 

70 n >>= 32 

71 # strip off leading zeros, FFs 

72 for i in enumerate(s): 

73 if (n == 0) and (i[1] != 0): 

74 break 

75 if (n == -1) and (i[1] != 0xFF): 

76 break 

77 else: 

78 # degenerate case, n was either 0 or -1 

79 i = (0,) 

80 if n == 0: 

81 s = zero_byte 

82 else: 

83 s = max_byte 

84 s = s[i[0] :] 

85 if add_sign_padding: 

86 if (n == 0) and (byte_ord(s[0]) >= 0x80): 

87 s = zero_byte + s 

88 if (n == -1) and (byte_ord(s[0]) < 0x80): 

89 s = max_byte + s 

90 return s 

91 

92 

93def format_binary(data, prefix=""): 

94 x = 0 

95 out = [] 

96 while len(data) > x + 16: 

97 out.append(format_binary_line(data[x : x + 16])) 

98 x += 16 

99 if x < len(data): 

100 out.append(format_binary_line(data[x:])) 

101 return [prefix + line for line in out] 

102 

103 

104def format_binary_line(data): 

105 left = " ".join(["{:02X}".format(byte_ord(c)) for c in data]) 

106 right = "".join( 

107 [".{:c}..".format(byte_ord(c))[(byte_ord(c) + 63) // 95] for c in data] 

108 ) 

109 return "{:50s} {}".format(left, right) 

110 

111 

112def safe_string(s): 

113 out = b"" 

114 for c in s: 

115 i = byte_ord(c) 

116 if 32 <= i <= 127: 

117 out += byte_chr(i) 

118 else: 

119 out += b("%{:02X}".format(i)) 

120 return out 

121 

122 

123def bit_length(n): 

124 try: 

125 return n.bit_length() 

126 except AttributeError: 

127 norm = deflate_long(n, False) 

128 hbyte = byte_ord(norm[0]) 

129 if hbyte == 0: 

130 return 1 

131 bitlen = len(norm) * 8 

132 while not (hbyte & 0x80): 

133 hbyte <<= 1 

134 bitlen -= 1 

135 return bitlen 

136 

137 

138def tb_strings(): 

139 return "".join(traceback.format_exception(*sys.exc_info())).split("\n") 

140 

141 

142def generate_key_bytes(hash_alg, salt, key, nbytes): 

143 """ 

144 Given a password, passphrase, or other human-source key, scramble it 

145 through a secure hash into some keyworthy bytes. This specific algorithm 

146 is used for encrypting/decrypting private key files. 

147 

148 :param function hash_alg: A function which creates a new hash object, such 

149 as ``hashlib.sha256``. 

150 :param salt: data to salt the hash with. 

151 :type bytes salt: Hash salt bytes. 

152 :param str key: human-entered password or passphrase. 

153 :param int nbytes: number of bytes to generate. 

154 :return: Key data, as `bytes`. 

155 """ 

156 keydata = bytes() 

157 digest = bytes() 

158 if len(salt) > 8: 

159 salt = salt[:8] 

160 while nbytes > 0: 

161 hash_obj = hash_alg() 

162 if len(digest) > 0: 

163 hash_obj.update(digest) 

164 hash_obj.update(b(key)) 

165 hash_obj.update(salt) 

166 digest = hash_obj.digest() 

167 size = min(nbytes, len(digest)) 

168 keydata += digest[:size] 

169 nbytes -= size 

170 return keydata 

171 

172 

173def load_host_keys(filename): 

174 """ 

175 Read a file of known SSH host keys, in the format used by openssh, and 

176 return a compound dict of ``hostname -> keytype ->`` `PKey 

177 <paramiko.pkey.PKey>`. The hostname may be an IP address or DNS name. 

178 

179 This type of file unfortunately doesn't exist on Windows, but on posix, 

180 it will usually be stored in ``os.path.expanduser("~/.ssh/known_hosts")``. 

181 

182 Since 1.5.3, this is just a wrapper around `.HostKeys`. 

183 

184 :param str filename: name of the file to read host keys from 

185 :return: 

186 nested dict of `.PKey` objects, indexed by hostname and then keytype 

187 """ 

188 from paramiko.hostkeys import HostKeys 

189 

190 return HostKeys(filename) 

191 

192 

193def parse_ssh_config(file_obj): 

194 """ 

195 Provided only as a backward-compatible wrapper around `.SSHConfig`. 

196 

197 .. deprecated:: 2.7 

198 Use `SSHConfig.from_file` instead. 

199 """ 

200 config = SSHConfig() 

201 config.parse(file_obj) 

202 return config 

203 

204 

205def lookup_ssh_host_config(hostname, config): 

206 """ 

207 Provided only as a backward-compatible wrapper around `.SSHConfig`. 

208 """ 

209 return config.lookup(hostname) 

210 

211 

212def mod_inverse(x, m): 

213 # it's crazy how small Python can make this function. 

214 u1, u2, u3 = 1, 0, m 

215 v1, v2, v3 = 0, 1, x 

216 

217 while v3 > 0: 

218 q = u3 // v3 

219 u1, v1 = v1, u1 - v1 * q 

220 u2, v2 = v2, u2 - v2 * q 

221 u3, v3 = v3, u3 - v3 * q 

222 if u2 < 0: 

223 u2 += m 

224 return u2 

225 

226 

227_g_thread_data = threading.local() 

228_g_thread_counter = 0 

229_g_thread_lock = threading.Lock() 

230 

231 

232def get_thread_id(): 

233 global _g_thread_data, _g_thread_counter, _g_thread_lock 

234 try: 

235 return _g_thread_data.id 

236 except AttributeError: 

237 with _g_thread_lock: 

238 _g_thread_counter += 1 

239 _g_thread_data.id = _g_thread_counter 

240 return _g_thread_data.id 

241 

242 

243def log_to_file(filename, level=DEBUG): 

244 """send paramiko logs to a logfile, 

245 if they're not already going somewhere""" 

246 logger = logging.getLogger("paramiko") 

247 if len(logger.handlers) > 0: 

248 return 

249 logger.setLevel(level) 

250 f = open(filename, "a") 

251 handler = logging.StreamHandler(f) 

252 frm = "%(levelname)-.3s [%(asctime)s.%(msecs)03d] thr=%(_threadid)-3d" 

253 frm += " %(name)s: %(message)s" 

254 handler.setFormatter(logging.Formatter(frm, "%Y%m%d-%H:%M:%S")) 

255 logger.addHandler(handler) 

256 

257 

258# make only one filter object, so it doesn't get applied more than once 

259class PFilter: 

260 def filter(self, record): 

261 record._threadid = get_thread_id() 

262 return True 

263 

264 

265_pfilter = PFilter() 

266 

267 

268def get_logger(name): 

269 logger = logging.getLogger(name) 

270 logger.addFilter(_pfilter) 

271 return logger 

272 

273 

274def constant_time_bytes_eq(a, b): 

275 if len(a) != len(b): 

276 return False 

277 res = 0 

278 # noinspection PyUnresolvedReferences 

279 for i in range(len(a)): # noqa: F821 

280 res |= byte_ord(a[i]) ^ byte_ord(b[i]) 

281 return res == 0 

282 

283 

284class ClosingContextManager: 

285 def __enter__(self): 

286 return self 

287 

288 def __exit__(self, type, value, traceback): 

289 self.close() 

290 

291 

292def clamp_value(minimum, val, maximum): 

293 return max(minimum, min(val, maximum)) 

294 

295 

296def asbytes(s): 

297 """ 

298 Coerce to bytes if possible or return unchanged. 

299 """ 

300 try: 

301 # Attempt to run through our version of b(), which does the Right Thing 

302 # for unicode strings vs bytestrings, and raises TypeError if it's not 

303 # one of those types. 

304 return b(s) 

305 except TypeError: 

306 try: 

307 # If it wasn't a string/byte/buffer-ish object, try calling an 

308 # asbytes() method, which many of our internal classes implement. 

309 return s.asbytes() 

310 except AttributeError: 

311 # Finally, just do nothing & assume this object is sufficiently 

312 # byte-y or buffer-y that everything will work out (or that callers 

313 # are capable of handling whatever it is.) 

314 return s 

315 

316 

317# TODO: clean this up / force callers to assume bytes OR unicode 

318def b(s, encoding="utf8"): 

319 """cast unicode or bytes to bytes""" 

320 if isinstance(s, bytes): 

321 return s 

322 elif isinstance(s, str): 

323 return s.encode(encoding) 

324 else: 

325 raise TypeError(f"Expected unicode or bytes, got {type(s)}") 

326 

327 

328# TODO: clean this up / force callers to assume bytes OR unicode 

329def u(s, encoding="utf8"): 

330 """cast bytes or unicode to unicode""" 

331 if isinstance(s, bytes): 

332 return s.decode(encoding) 

333 elif isinstance(s, str): 

334 return s 

335 else: 

336 raise TypeError(f"Expected unicode or bytes, got {type(s)}")