Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/smart_open/utils.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

77 statements  

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8 

9"""Helper functions for documentation, etc.""" 

10 

11import inspect 

12import io 

13import logging 

14import urllib.parse 

15 

16import wrapt 

17 

18logger = logging.getLogger(__name__) 

19 

20WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs'] 

21QUESTION_MARK_PLACEHOLDER = '///smart_open.utils.QUESTION_MARK_PLACEHOLDER///' 

22 

23 

24def inspect_kwargs(kallable): 

25 # 

26 # inspect.getargspec got deprecated in Py3.4, and calling it spews 

27 # deprecation warnings that we'd prefer to avoid. Unfortunately, older 

28 # versions of Python (<3.3) did not have inspect.signature, so we need to 

29 # handle them the old-fashioned getargspec way. 

30 # 

31 try: 

32 signature = inspect.signature(kallable) 

33 except AttributeError: 

34 try: 

35 args, varargs, keywords, defaults = inspect.getargspec(kallable) 

36 except TypeError: 

37 # 

38 # Happens under Py2.7 with mocking. 

39 # 

40 return {} 

41 

42 if not defaults: 

43 return {} 

44 supported_keywords = args[-len(defaults):] 

45 return dict(zip(supported_keywords, defaults)) 

46 else: 

47 return { 

48 name: param.default 

49 for name, param in signature.parameters.items() 

50 if param.default != inspect.Parameter.empty 

51 } 

52 

53 

54def check_kwargs(kallable, kwargs): 

55 """Check which keyword arguments the callable supports. 

56 

57 Parameters 

58 ---------- 

59 kallable: callable 

60 A function or method to test 

61 kwargs: dict 

62 The keyword arguments to check. If the callable doesn't support any 

63 of these, a warning message will get printed. 

64 

65 Returns 

66 ------- 

67 dict 

68 A dictionary of argument names and values supported by the callable. 

69 """ 

70 supported_keywords = sorted(inspect_kwargs(kallable)) 

71 unsupported_keywords = [k for k in sorted(kwargs) if k not in supported_keywords] 

72 supported_kwargs = {k: v for (k, v) in kwargs.items() if k in supported_keywords} 

73 

74 if unsupported_keywords: 

75 logger.warning('ignoring unsupported keyword arguments: %r', unsupported_keywords) 

76 

77 return supported_kwargs 

78 

79 

80def clamp(value, minval=0, maxval=None): 

81 """Clamp a numeric value to a specific range. 

82 

83 Parameters 

84 ---------- 

85 value: numeric 

86 The value to clamp. 

87 

88 minval: numeric 

89 The lower bound. 

90 

91 maxval: numeric 

92 The upper bound. 

93 

94 Returns 

95 ------- 

96 numeric 

97 The clamped value. It will be in the range ``[minval, maxval]``. 

98 

99 """ 

100 if maxval is not None: 

101 value = min(value, maxval) 

102 value = max(value, minval) 

103 return value 

104 

105 

106def make_range_string(start=None, stop=None): 

107 """Create a byte range specifier in accordance with RFC-2616. 

108 

109 Parameters 

110 ---------- 

111 start: int, optional 

112 The start of the byte range. If unspecified, stop indicated offset from EOF. 

113 

114 stop: int, optional 

115 The end of the byte range. If unspecified, indicates EOF. 

116 

117 Returns 

118 ------- 

119 str 

120 A byte range specifier. 

121 

122 """ 

123 # 

124 # https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 

125 # 

126 if start is None and stop is None: 

127 raise ValueError("make_range_string requires either a stop or start value") 

128 start_str = '' if start is None else str(start) 

129 stop_str = '' if stop is None else str(stop) 

130 return 'bytes=%s-%s' % (start_str, stop_str) 

131 

132 

133def parse_content_range(content_range): 

