Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/utils.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

74 statements  

1# 

2# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com> 

3# 

4# This code is distributed under the terms and conditions 

5# from the MIT License (MIT). 

6# 

7 

8"""Helper functions for documentation, etc.""" 

9 

10from __future__ import annotations 

11 

12import inspect 

13import io 

14import logging 

15import urllib.parse 

16from typing import IO, TYPE_CHECKING, Any 

17 

18import wrapt 

19 

20if TYPE_CHECKING: 

21 from collections.abc import Callable 

22 from types import TracebackType 

23 

# Module-level logger; check_kwargs uses it to warn about ignored keyword arguments.
logger = logging.getLogger(__name__)

# URL schemes for which safe_urlsplit keeps "?" as part of the path instead of
# letting urlsplit treat it as the start of a query string.
WORKAROUND_SCHEMES = ["s3", "s3n", "s3a", "gcs", "gs"]
# Temporary stand-in that safe_urlsplit substitutes for "?" before splitting and
# swaps back afterwards. The work-around is skipped when a URL already contains it.
QUESTION_MARK_PLACEHOLDER = "///smart_open.utils.QUESTION_MARK_PLACEHOLDER///"

28 

29 

def inspect_kwargs(kallable: Callable[..., Any]) -> dict[str, Any]:
    """Return a ``{name: default}`` mapping for every default-valued kwarg of `kallable`.

    Args:
        kallable: The function or method whose signature is inspected.

    Returns:
        A dictionary mapping each parameter name that declares a default to
        that default value, in signature order. Parameters without a default
        are omitted.
    """
    signature = inspect.signature(kallable)
    return {
        name: param.default
        for name, param in signature.parameters.items()
        # ``Parameter.empty`` is a sentinel, so compare by identity. Using
        # ``!=`` would dispatch to the default value's own ``__ne__``, which
        # may answer "equal" for anything (dropping the parameter) or fail
        # outright for array-like defaults.
        if param.default is not inspect.Parameter.empty
    }

38 

39 

def check_kwargs(kallable: Callable[..., Any], kwargs: dict[str, Any]) -> dict[str, Any]:
    """Filter `kwargs` down to the keyword arguments that `kallable` accepts.

    Only parameters of `kallable` that declare a default value count as
    supported. Anything else is dropped and reported in a single warning.

    Args:
        kallable: A function or method to test.
        kwargs: The candidate keyword arguments.

    Returns:
        A new dictionary holding the supported subset of `kwargs`, in the
        original insertion order.
    """
    accepted_names = set(inspect_kwargs(kallable))

    # Sort before filtering so the warning lists the rejected names in a stable order.
    rejected_names = [name for name in sorted(kwargs) if name not in accepted_names]
    if rejected_names:
        logger.warning("ignoring unsupported keyword arguments: %r", rejected_names)

    return {name: value for name, value in kwargs.items() if name in accepted_names}

59 

60 

def clamp(value: int, minval: int = 0, maxval: int | None = None) -> int:
    """Restrict `value` to the interval bounded by `minval` and `maxval`.

    Args:
        value: The number to restrict.
        minval: The smallest value that may be returned.
        maxval: The largest value that may be returned, or ``None`` for no
            upper limit.

    Returns:
        `value` limited to ``[minval, maxval]``. The lower bound is applied
        last, so it wins if the two bounds cross.
    """
    capped = value if maxval is None else min(value, maxval)
    return max(capped, minval)

75 

76 

def make_range_string(start: int | None = None, stop: int | None = None) -> str:
    """Build an HTTP byte-range specifier as described by RFC-2616.

    See https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35

    Args:
        start: First byte of the range. When omitted, `stop` is an offset
            counted from the end of the content.
        stop: Last byte of the range. When omitted, the range runs to EOF.

    Returns:
        A specifier of the form ``bytes=<start>-<stop>``, where an omitted
        bound is rendered as an empty string.

    Raises:
        ValueError: If neither ``start`` nor ``stop`` are specified.
    """
    if start is None and stop is None:
        msg = "make_range_string requires either a stop or start value"
        raise ValueError(msg)

    first, last = ("" if bound is None else str(bound) for bound in (start, stop))
    return "bytes=" + first + "-" + last

99 

100 

def parse_content_range(content_range: str) -> tuple[str, int, int, int]:
    """Extract units, start, stop, and length from a content range header like "bytes 0-846981/846982".

    Assumes a properly formatted content-range header from S3.
    See werkzeug.http.parse_content_range_header for a more robust version.

    Args:
        content_range: The content-range header to parse.

    Returns:
        A tuple ``(units, start, stop, length)`` of one string and three integers
        from the content-range header.

    Raises:
        ValueError: If the header lacks the expected `` ``, ``/`` or ``-``
            separators, or if any of the numeric parts is not an integer.
    """
    units, numbers = content_range.split(" ", 1)
    # Named ``byte_range`` rather than ``range`` to avoid shadowing the builtin.
    byte_range, length = numbers.split("/", 1)
    start, stop = byte_range.split("-", 1)
    return units, int(start), int(stop), int(length)

