Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/util.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
2#
3# This file is part of paramiko.
4#
5# Paramiko is free software; you can redistribute it and/or modify it under the
6# terms of the GNU Lesser General Public License as published by the Free
7# Software Foundation; either version 2.1 of the License, or (at your option)
8# any later version.
9#
10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19"""
20Useful functions used by the rest of paramiko.
21"""
24import sys
25import struct
26import traceback
27import threading
28import logging
30from paramiko.common import (
31 DEBUG,
32 zero_byte,
33 xffffffff,
34 max_byte,
35 byte_ord,
36 byte_chr,
37)
38from paramiko.config import SSHConfig
41def inflate_long(s, always_positive=False):
42 """turns a normalized byte string into a long-int
43 (adapted from Crypto.Util.number)"""
44 out = 0
45 negative = 0
46 if not always_positive and (len(s) > 0) and (byte_ord(s[0]) >= 0x80):
47 negative = 1
48 if len(s) % 4:
49 filler = zero_byte
50 if negative:
51 filler = max_byte
52 # never convert this to ``s +=`` because this is a string, not a number
53 # noinspection PyAugmentAssignment
54 s = filler * (4 - len(s) % 4) + s
55 for i in range(0, len(s), 4):
56 out = (out << 32) + struct.unpack(">I", s[i : i + 4])[0]
57 if negative:
58 out -= 1 << (8 * len(s))
59 return out
62def deflate_long(n, add_sign_padding=True):
63 """turns a long-int into a normalized byte string
64 (adapted from Crypto.Util.number)"""
65 # after much testing, this algorithm was deemed to be the fastest
66 s = bytes()
67 n = int(n)
68 while (n != 0) and (n != -1):
69 s = struct.pack(">I", n & xffffffff) + s
70 n >>= 32
71 # strip off leading zeros, FFs
72 for i in enumerate(s):
73 if (n == 0) and (i[1] != 0):
74 break
75 if (n == -1) and (i[1] != 0xFF):
76 break
77 else:
78 # degenerate case, n was either 0 or -1
79 i = (0,)
80 if n == 0:
81 s = zero_byte
82 else:
83 s = max_byte
84 s = s[i[0] :]
85 if add_sign_padding:
86 if (n == 0) and (byte_ord(s[0]) >= 0x80):
87 s = zero_byte + s
88 if (n == -1) and (byte_ord(s[0]) < 0x80):
89 s = max_byte + s
90 return s
93def format_binary(data, prefix=""):
94 x = 0
95 out = []
96 while len(data) > x + 16:
97 out.append(format_binary_line(data[x : x + 16]))
98 x += 16
99 if x < len(data):
100 out.append(format_binary_line(data[x:]))
101 return [prefix + line for line in out]
104def format_binary_line(data):
105 left = " ".join(["{:02X}".format(byte_ord(c)) for c in data])
106 right = "".join(
107 [".{:c}..".format(byte_ord(c))[(byte_ord(c) + 63) // 95] for c in data]
108 )
109 return "{:50s} {}".format(left, right)
112def safe_string(s):
113 out = b""
114 for c in s:
115 i = byte_ord(c)
116 if 32 <= i <= 127:
117 out += byte_chr(i)
118 else:
119 out += b("%{:02X}".format(i))
120 return out
123def bit_length(n):
124 try:
125 return n.bit_length()
126 except AttributeError:
127 norm = deflate_long(n, False)
128 hbyte = byte_ord(norm[0])
129 if hbyte == 0:
130 return 1
131 bitlen = len(norm) * 8
132 while not (hbyte & 0x80):
133 hbyte <<= 1
134 bitlen -= 1
135 return bitlen
138def tb_strings():
139 return "".join(traceback.format_exception(*sys.exc_info())).split("\n")
142def generate_key_bytes(hash_alg, salt, key, nbytes):
143 """
144 Given a password, passphrase, or other human-source key, scramble it
145 through a secure hash into some keyworthy bytes. This specific algorithm
146 is used for encrypting/decrypting private key files.
148 :param function hash_alg: A function which creates a new hash object, such
149 as ``hashlib.sha256``.
150 :param salt: data to salt the hash with.
151 :type bytes salt: Hash salt bytes.
152 :param str key: human-entered password or passphrase.
153 :param int nbytes: number of bytes to generate.
154 :return: Key data, as `bytes`.
155 """
156 keydata = bytes()
157 digest = bytes()
158 if len(salt) > 8:
159 salt = salt[:8]
160 while nbytes > 0:
161 hash_obj = hash_alg()
162 if len(digest) > 0:
163 hash_obj.update(digest)
164 hash_obj.update(b(key))
165 hash_obj.update(salt)
166 digest = hash_obj.digest()
167 size = min(nbytes, len(digest))
168 keydata += digest[:size]
169 nbytes -= size
170 return keydata
173def load_host_keys(filename):
174 """
175 Read a file of known SSH host keys, in the format used by openssh, and
176 return a compound dict of ``hostname -> keytype ->`` `PKey
177 <paramiko.pkey.PKey>`. The hostname may be an IP address or DNS name. The
178 keytype will be either ``"ssh-rsa"`` or ``"ssh-dss"``.
180 This type of file unfortunately doesn't exist on Windows, but on posix,
181 it will usually be stored in ``os.path.expanduser("~/.ssh/known_hosts")``.
183 Since 1.5.3, this is just a wrapper around `.HostKeys`.
185 :param str filename: name of the file to read host keys from
186 :return:
187 nested dict of `.PKey` objects, indexed by hostname and then keytype
188 """
189 from paramiko.hostkeys import HostKeys
191 return HostKeys(filename)
194def parse_ssh_config(file_obj):
195 """
196 Provided only as a backward-compatible wrapper around `.SSHConfig`.
198 .. deprecated:: 2.7
199 Use `SSHConfig.from_file` instead.
200 """
201 config = SSHConfig()
202 config.parse(file_obj)
203 return config
206def lookup_ssh_host_config(hostname, config):
207 """
208 Provided only as a backward-compatible wrapper around `.SSHConfig`.
209 """
210 return config.lookup(hostname)
213def mod_inverse(x, m):
214 # it's crazy how small Python can make this function.
215 u1, u2, u3 = 1, 0, m
216 v1, v2, v3 = 0, 1, x
218 while v3 > 0:
219 q = u3 // v3
220 u1, v1 = v1, u1 - v1 * q
221 u2, v2 = v2, u2 - v2 * q
222 u3, v3 = v3, u3 - v3 * q
223 if u2 < 0:
224 u2 += m
225 return u2
228_g_thread_data = threading.local()
229_g_thread_counter = 0
230_g_thread_lock = threading.Lock()
233def get_thread_id():
234 global _g_thread_data, _g_thread_counter, _g_thread_lock
235 try:
236 return _g_thread_data.id
237 except AttributeError:
238 with _g_thread_lock:
239 _g_thread_counter += 1
240 _g_thread_data.id = _g_thread_counter
241 return _g_thread_data.id
244def log_to_file(filename, level=DEBUG):
245 """send paramiko logs to a logfile,
246 if they're not already going somewhere"""
247 logger = logging.getLogger("paramiko")
248 if len(logger.handlers) > 0:
249 return
250 logger.setLevel(level)
251 f = open(filename, "a")
252 handler = logging.StreamHandler(f)
253 frm = "%(levelname)-.3s [%(asctime)s.%(msecs)03d] thr=%(_threadid)-3d"
254 frm += " %(name)s: %(message)s"
255 handler.setFormatter(logging.Formatter(frm, "%Y%m%d-%H:%M:%S"))
256 logger.addHandler(handler)
259# make only one filter object, so it doesn't get applied more than once
260class PFilter:
261 def filter(self, record):
262 record._threadid = get_thread_id()
263 return True
266_pfilter = PFilter()
269def get_logger(name):
270 logger = logging.getLogger(name)
271 logger.addFilter(_pfilter)
272 return logger
275def constant_time_bytes_eq(a, b):
276 if len(a) != len(b):
277 return False
278 res = 0
279 # noinspection PyUnresolvedReferences
280 for i in range(len(a)): # noqa: F821
281 res |= byte_ord(a[i]) ^ byte_ord(b[i])
282 return res == 0
285class ClosingContextManager:
286 def __enter__(self):
287 return self
289 def __exit__(self, type, value, traceback):
290 self.close()
293def clamp_value(minimum, val, maximum):
294 return max(minimum, min(val, maximum))
297def asbytes(s):
298 """
299 Coerce to bytes if possible or return unchanged.
300 """
301 try:
302 # Attempt to run through our version of b(), which does the Right Thing
303 # for unicode strings vs bytestrings, and raises TypeError if it's not
304 # one of those types.
305 return b(s)
306 except TypeError:
307 try:
308 # If it wasn't a string/byte/buffer-ish object, try calling an
309 # asbytes() method, which many of our internal classes implement.
310 return s.asbytes()
311 except AttributeError:
312 # Finally, just do nothing & assume this object is sufficiently
313 # byte-y or buffer-y that everything will work out (or that callers
314 # are capable of handling whatever it is.)
315 return s
318# TODO: clean this up / force callers to assume bytes OR unicode
319def b(s, encoding="utf8"):
320 """cast unicode or bytes to bytes"""
321 if isinstance(s, bytes):
322 return s
323 elif isinstance(s, str):
324 return s.encode(encoding)
325 else:
326 raise TypeError(f"Expected unicode or bytes, got {type(s)}")
329# TODO: clean this up / force callers to assume bytes OR unicode
330def u(s, encoding="utf8"):
331 """cast bytes or unicode to unicode"""
332 if isinstance(s, bytes):
333 return s.decode(encoding)
334 elif isinstance(s, str):
335 return s
336 else:
337 raise TypeError(f"Expected unicode or bytes, got {type(s)}")