Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/utils.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com>
3#
4# This code is distributed under the terms and conditions
5# from the MIT License (MIT).
6#
8"""Helper functions for documentation, etc."""
10from __future__ import annotations
12import inspect
13import io
14import logging
15import urllib.parse
16from typing import IO, TYPE_CHECKING, Any
18import wrapt
20if TYPE_CHECKING:
21 from collections.abc import Callable
22 from types import TracebackType
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# URL schemes for which ``safe_urlsplit`` keeps "?" inside the path instead of
# letting it start a query string.
WORKAROUND_SCHEMES = [
    "s3",
    "s3n",
    "s3a",
    "gcs",
    "gs",
]

# Stand-in substituted for "?" while such a URL is being split.
QUESTION_MARK_PLACEHOLDER = "///smart_open.utils.QUESTION_MARK_PLACEHOLDER///"
def inspect_kwargs(kallable: Callable[..., Any]) -> dict[str, Any]:
    """Return a ``{name: default}`` mapping for every default-valued parameter of `kallable`.

    Args:
        kallable: A function or method to inspect.

    Returns:
        A dictionary mapping the name of each parameter that declares a default
        value to that default, in signature order. Parameters without a default
        are omitted.
    """
    signature = inspect.signature(kallable)
    #
    # ``inspect.Parameter.empty`` is a sentinel, so it is tested by identity.
    # An equality test would dispatch to the default value's own ``__ne__``,
    # which can silently drop the parameter or make the truth test raise
    # (e.g. for array-like defaults).
    #
    return {
        name: param.default
        for name, param in signature.parameters.items()
        if param.default is not inspect.Parameter.empty
    }
def check_kwargs(kallable: Callable[..., Any], kwargs: dict[str, Any]) -> dict[str, Any]:
    """Filter `kwargs` down to the keyword arguments that `kallable` supports.

    A keyword counts as supported when ``inspect_kwargs(kallable)`` reports it,
    i.e. when the callable declares a parameter of that name with a default.

    Args:
        kallable: A function or method to test.
        kwargs: The keyword arguments to check. Names the callable doesn't
            support are reported together in one logged warning.

    Returns:
        A dictionary of argument names and values supported by the callable.
    """
    recognised = sorted(inspect_kwargs(kallable))

    # Keep the caller's insertion order for the arguments that are passed on.
    accepted: dict[str, Any] = {}
    for key, value in kwargs.items():
        if key in recognised:
            accepted[key] = value

    # Sorted so the warning text is stable regardless of how kwargs was built.
    rejected = [key for key in sorted(kwargs) if key not in recognised]
    if rejected:
        logger.warning("ignoring unsupported keyword arguments: %r", rejected)

    return accepted
def clamp(value: int, minval: int = 0, maxval: int | None = None) -> int:
    """Restrict a numeric value to a given range.

    Args:
        value: The value to clamp.
        minval: The lower bound.
        maxval: The upper bound. ``None`` means there is no upper bound.

    Returns:
        The clamped value. It will be in the range ``[minval, maxval]``.
    """
    # The upper bound is applied first, then the lower bound, so the lower
    # bound wins whenever the two conflict.
    capped = value if maxval is None else min(value, maxval)
    return max(capped, minval)
def make_range_string(start: int | None = None, stop: int | None = None) -> str:
    """Build a byte range specifier in accordance with RFC-2616.

    Args:
        start: The start of the byte range. If unspecified, stop indicates offset from EOF.
        stop: The end of the byte range. If unspecified, indicates EOF.

    Returns:
        A byte range specifier of the form ``bytes=<start>-<stop>``, where an
        unspecified bound is rendered as an empty string.

    Raises:
        ValueError: If neither ``start`` nor ``stop`` are specified.
    """
    #
    # https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
    #
    has_start = start is not None
    has_stop = stop is not None
    if not (has_start or has_stop):
        msg = "make_range_string requires either a stop or start value"
        raise ValueError(msg)

    first = str(start) if has_start else ""
    last = str(stop) if has_stop else ""
    return f"bytes={first}-{last}"
def parse_content_range(content_range: str) -> tuple[str, int, int, int]:
    """Split a content range header such as "bytes 0-846981/846982" into its parts.

    Assumes a properly formatted content-range header from S3.
    See werkzeug.http.parse_content_range_header for a more robust version.

    Args:
        content_range: The content-range header to parse.

    Returns:
        A tuple ``(units, start, stop, length)`` of one string and three integers
        from the content-range header.
    """
    # "<units> <start>-<stop>/<length>": peel off one separator at a time.
    units, byte_spec = content_range.split(" ", 1)
    span, total = byte_spec.split("/", 1)
    first, last = span.split("-", 1)
    return units, int(first), int(last), int(total)
