Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/util.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19"""
20Useful functions used by the rest of paramiko.
21"""
24import sys
25import struct
26import traceback
27import threading
28import logging
30from paramiko.common import (
31 DEBUG,
32 zero_byte,
33 xffffffff,
34 max_byte,
35 byte_ord,
36 byte_chr,
37)
38from paramiko.config import SSHConfig
def inflate_long(s, always_positive=False):
    """
    Turn a normalized big-endian byte string into an integer.

    :param bytes s:
        big-endian bytes to decode. An empty value decodes to ``0``.
    :param bool always_positive:
        when false (the default) the bytes are read as a two's-complement
        signed number, so a leading byte of ``0x80`` or above produces a
        negative result; when true the bytes are always read as unsigned.
    :return: the decoded `int`.
    """
    if not s:
        # Nothing to decode; keeps the historical "empty means zero" result
        # for any empty sequence.
        return 0
    # int.from_bytes performs the same sign-extension and big-endian
    # accumulation that was previously hand-rolled 32 bits at a time.
    return int.from_bytes(s, "big", signed=not always_positive)
def deflate_long(n, add_sign_padding=True):
    """
    Turn an integer into a normalized big-endian byte string.

    :param n: the value to encode; it is passed through ``int()`` first.
    :param bool add_sign_padding:
        when true (the default), prepend one extra byte whenever the top bit
        of the stripped result would not match the sign of the value.
    :return: the encoded `bytes` (always at least one byte long).
    """
    packed = bytes()
    n = int(n)
    # Peel off 32 bits at a time until only the sign remains: the loop ends
    # with n == 0 for non-negative input and n == -1 for negative input.
    while n not in (0, -1):
        packed = struct.pack(">I", n & xffffffff) + packed
        n >>= 32
    # Leading bytes equal to the sign extension carry no information.
    redundant = 0 if n == 0 else 0xFF
    for start, value in enumerate(packed):
        if value != redundant:
            break
    else:
        # Degenerate case: nothing but sign bytes (or nothing at all), so the
        # value itself was 0 or -1. Emit a single sign byte.
        start = 0
        packed = zero_byte if n == 0 else max_byte
    packed = packed[start:]
    if add_sign_padding:
        top = byte_ord(packed[0])
        if n == 0 and top >= 0x80:
            # Non-negative value whose top bit is set: pad with 0x00.
            packed = zero_byte + packed
        if n == -1 and top < 0x80:
            # Negative value whose top bit is clear: pad with 0xFF.
            packed = max_byte + packed
    return packed
def format_binary(data, prefix=""):
    """
    Render ``data`` as a list of hex-dump lines, 16 items per line.

    :param data: sliceable sequence to dump (each slice is handed to
        `format_binary_line`).
    :param str prefix: text prepended to every returned line.
    :return: `list` of formatted lines; empty when ``data`` is empty.
    """
    return [
        prefix + format_binary_line(data[offset : offset + 16])
        for offset in range(0, len(data), 16)
    ]
