Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/smart_open/utils.py: 24%

54 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:57 +0000

1# -*- coding: utf-8 -*- 

2# 

3# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com> 

4# 

5# This code is distributed under the terms and conditions 

6# from the MIT License (MIT). 

7# 

8 

9"""Helper functions for documentation, etc.""" 

10 

11import inspect 

12import logging 

13import urllib.parse 

14 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# URL schemes for which safe_urlsplit applies its question-mark work-around.
WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']
# Substring that safe_urlsplit temporarily substitutes for '?' before splitting.
QUESTION_MARK_PLACEHOLDER = '///smart_open.utils.QUESTION_MARK_PLACEHOLDER///'

19 

20 

def inspect_kwargs(kallable):
    """Return the keyword arguments of a callable together with their defaults.

    Parameters
    ----------
    kallable: callable
        The function or method to inspect.

    Returns
    -------
    dict
        Maps the name of each parameter that declares a default value to
        that default.  Parameters without a default are omitted.

    """
    #
    # inspect.getargspec got deprecated in Py3.4, and calling it spews
    # deprecation warnings that we'd prefer to avoid.  Unfortunately, older
    # versions of Python (<3.3) did not have inspect.signature, so we need to
    # handle them the old-fashioned getargspec way.
    #
    try:
        signature = inspect.signature(kallable)
    except AttributeError:
        try:
            args, varargs, keywords, defaults = inspect.getargspec(kallable)
        except TypeError:
            #
            # Happens under Py2.7 with mocking.
            #
            return {}

        if not defaults:
            return {}
        supported_keywords = args[-len(defaults):]
        return dict(zip(supported_keywords, defaults))

    #
    # Compare against the "no default" sentinel by identity.  Using != would
    # invoke the default value's own comparison operator, which may give an
    # arbitrary answer or a non-boolean result (e.g. array-like defaults),
    # causing a legitimate default to be dropped or an exception to be raised.
    #
    return {
        name: param.default
        for name, param in signature.parameters.items()
        if param.default is not inspect.Parameter.empty
    }

49 

50 

def check_kwargs(kallable, kwargs):
    """Filter keyword arguments down to the ones the callable accepts.

    Parameters
    ----------
    kallable: callable
        The function or method whose keyword parameters are consulted.
    kwargs: dict
        Candidate keyword arguments.  Any key that the callable does not
        declare with a default value is dropped, and a single warning
        listing the dropped keys (sorted) is logged.

    Returns
    -------
    dict
        The subset of ``kwargs`` whose keys the callable supports.

    """
    accepted = inspect_kwargs(kallable)

    # Sorted so that the warning message is deterministic.
    rejected = [key for key in sorted(kwargs) if key not in accepted]

    passthrough = {}
    for key, value in kwargs.items():
        if key in accepted:
            passthrough[key] = value

    if rejected:
        logger.warning('ignoring unsupported keyword arguments: %r', rejected)

    return passthrough

75 

76 

def clamp(value, minval=0, maxval=None):
    """Restrict a number to lie between a lower and an optional upper bound.

    Parameters
    ----------
    value: numeric
        The number to restrict.

    minval: numeric
        The smallest result allowed.

    maxval: numeric
        The largest result allowed.  ``None`` means there is no upper bound.

    Returns
    -------
    numeric
        ``value`` limited to the range ``[minval, maxval]``.  The upper bound
        is applied first and the lower bound last.

    """
    capped = value if maxval is None else min(value, maxval)
    return max(capped, minval)

101 

102 

def make_range_string(start=None, stop=None):
    """Build a byte range specifier as described by RFC-2616.

    Parameters
    ----------
    start: int, optional
        First byte of the range.  When omitted, ``stop`` is rendered as a
        suffix range (``bytes=-stop``).

    stop: int, optional
        Last byte of the range.  When omitted, the range is open-ended
        (``bytes=start-``).

    Returns
    -------
    str
        The specifier, in the form ``bytes=<start>-<stop>``.

    Raises
    ------
    ValueError
        If both ``start`` and ``stop`` are None.

    """
    #
    # https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
    #
    if start is None and stop is None:
        raise ValueError("make_range_string requires either a stop or start value")

    # A missing bound is rendered as an empty string on its side of the dash.
    first, last = ('' if bound is None else str(bound) for bound in (start, stop))
    return 'bytes={}-{}'.format(first, last)

128 

129 

def parse_content_range(content_range):
    """Split a content-range header such as "bytes 0-846981/846982" into its parts.

    Expects a well-formed content-range header as returned by S3; no
    validation is attempted.  werkzeug.http.parse_content_range_header is a
    more robust alternative.

    Parameters
    ----------
    content_range: str
        The content-range header value.

    Returns
    -------
    tuple (units: str, start: int, stop: int, length: int)
        The units followed by the three numbers found in the header.

    """
    units, span_and_total = content_range.split(' ', 1)
    span, total = span_and_total.split('/', 1)
    first, last = span.split('-', 1)
    return (units,) + tuple(int(number) for number in (first, last, total))

151 

152 

def safe_urlsplit(url):
    """Split a URL without treating question marks as a query-string delimiter.

    The standard library's urlsplit treats a question mark (?) as the start
    of a querystring and returns everything after it separately.  For some
    schemes (see WORKAROUND_SCHEMES), a question mark may legitimately appear
    _inside_ the actual URL, so that behavior is undesirable.

    As a hack, every question mark is swapped for QUESTION_MARK_PLACEHOLDER
    before splitting, and swapped back in the resulting path afterwards.  The
    work-around is skipped when the scheme is not in WORKAROUND_SCHEMES, when
    the URL has no question mark, or in the unlikely event that the
    placeholder already occurs in the URL.  If the latter affects you,
    consider changing the value of QUESTION_MARK_PLACEHOLDER.

    Parameters
    ----------
    url: str
        The URL to split.

    Returns
    -------
    urllib.parse.SplitResult
        The split URL.  When the work-around was applied, the query and
        fragment components are empty strings.

    See Also
    --------
    https://bugs.python.org/issue43882
    https://github.com/python/cpython/blob/3.7/Lib/urllib/parse.py
    https://github.com/RaRe-Technologies/smart_open/issues/285
    https://github.com/RaRe-Technologies/smart_open/issues/458
    smart_open/utils.py:QUESTION_MARK_PLACEHOLDER
    """
    parts = urllib.parse.urlsplit(url, allow_fragments=False)

    needs_workaround = (
        parts.scheme in WORKAROUND_SCHEMES
        and '?' in url
        and QUESTION_MARK_PLACEHOLDER not in url
    )
    if not needs_workaround:
        return parts

    #
    # This is safe because people will _almost never_ use the placeholder
    # substring in a URL.  If they do, then they're asking for trouble, and
    # the guard above means this special handling simply does not happen.
    #
    masked_url = url.replace('?', QUESTION_MARK_PLACEHOLDER)
    masked = urllib.parse.urlsplit(masked_url, allow_fragments=False)
    restored_path = masked.path.replace(QUESTION_MARK_PLACEHOLDER, '?')
    return urllib.parse.SplitResult(masked.scheme, masked.netloc, restored_path, '', '')