134 """Extract units, start, stop, and length from a content range header like "bytes 0-846981/846982". 

135 

136 Assumes a properly formatted content-range header from S3. 

137 See werkzeug.http.parse_content_range_header for a more robust version. 

138 

139 Parameters 

140 ---------- 

141 content_range: str 

142 The content-range header to parse. 

143 

144 Returns 

145 ------- 

146 tuple (units: str, start: int, stop: int, length: int) 

147 The units and three integers from the content-range header. 

148 

149 """ 

150 units, numbers = content_range.split(' ', 1) 

151 range, length = numbers.split('/', 1) 

152 start, stop = range.split('-', 1) 

153 return units, int(start), int(stop), int(length) 

154 

155 

156def safe_urlsplit(url): 

157 """This is a hack to prevent the regular urlsplit from splitting around question marks. 

158 

159 A question mark (?) in a URL typically indicates the start of a 

160 querystring, and the standard library's urlparse function handles the 

161 querystring separately. Unfortunately, question marks can also appear 

162 _inside_ the actual URL for some schemas like S3, GS. 

163 

164 Replaces question marks with a special placeholder substring prior to 

165 splitting. This work-around behavior is disabled in the unlikely event the 

166 placeholder is already part of the URL. If this affects you, consider 

167 changing the value of QUESTION_MARK_PLACEHOLDER to something more suitable. 

168 

169 See Also 

170 -------- 

171 https://bugs.python.org/issue43882 

172 https://github.com/python/cpython/blob/3.13/Lib/urllib/parse.py 

173 https://github.com/piskvorky/smart_open/issues/285 

174 https://github.com/piskvorky/smart_open/issues/458 

175 smart_open/utils.py:QUESTION_MARK_PLACEHOLDER 

176 """ 

177 sr = urllib.parse.urlsplit(url, allow_fragments=False) 

178 

179 placeholder = None 

180 if sr.scheme in WORKAROUND_SCHEMES and '?' in url and QUESTION_MARK_PLACEHOLDER not in url: 

181 # 

182 # This is safe because people will _almost never_ use the below 

183 # substring in a URL. If they do, then they're asking for trouble, 

184 # and this special handling will simply not happen for them. 

185 # 

186 placeholder = QUESTION_MARK_PLACEHOLDER 

187 url = url.replace('?', placeholder) 

188 sr = urllib.parse.urlsplit(url, allow_fragments=False) 

189 

190 if placeholder is None: 

191 return sr 

192 

193 path = sr.path.replace(placeholder, '?') 

194 return urllib.parse.SplitResult(sr.scheme, sr.netloc, path, '', '') 

195 

196 

197class TextIOWrapper(io.TextIOWrapper): 

198 def __exit__(self, exc_type, exc_val, exc_tb): 

199 """Call close on underlying buffer only when there was no exception. 

200 

201 Without this patch, TextIOWrapper would call self.buffer.close() during 

202 exception handling, which is unwanted for e.g. s3 and azure. They only call 

203 self.close() when there was no exception (self.terminate() otherwise) to avoid 

204 committing unfinished/failed uploads. 

205 """ 

206 if exc_type is None: 

207 self.close() 

208 

209 

210class FileLikeProxy(wrapt.ObjectProxy): 

211 __inner = ... # initialized before wrapt disallows __setattr__ on certain objects 

212 

213 def __init__(self, outer, inner): 

214 super().__init__(outer) 

215 self.__inner = inner 

216 

217 def __exit__(self, *args, **kwargs): 

218 """Exit inner after exiting outer.""" 

219 try: 

220 return super().__exit__(*args, **kwargs) 

221 finally: 

222 self.__inner.__exit__(*args, **kwargs) 

223 

224 def __next__(self): 

225 return self.__wrapped__.__next__() 

226 

227 def close(self): 

228 try: 

229 return self.__wrapped__.close() 

230 finally: 

231 if self.__inner != self.__wrapped__: # Don't close again if inner and wrapped are the same 

232 self.__inner.close()