def safe_urlsplit(url: str) -> urllib.parse.SplitResult:
    """Split a URL without treating question marks as query separators for some schemes.

    A question mark (?) in a URL typically indicates the start of a
    querystring, and the standard library's urlsplit function handles the
    querystring separately. Unfortunately, question marks can also appear
    _inside_ the actual URL for some schemes like S3, GS.

    For URLs whose scheme is listed in WORKAROUND_SCHEMES, question marks are
    replaced with a special placeholder substring prior to splitting and
    restored in the resulting path afterwards. This work-around is disabled in
    the unlikely event the placeholder is already part of the URL. If this
    affects you, consider changing the value of QUESTION_MARK_PLACEHOLDER to
    something more suitable.

    Args:
        url: The URL to split.

    Returns:
        The split result. When the work-around applies, its query and fragment
        are empty strings and the question marks appear in the path.

    See Also:
        - https://bugs.python.org/issue43882
        - https://github.com/python/cpython/blob/3.14/Lib/urllib/parse.py
        - https://github.com/piskvorky/smart_open/issues/285
        - https://github.com/piskvorky/smart_open/issues/458
        - ``smart_open/utils.py:QUESTION_MARK_PLACEHOLDER``
    """
    parts = urllib.parse.urlsplit(url, allow_fragments=False)

    #
    # Skipping the work-around when the placeholder already occurs is safe
    # because people will _almost never_ use that substring in a URL. If they
    # do, the special handling simply does not happen for them.
    #
    needs_workaround = (
        parts.scheme in WORKAROUND_SCHEMES
        and "?" in url
        and QUESTION_MARK_PLACEHOLDER not in url
    )
    if not needs_workaround:
        return parts

    # Hide every "?" from urlsplit, then put them back into the path.
    masked_url = url.replace("?", QUESTION_MARK_PLACEHOLDER)
    parts = urllib.parse.urlsplit(masked_url, allow_fragments=False)
    restored_path = parts.path.replace(QUESTION_MARK_PLACEHOLDER, "?")
    return urllib.parse.SplitResult(parts.scheme, parts.netloc, restored_path, "", "")
class TextIOWrapper(io.TextIOWrapper):
    """`io.TextIOWrapper` subclass that leaves the buffer open when an exception occurs."""

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the wrapper (and its buffer) only if the ``with`` body succeeded.

        Without this patch, TextIOWrapper would call self.buffer.close() during
        exception handling, which is unwanted for e.g. s3 and azure. They only call
        self.close() when there was no exception (self.terminate() otherwise) to avoid
        committing unfinished/failed uploads.

        Returns ``None`` in every case, so an in-flight exception is not suppressed.
        """
        if exc_type is not None:
            # The body raised: leave the underlying buffer untouched.
            return
        self.close()
class FileLikeProxy(wrapt.ObjectProxy):
    """Proxy an `outer` file-like object so that closing it also closes `inner`.

    Everything not overridden here is handled by ``wrapt.ObjectProxy`` for the
    wrapped `outer` object. Exiting or closing the proxy handles `outer` first
    and `inner` afterwards.
    """

    # Class-level placeholder for the inner object; it is
    # initialized before wrapt disallows __setattr__ on certain objects.
    __inner: Any = ...

    def __init__(self, outer: IO[Any], inner: IO[Any]) -> None:
        """Wrap `outer` and remember `inner` so it can be closed alongside it.

        Args:
            outer: The file-like object to proxy.
            inner: The file-like object to close/exit after `outer`.
        """
        super().__init__(outer)
        self.__inner = inner

    def __enter__(self) -> Any:
        """Enter the wrapped object's context and return whatever it returns.

        This explicit proxy method is only required for pylance ref #916.
        """
        wrapped = self.__wrapped__
        return wrapped.__enter__()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> Any:
        """Exit inner after exiting outer.

        The inner object's ``__exit__`` runs even if exiting the outer object
        raises. The value returned is the one produced by the superclass
        ``__exit__``.
        """
        # NOTE(review): unlike close(), there is no "inner equals wrapped" guard
        # here, so inner.__exit__ is always invoked - confirm this is intended.
        exc_details = (exc_type, exc_value, traceback)
        try:
            return super().__exit__(*exc_details)
        finally:
            self.__inner.__exit__(*exc_details)

    def __next__(self) -> Any:
        """Delegate iteration to the wrapped file-like object."""
        wrapped = self.__wrapped__
        return wrapped.__next__()

    def close(self) -> None:
        """Close the wrapped object, then the inner object.

        The inner object is closed even if closing the wrapped object raises.
        """
        wrapped = self.__wrapped__
        try:
            return wrapped.close()
        finally:
            inner = self.__inner
            if inner != wrapped:  # Don't close again if inner and wrapped are the same
                inner.close()