118 

119 

def safe_urlsplit(url: str) -> urllib.parse.SplitResult:
    """Split `url` like ``urlsplit``, but keep question marks inside the path for some schemes.

    The standard library treats a question mark (?) as the start of a
    querystring and splits it off. For the schemes listed in
    WORKAROUND_SCHEMES (S3, GS, ...) a question mark may legitimately be part
    of the object name, so it must stay in the path.

    The hack: every question mark is swapped for QUESTION_MARK_PLACEHOLDER
    before splitting and swapped back in the resulting path. If the
    placeholder already occurs in the URL the work-around is skipped and the
    plain ``urlsplit`` result is returned. If this affects you, consider
    changing the value of QUESTION_MARK_PLACEHOLDER to something more suitable.

    See Also:
        - https://bugs.python.org/issue43882
        - https://github.com/python/cpython/blob/3.14/Lib/urllib/parse.py
        - https://github.com/piskvorky/smart_open/issues/285
        - https://github.com/piskvorky/smart_open/issues/458
        - ``smart_open/utils.py:QUESTION_MARK_PLACEHOLDER``
    """
    plain_split = urllib.parse.urlsplit(url, allow_fragments=False)

    #
    # Masking is safe because people will _almost never_ use the placeholder
    # substring in a URL. If they do, then they're asking for trouble, and
    # this special handling will simply not happen for them.
    #
    needs_masking = (
        plain_split.scheme in WORKAROUND_SCHEMES
        and "?" in url
        and QUESTION_MARK_PLACEHOLDER not in url
    )
    if not needs_masking:
        return plain_split

    masked_url = url.replace("?", QUESTION_MARK_PLACEHOLDER)
    masked_split = urllib.parse.urlsplit(masked_url, allow_fragments=False)
    restored_path = masked_split.path.replace(QUESTION_MARK_PLACEHOLDER, "?")
    return urllib.parse.SplitResult(masked_split.scheme, masked_split.netloc, restored_path, "", "")

158 

159 

class TextIOWrapper(io.TextIOWrapper):
    """`io.TextIOWrapper` variant that leaves the buffer open when a ``with`` block fails."""

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the wrapper, and with it the underlying buffer, only on a clean exit.

        The stock TextIOWrapper would call self.buffer.close() even while an
        exception is propagating. That is unwanted for e.g. s3 and azure: they
        only call self.close() when there was no exception (self.terminate()
        otherwise) to avoid committing unfinished/failed uploads.
        """
        if exc_type is not None:
            # An exception is in flight: do not close, and do not suppress it.
            return
        self.close()

178 

179 

class FileLikeProxy(wrapt.ObjectProxy):
    """Wrap an `outer` file-like object so that closing it also closes `inner`.

    The proxy wraps `outer` and keeps a separate reference to `inner`.
    Leaving the context manager or calling ``close()`` acts on `outer`
    first and then on `inner`.
    """

    # Class-level placeholder for the inner object; ``__init__`` replaces it per instance.
    # NOTE(review): presumably declaring it on the class is what lets wrapt store
    # the attribute on the proxy instead of forwarding it to the wrapped object —
    # confirm against the wrapt documentation.
    __inner: Any = ...  # initialized before wrapt disallows __setattr__ on certain objects

    def __init__(self, outer: IO[Any], inner: IO[Any]) -> None:
        """Proxy `outer` and remember `inner` for later clean-up.

        Args:
            outer: The file-like object that the proxy wraps.
            inner: A second file-like object, exited/closed after `outer`.
        """
        super().__init__(outer)
        self.__inner = inner

    def __enter__(self) -> Any:
        """This explicit proxy method is only required for pylance ref #916."""
        # Returns whatever the wrapped object's own ``__enter__`` returns.
        return self.__wrapped__.__enter__()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> Any:
        """Exit inner after exiting outer.

        The inner object's ``__exit__`` receives the same exception triple and
        runs even if exiting the outer object raises. The value returned is
        the one produced by the outer ``__exit__``.
        """
        # NOTE(review): unlike close(), there is no check here for inner and
        # wrapped being the same object — confirm that exiting twice is harmless.
        try:
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.__inner.__exit__(exc_type, exc_value, traceback)

    def __next__(self) -> Any:
        """Delegate iteration to the wrapped file-like object."""
        return self.__wrapped__.__next__()

    def close(self) -> None:
        """Close both the wrapped object and the inner object.

        The wrapped object is closed first. The inner object is closed in a
        ``finally`` clause, so it is closed even when the first close raises.
        """
        try:
            return self.__wrapped__.close()
        finally:
            # NOTE(review): this compares with ``!=`` (equality), not identity.
            if self.__inner != self.__wrapped__:  # Don't close again if inner and wrapped are the same
                self.__inner.close()