def format_binary_line(data):
    """
    Format one chunk of ``data`` as a hex column followed by a text column.

    The hex column holds two uppercase hex digits per item, space separated
    and left-justified to a width of 50. The text column shows one character
    per item.

    :param data: iterable of items accepted by ``byte_ord``.
    :return: the formatted line as `str`.
    """
    codes = [byte_ord(c) for c in data]
    hex_column = " ".join("{:02X}".format(code) for code in codes)
    # Index trick: in the 4-character string ".<char>..", (code + 63) // 95
    # is 1 only for codes 32..126 (yielding the character itself) and is
    # 0, 2 or 3 for every other byte value 0..255 (yielding ".").
    text_column = "".join(
        ".{:c}..".format(code)[(code + 63) // 95] for code in codes
    )
    return "{:50s} {}".format(hex_column, text_column)
def safe_string(s):
    """
    Return ``s`` as bytes with out-of-range values percent-escaped.

    Items whose ordinal lies in 32..127 (inclusive) are kept as a single
    byte; every other item is replaced by ``%`` followed by its ordinal in
    two-digit uppercase hex.

    :param s: iterable of items accepted by ``byte_ord``.
    :return: the escaped `bytes`.
    """
    pieces = []
    for item in s:
        code = byte_ord(item)
        if code < 32 or code > 127:
            pieces.append(b("%{:02X}".format(code)))
        else:
            pieces.append(byte_chr(code))
    return b"".join(pieces)
def bit_length(n):
    """
    Return the number of bits needed to represent ``n``.

    Uses ``n.bit_length()`` when the object provides it. Otherwise the value
    is encoded with `deflate_long` (without sign padding) and the bits are
    counted from the highest set bit of the first byte; a zero first byte
    yields ``1``.

    :param n: integer-like value.
    :return: bit count as `int`.
    """
    try:
        return n.bit_length()
    except AttributeError:
        pass
    # Fallback for objects without a native bit_length().
    encoded = deflate_long(n, False)
    top = byte_ord(encoded[0])
    if top == 0:
        return 1
    bits = 8 * len(encoded)
    # Discount the leading zero bits of the first byte.
    while (top & 0x80) == 0:
        top <<= 1
        bits -= 1
    return bits
def tb_strings():
    """
    Return the traceback of the exception currently being handled.

    :return:
        `list` of `str`, one entry per line of the formatted traceback (the
        formatted text is split on newlines, so the last entry is empty when
        the text ends with a newline).
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    formatted = traceback.format_exception(exc_type, exc_value, exc_tb)
    return "".join(formatted).split("\n")
def generate_key_bytes(hash_alg, salt, key, nbytes):
    """
    Given a password, passphrase, or other human-source key, scramble it
    through a secure hash into some keyworthy bytes. This specific algorithm
    is used for encrypting/decrypting private key files.

    Each round hashes the previous round's digest (if any), then the key,
    then the salt; digests are concatenated until ``nbytes`` bytes exist.

    :param function hash_alg: A function which creates a new hash object, such
        as ``hashlib.sha256``.
    :param bytes salt: data to salt the hash with; only the first 8 bytes are
        used.
    :param str key: human-entered password or passphrase.
    :param int nbytes: number of bytes to generate.
    :return: Key data, as `bytes`.
    """
    if len(salt) > 8:
        salt = salt[:8]
    chunks = []
    previous = bytes()
    remaining = nbytes
    while remaining > 0:
        hasher = hash_alg()
        if previous:
            # Chain rounds together by feeding in the prior digest first.
            hasher.update(previous)
        hasher.update(b(key))
        hasher.update(salt)
        previous = hasher.digest()
        take = min(remaining, len(previous))
        chunks.append(previous[:take])
        remaining -= take
    return b"".join(chunks)
def load_host_keys(filename):
    """
    Read a file of known SSH host keys, in the format used by openssh, and
    return a compound dict of ``hostname -> keytype ->`` `PKey
    <paramiko.pkey.PKey>`. The hostname may be an IP address or DNS name.

    This type of file unfortunately doesn't exist on Windows, but on posix,
    it will usually be stored in ``os.path.expanduser("~/.ssh/known_hosts")``.

    Since 1.5.3, this is just a wrapper around `.HostKeys`.

    :param str filename: name of the file to read host keys from
    :return:
        nested dict of `.PKey` objects, indexed by hostname and then keytype
    """
    # Imported at call time rather than at module import time.
    from paramiko import hostkeys

    return hostkeys.HostKeys(filename)
def parse_ssh_config(file_obj):
    """
    Provided only as a backward-compatible wrapper around `.SSHConfig`.

    Creates a fresh `.SSHConfig`, has it parse ``file_obj`` and returns it.

    .. deprecated:: 2.7
        Use `SSHConfig.from_file` instead.

    :param file_obj: object handed unchanged to ``SSHConfig.parse``.
    :return: the populated `.SSHConfig` instance.
    """
    parsed = SSHConfig()
    parsed.parse(file_obj)
    return parsed
def lookup_ssh_host_config(hostname, config):
    """
    Provided only as a backward-compatible wrapper around `.SSHConfig`.

    :param hostname: value passed to ``config.lookup``.
    :param config: object exposing a ``lookup(hostname)`` method.
    :return: whatever ``config.lookup(hostname)`` returns.
    """
    result = config.lookup(hostname)
    return result
def mod_inverse(x, m):
    """
    Compute a modular inverse with the extended Euclidean algorithm.

    :param int x: value to invert.
    :param int m: modulus.
    :return:
        the coefficient of ``x`` from the algorithm, shifted by ``m`` if it
        came out negative. When ``x`` and ``m`` are coprime this is the
        inverse of ``x`` modulo ``m``.
    """
    # Only the coefficient belonging to x is needed for the result, so the
    # coefficient belonging to m is not tracked.
    coeff_prev, coeff_cur = 0, 1
    rem_prev, rem_cur = m, x

    while rem_cur > 0:
        quotient = rem_prev // rem_cur
        coeff_prev, coeff_cur = coeff_cur, coeff_prev - coeff_cur * quotient
        rem_prev, rem_cur = rem_cur, rem_prev - rem_cur * quotient

    if coeff_prev < 0:
        return coeff_prev + m
    return coeff_prev
# Per-thread storage for the assigned id, the last id handed out, and the
# lock guarding that counter.
_g_thread_data = threading.local()
_g_thread_counter = 0
_g_thread_lock = threading.Lock()


def get_thread_id():
    """
    Return a small integer identifying the calling thread.

    The first call made from a thread takes the next value of a module-wide
    counter (under a lock) and stores it in thread-local data; later calls
    from the same thread return that stored value.

    :return: the calling thread's id as `int`.
    """
    global _g_thread_counter
    if not hasattr(_g_thread_data, "id"):
        with _g_thread_lock:
            _g_thread_counter += 1
            _g_thread_data.id = _g_thread_counter
    return _g_thread_data.id
def log_to_file(filename, level=DEBUG):
    """
    Send paramiko logs to a logfile, if they're not already going somewhere.

    Does nothing when the ``"paramiko"`` logger already has at least one
    handler attached.

    :param str filename: path of the log file; it is opened in append mode.
    :param level: logging level to set on the ``"paramiko"`` logger.
    """
    logger = logging.getLogger("paramiko")
    if logger.handlers:
        return
    logger.setLevel(level)
    # FileHandler owns the file it opens and closes it when the handler is
    # closed (e.g. by logging.shutdown()), unlike a StreamHandler wrapped
    # around a manually opened file, which never closes the stream.
    handler = logging.FileHandler(filename, mode="a")
    # ``_threadid`` is not a standard record attribute; this module's
    # PFilter sets it on each record it filters.
    frm = "%(levelname)-.3s [%(asctime)s.%(msecs)03d] thr=%(_threadid)-3d"
    frm += " %(name)s: %(message)s"
    handler.setFormatter(logging.Formatter(frm, "%Y%m%d-%H:%M:%S"))
    logger.addHandler(handler)
# A single shared filter instance is created below so that the same object is
# reused everywhere and doesn't get applied more than once.
class PFilter:
    """Logging filter that tags every record with the caller's thread id."""

    def filter(self, record):
        """
        Store `get_thread_id`'s result on ``record._threadid``.

        :param record: the log record being processed.
        :return: always ``True``, so no record is dropped.
        """
        setattr(record, "_threadid", get_thread_id())
        return True


_pfilter = PFilter()
def get_logger(name):
    """
    Fetch the logger called ``name`` with the module's shared filter attached.

    :param str name: logger name passed to ``logging.getLogger``.
    :return: the `logging.Logger` instance.
    """
    named_logger = logging.getLogger(name)
    named_logger.addFilter(_pfilter)
    return named_logger
def constant_time_bytes_eq(a, b):
    """
    Compare two equal-length sequences without exiting at the first mismatch.

    Sequences of different lengths compare unequal immediately. Otherwise
    every pair of items is XOR-ed and the results OR-ed together, so the
    loop always visits all positions.

    :param a: first sequence of items accepted by ``byte_ord``.
    :param b: second sequence of items accepted by ``byte_ord``.
    :return: ``True`` when both sequences hold the same values.
    """
    if len(a) != len(b):
        return False
    accumulated = 0
    for left, right in zip(a, b):
        accumulated |= byte_ord(left) ^ byte_ord(right)
    return accumulated == 0
class ClosingContextManager:
    """
    Mixin providing ``with`` support for objects that have a ``close()``.

    Entering the block yields the object itself; leaving it calls
    ``self.close()``. Exceptions raised inside the block are not suppressed.
    """

    def __enter__(self):
        """Return the object itself as the ``as`` target."""
        return self

    def __exit__(self, type, value, traceback):
        """Close the object; the exception details are ignored."""
        self.close()
def clamp_value(minimum, val, maximum):
    """
    Restrict ``val`` to the range ``minimum``..``maximum``.

    The upper bound is applied first and the lower bound second, so
    ``minimum`` wins if the two bounds are inverted.

    :return: ``val`` limited to the given bounds.
    """
    capped = maximum if maximum < val else val
    return capped if capped > minimum else minimum
def asbytes(s):
    """
    Coerce to bytes if possible or return unchanged.

    Three strategies are tried in order: this module's ``b()``, then the
    object's own ``asbytes()`` method, then returning the object untouched.
    """
    # First choice: b() handles unicode strings and bytestrings and raises
    # TypeError for anything else.
    try:
        return b(s)
    except TypeError:
        pass
    # Second choice: many internal classes implement an asbytes() method.
    try:
        return s.asbytes()
    except AttributeError:
        # Last resort: assume the object is sufficiently byte-y or buffer-y
        # (or that callers can cope with whatever it is).
        return s
317# TODO: clean this up / force callers to assume bytes OR unicode
def b(s, encoding="utf8"):
    """
    Cast unicode or bytes to bytes.

    :param s: `bytes` (returned as-is) or `str` (encoded).
    :param str encoding: codec used when ``s`` is a `str`.
    :return: the value as `bytes`.
    :raises TypeError: if ``s`` is neither `bytes` nor `str`.
    """
    if isinstance(s, bytes):
        return s
    if isinstance(s, str):
        return s.encode(encoding)
    raise TypeError(f"Expected unicode or bytes, got {type(s)}")
328# TODO: clean this up / force callers to assume bytes OR unicode
def u(s, encoding="utf8"):
    """
    Cast bytes or unicode to unicode.

    :param s: `bytes` (decoded) or `str` (returned as-is).
    :param str encoding: codec used when ``s`` is `bytes`.
    :return: the value as `str`.
    :raises TypeError: if ``s`` is neither `bytes` nor `str`.
    """
    if isinstance(s, bytes):
        return s.decode(encoding)
    if isinstance(s, str):
        return s
    raise TypeError(f"Expected unicode or bytes, got {type(s)}")