Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/smart_open/utils.py: 24%
54 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:57 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:57 +0000
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com>
4#
5# This code is distributed under the terms and conditions
6# from the MIT License (MIT).
7#
9"""Helper functions for documentation, etc."""
11import inspect
12import logging
13import urllib.parse
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# URL schemes for which safe_urlsplit applies its question-mark work-around.
WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']
# Substring that safe_urlsplit temporarily substitutes for every '?' in a URL
# before splitting it, and swaps back to '?' in the resulting path.
QUESTION_MARK_PLACEHOLDER = '///smart_open.utils.QUESTION_MARK_PLACEHOLDER///'
def inspect_kwargs(kallable):
    """Collect the parameters of a callable that have default values.

    Parameters
    ----------
    kallable: callable
        The function or method to inspect.

    Returns
    -------
    dict
        Maps the name of each parameter that declares a default to that
        default value.  Parameters without a default are left out.

    """
    #
    # inspect.getargspec got deprecated in Py3.4 (and was removed in Py3.11),
    # and calling it spews deprecation warnings that we'd prefer to avoid.
    # Unfortunately, older versions of Python (<3.3) did not have
    # inspect.signature, so we need to handle them the old-fashioned
    # getargspec way.
    #
    try:
        signature = inspect.signature(kallable)
    except AttributeError:
        try:
            args, _varargs, _keywords, defaults = inspect.getargspec(kallable)
        except (AttributeError, TypeError):
            #
            # TypeError happens under Py2.7 with mocking.  AttributeError
            # happens when getargspec itself is unavailable.  Either way
            # there is nothing to introspect, so report no keyword arguments.
            #
            return {}

        if not defaults:
            return {}
        # Defaults always belong to the trailing positional parameters.
        supported_keywords = args[-len(defaults):]
        return dict(zip(supported_keywords, defaults))

    #
    # Compare against the sentinel by identity: a value comparison would
    # invoke the default's own __ne__, which may raise or return a
    # non-boolean for objects with custom rich comparison.
    #
    return {
        name: param.default
        for name, param in signature.parameters.items()
        if param.default is not inspect.Parameter.empty
    }
def check_kwargs(kallable, kwargs):
    """Filter keyword arguments down to the ones a callable accepts.

    Parameters
    ----------
    kallable: callable
        The function or method whose keyword parameters are consulted.
    kwargs: dict
        Candidate keyword arguments.  Any name the callable does not declare
        with a default value is dropped, and a single warning listing the
        dropped names (sorted) is logged.

    Returns
    -------
    dict
        The subset of ``kwargs`` that the callable supports.

    """
    known = inspect_kwargs(kallable)

    # Names are reported in sorted order so the warning is deterministic.
    rejected = [key for key in sorted(kwargs) if key not in known]

    accepted = {}
    for key, value in kwargs.items():
        if key in known:
            accepted[key] = value

    if rejected:
        logger.warning('ignoring unsupported keyword arguments: %r', rejected)

    return accepted
def clamp(value, minval=0, maxval=None):
    """Restrict a numeric value to the range ``[minval, maxval]``.

    Parameters
    ----------
    value: numeric
        The value to restrict.

    minval: numeric
        The lower bound.

    maxval: numeric
        The upper bound.  When None, no upper bound is applied.

    Returns
    -------
    numeric
        The restricted value.  The upper bound is applied first and the lower
        bound second, so ``minval`` wins if the two bounds conflict.

    """
    capped = value if maxval is None else min(value, maxval)
    return max(capped, minval)
def make_range_string(start=None, stop=None):
    """Build an RFC-2616 byte range specifier such as ``bytes=0-99``.

    Parameters
    ----------
    start: int, optional
        First byte of the range.  When omitted, ``stop`` is rendered as a
        suffix range, i.e. an offset from EOF.

    stop: int, optional
        Last byte of the range.  When omitted, the range runs to EOF.

    Returns
    -------
    str
        The byte range specifier.

    Raises
    ------
    ValueError
        If neither ``start`` nor ``stop`` is given.

    """
    #
    # https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
    #
    has_start = start is not None
    has_stop = stop is not None
    if not (has_start or has_stop):
        raise ValueError("make_range_string requires either a stop or start value")

    # A missing bound is rendered as an empty string on its side of the dash.
    first = str(start) if has_start else ''
    last = str(stop) if has_stop else ''
    return 'bytes=%s-%s' % (first, last)
def parse_content_range(content_range):
    """Split a content-range header like "bytes 0-846981/846982" into its parts.

    Expects a well-formed content-range header as returned by S3; no
    validation is attempted beyond what splitting and integer conversion
    enforce.  See werkzeug.http.parse_content_range_header for a more robust
    version.

    Parameters
    ----------
    content_range: str
        The content-range header to parse.

    Returns
    -------
    tuple (units: str, start: int, stop: int, length: int)
        The units, followed by the start, stop and total length as integers.

    """
    # "<units> <start>-<stop>/<length>": peel one separator off at a time.
    units, remainder = content_range.split(' ', 1)
    byte_span, total = remainder.split('/', 1)
    first, last = byte_span.split('-', 1)
    start, stop, length = map(int, (first, last, total))
    return (units, start, stop, length)
def safe_urlsplit(url):
    """Split a URL without treating question marks as a querystring delimiter.

    This is a hack.  The standard library's urlsplit regards a question mark
    (?) as the start of the querystring and separates it from the path.  For
    some schemes, such as S3 and GS, a question mark can legitimately appear
    _inside_ the path itself.

    For the schemes listed in WORKAROUND_SCHEMES, every question mark is
    swapped for a placeholder substring before splitting and restored in the
    path afterwards; the result then carries an empty query and fragment.
    The work-around is skipped in the unlikely event the placeholder is
    already part of the URL.  If this affects you, consider changing the
    value of QUESTION_MARK_PLACEHOLDER to something more suitable.

    See Also
    --------
    https://bugs.python.org/issue43882
    https://github.com/python/cpython/blob/3.7/Lib/urllib/parse.py
    https://github.com/RaRe-Technologies/smart_open/issues/285
    https://github.com/RaRe-Technologies/smart_open/issues/458
    smart_open/utils.py:QUESTION_MARK_PLACEHOLDER
    """
    plain = urllib.parse.urlsplit(url, allow_fragments=False)

    #
    # Masking is safe because people will _almost never_ use the placeholder
    # substring in a URL.  If they do, then they're asking for trouble, and
    # the special handling is simply skipped for them.
    #
    needs_workaround = (
        plain.scheme in WORKAROUND_SCHEMES
        and '?' in url
        and QUESTION_MARK_PLACEHOLDER not in url
    )
    if not needs_workaround:
        return plain

    masked_url = url.replace('?', QUESTION_MARK_PLACEHOLDER)
    masked = urllib.parse.urlsplit(masked_url, allow_fragments=False)

    # Put the question marks back now that the split has been made.
    restored_path = masked.path.replace(QUESTION_MARK_PLACEHOLDER, '?')
    return urllib.parse.SplitResult(masked.scheme, masked.netloc, restored_path, '', '')