Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scrapy/utils/python.py: 30%

158 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-07 06:38 +0000

1""" 

2This module contains essential stuff that should've come with Python itself ;) 

3""" 

4import collections.abc 

5import gc 

6import inspect 

7import re 

8import sys 

9import weakref 

10from functools import partial, wraps 

11from itertools import chain 

12from typing import ( 

13 Any, 

14 AsyncGenerator, 

15 AsyncIterable, 

16 AsyncIterator, 

17 Callable, 

18 Dict, 

19 Generator, 

20 Iterable, 

21 Iterator, 

22 List, 

23 Mapping, 

24 Optional, 

25 Pattern, 

26 Tuple, 

27 Union, 

28 overload, 

29) 

30 

31from scrapy.utils.asyncgen import as_async_generator 

32 

33 

def flatten(x: Iterable) -> list:
    """flatten(sequence) -> list

    Build one flat list holding every element of ``x``, descending
    recursively into each nested item that ``iflatten`` treats as
    list-like. Strings and bytes are kept whole, not split into characters.

    Examples:
    >>> flatten([1, 2, [3,4], (5,6)])
    [1, 2, 3, 4, 5, 6]
    >>> flatten([[[1,2,3], (42,None)], [4,5], [6], 7, (8,9,10)])
    [1, 2, 3, 42, None, 4, 5, 6, 7, 8, 9, 10]
    >>> flatten(["foo", "bar"])
    ['foo', 'bar']
    >>> flatten(["foo", ["baz", 42], "bar"])
    ['foo', 'baz', 42, 'bar']
    """
    # iflatten() is lazy; unpack it to materialise the whole result at once.
    return [*iflatten(x)]

52 

53 

def iflatten(x: Iterable) -> Iterable:
    """iflatten(sequence) -> iterator

    Lazy counterpart of ``flatten()``: yields the elements one at a time
    instead of building a list. Items for which ``is_listlike`` is true are
    flattened recursively; everything else is yielded unchanged.
    """
    for item in x:
        if not is_listlike(item):
            # Leaf value (including str/bytes): emit as-is.
            yield item
            continue
        # Nested container: delegate to a recursive generator.
        yield from iflatten(item)

63 

64 

def is_listlike(x: Any) -> bool:
    """Tell whether ``x`` is an iterable container rather than a scalar.

    Any object exposing ``__iter__`` counts as list-like, except ``str`` and
    ``bytes``, which are treated as atomic values.

    >>> is_listlike("foo")
    False
    >>> is_listlike(5)
    False
    >>> is_listlike(b"foo")
    False
    >>> is_listlike([b"foo"])
    True
    >>> is_listlike((b"foo",))
    True
    >>> is_listlike({})
    True
    >>> is_listlike(set())
    True
    >>> is_listlike((x for x in range(3)))
    True
    >>> is_listlike(range(5))
    True
    """
    # Text and binary strings are iterable, but callers want them whole.
    if isinstance(x, (str, bytes)):
        return False
    return hasattr(x, "__iter__")

87 

88 

def unique(list_: Iterable, key: Callable[[Any], Any] = lambda x: x) -> list:
    """Return the items of ``list_`` with duplicates removed, keeping the
    first occurrence of each and preserving the original order.

    Two items are duplicates when ``key(item)`` gives equal results. The
    keys are stored in a set, so they must be hashable.
    """
    known_keys = set()
    kept = []
    for candidate in list_:
        marker = key(candidate)
        if marker not in known_keys:
            known_keys.add(marker)
            kept.append(candidate)
    return kept

100 

101 

def to_unicode(
    text: Union[str, bytes], encoding: Optional[str] = None, errors: str = "strict"
) -> str:
    """Return ``text`` as a ``str``.

    A ``str`` argument is returned unchanged. A ``bytes`` argument is decoded
    with ``encoding`` (UTF-8 when ``encoding`` is ``None``) and the given
    ``errors`` policy. Any other type raises ``TypeError``.
    """
    if isinstance(text, str):
        return text
    if isinstance(text, bytes):
        codec = "utf-8" if encoding is None else encoding
        return text.decode(codec, errors)
    raise TypeError(
        "to_unicode must receive a bytes or str "
        f"object, got {type(text).__name__}"
    )

117 

118 

def to_bytes(
    text: Union[str, bytes], encoding: Optional[str] = None, errors: str = "strict"
) -> bytes:
    """Return ``text`` as ``bytes``.

    A ``bytes`` argument is returned unchanged. A ``str`` argument is encoded
    with ``encoding`` (UTF-8 when ``encoding`` is ``None``) and the given
    ``errors`` policy. Any other type raises ``TypeError``.
    """
    if isinstance(text, bytes):
        return text
    if isinstance(text, str):
        codec = "utf-8" if encoding is None else encoding
        return text.encode(codec, errors)
    raise TypeError(
        "to_bytes must receive a str or bytes " f"object, got {type(text).__name__}"
    )

133 

134 

def re_rsearch(
    pattern: Union[str, Pattern], text: str, chunk_size: int = 1024
) -> Optional[Tuple[int, int]]:
    """Find the last match of ``pattern`` in ``text``, searching from the end.

    The ``re`` module has no reverse search, so progressively larger tails
    of ``text`` are scanned instead: first the last ``chunk_size * 1024``
    characters, then the last ``2 * chunk_size * 1024``, and so on, until a
    chunk contains a match or the whole text has been searched. Each chunk
    always extends to the end of ``text``.

    ``pattern`` may be a string or a compiled pattern. ``chunk_size`` is the
    growth step in units of 1024 characters and must be positive.

    Returns ``None`` when there is no match; otherwise a ``(start, end)``
    tuple giving the span of the last match found in the first chunk that
    matched, expressed as positions in the entire text.

    Raises ``ValueError`` if ``chunk_size`` is not positive. A non-positive
    step would never move the chunk start towards the beginning of the text,
    so the search could loop forever.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    if isinstance(pattern, str):
        pattern = re.compile(pattern)

    step = chunk_size * 1024

    def _chunk_iter() -> Generator[Tuple[str, int], Any, None]:
        # Yield (tail_of_text, start_offset_of_tail) pairs, each tail being
        # ``step`` characters longer than the previous one.
        offset = len(text) - step
        while offset > 0:
            yield (text[offset:], offset)
            offset -= step
        # Final attempt: the complete text.
        yield (text, 0)

    for chunk, offset in _chunk_iter():
        # Only the right-most match matters, so keep just the last one
        # instead of collecting every match in a list.
        last_match = None
        for last_match in pattern.finditer(chunk):
            pass
        if last_match is not None:
            start, end = last_match.span()
            return offset + start, offset + end
    return None

169 

170 

def memoizemethod_noargs(method: Callable) -> Callable:
    """Decorator that caches the result of a method per instance.

    The cache maps each instance to its result through a
    ``weakref.WeakKeyDictionary``, so cached entries do not keep instances
    alive. The cache is keyed by the instance only: any arguments passed to
    later calls are ignored and the first result is returned again, which is
    why this is meant for methods without arguments.
    """
    cache: weakref.WeakKeyDictionary[Any, Any] = weakref.WeakKeyDictionary()

    @wraps(method)
    def new_method(self: Any, *args: Any, **kwargs: Any) -> Any:
        if self in cache:
            return cache[self]
        # First call for this instance: compute once and remember.
        result = method(self, *args, **kwargs)
        cache[self] = result
        return result

    return new_method

184 

185 

186_BINARYCHARS = { 

187 i for i in range(32) if to_bytes(chr(i)) not in {b"\0", b"\t", b"\n", b"\r"} 

188} 

189 

190 

def binary_is_text(data: bytes) -> bool:
    """Return ``True`` if ``data`` (a ``bytes`` object) contains none of the
    control-character byte values listed in ``_BINARYCHARS``.

    Raises ``TypeError`` when ``data`` is not a ``bytes`` instance.
    """
    if not isinstance(data, bytes):
        raise TypeError(f"data must be bytes, got '{type(data).__name__}'")
    # Iterating bytes yields ints, the same element type _BINARYCHARS holds.
    return _BINARYCHARS.isdisjoint(data)

198 

199 

def get_func_args(func: Callable, stripself: bool = False) -> List[str]:
    """Return the argument name list of a callable object.

    Names come from ``inspect.signature(func)``, in signature order. For a
    ``functools.partial`` object, parameters already bound by keyword are
    left out. Parameters bound positionally are not present in the
    partial's signature in the first place, so they need no extra filtering.

    If ``stripself`` is true and the first name is ``"self"``, it is dropped.

    Returns an empty list when ``inspect.signature`` raises ``ValueError``
    for ``func``. Raises ``TypeError`` when ``func`` is not callable.
    """
    if not callable(func):
        raise TypeError(f"func must be callable, got '{type(func).__name__}'")

    try:
        sig = inspect.signature(func)
    except ValueError:
        return []

    # Keyword-bound parameters of a partial still show up in its signature
    # (as keyword-only parameters with defaults), so filter them by name.
    # The partial's positional values must NOT be compared against parameter
    # names: they are argument values, not names.
    bound_keywords = (func.keywords or {}) if isinstance(func, partial) else {}
    args: List[str] = [name for name in sig.parameters if name not in bound_keywords]

    if stripself and args and args[0] == "self":
        args = args[1:]
    return args

228 

229 

def get_spec(func: Callable) -> Tuple[List[str], Dict[str, Any]]:
    """Return an ``(args, kwargs)`` tuple describing a callable.

    ``args`` lists the names of positional parameters without a default;
    ``kwargs`` maps the remaining positional parameter names to their default
    values. Functions and methods are inspected directly; any other object
    with ``__call__`` is inspected through that attribute. Only
    ``inspect.getfullargspec``'s ``args`` and ``defaults`` are used, so
    keyword-only parameters are not reported.

    Raises ``TypeError`` if ``func`` has no ``__call__``.

    >>> import re
    >>> get_spec(re.match)
    (['pattern', 'string'], {'flags': 0})

    >>> class Test:
    ...     def __call__(self, val):
    ...         pass
    ...     def method(self, val, flags=0):
    ...         pass

    >>> get_spec(Test)
    (['self', 'val'], {})

    >>> get_spec(Test.method)
    (['self', 'val'], {'flags': 0})

    >>> get_spec(Test().method)
    (['self', 'val'], {'flags': 0})
    """
    if inspect.isfunction(func) or inspect.ismethod(func):
        target = func
    elif hasattr(func, "__call__"):
        target = func.__call__
    else:
        raise TypeError(f"{type(func)} is not callable")
    spec = inspect.getfullargspec(target)

    default_values: Tuple[Any, ...] = spec.defaults or ()
    # Defaults always belong to the trailing positional parameters.
    split_at = len(spec.args) - len(default_values)
    required = spec.args[:split_at]
    optional = dict(zip(spec.args[split_at:], default_values))
    return required, optional

265 

266 

def equal_attributes(
    obj1: Any, obj2: Any, attributes: Optional[List[Union[str, Callable]]]
) -> bool:
    """Compare two objects on a list of attributes.

    Each entry of ``attributes`` is either an attribute name, looked up with
    ``getattr`` on both objects, or a callable (such as ``itemgetter``)
    applied to both objects. Returns ``True`` only when every entry compares
    equal. With no attributes (``None`` or empty) the result is ``False``.
    """
    if not attributes:
        return False

    # Distinct fallbacks: an attribute absent from both objects yields two
    # different sentinels, so the objects are reported as not equal.
    missing1, missing2 = object(), object()
    for attr in attributes:
        if callable(attr):
            differs = attr(obj1) != attr(obj2)
        else:
            differs = getattr(obj1, attr, missing1) != getattr(obj2, attr, missing2)
        if differs:
            return False
    return True

285 

286 

@overload
def without_none_values(iterable: Mapping) -> dict:
    ...


@overload
def without_none_values(iterable: Iterable) -> Iterable:
    ...


def without_none_values(iterable: Union[Mapping, Iterable]) -> Union[dict, Iterable]:
    """Return a copy of ``iterable`` with every ``None`` entry dropped.

    For a mapping, the result is a plain ``dict`` containing only the pairs
    whose value is not ``None``. For any other iterable, the result has the
    same type as the input and is built by passing the remaining items to
    that type's constructor.
    """
    if not isinstance(iterable, collections.abc.Mapping):
        # The input's type must accept an iterable as its only argument.
        kept = (item for item in iterable if item is not None)
        return type(iterable)(kept)  # type: ignore[call-arg]
    return {name: value for name, value in iterable.items() if value is not None}

308 

309 

def global_object_name(obj: Any) -> str:
    """
    Return the dotted full name of a global object, built from its
    ``__module__`` and ``__name__`` attributes.

    >>> from scrapy import Request
    >>> global_object_name(Request)
    'scrapy.http.request.Request'
    """
    module, name = obj.__module__, obj.__name__
    return f"{module}.{name}"

319 

320 

def garbage_collect() -> None:
    """Run a full garbage collection.

    On PyPy (detected through ``sys.pypy_version_info``) a second pass is
    run, because collecting weak references can take two collections there.
    """
    gc.collect()
    if hasattr(sys, "pypy_version_info"):
        gc.collect()

332 

333 

class MutableChain(Iterable):
    """
    Thin wrapper around itertools.chain that lets more iterables be appended
    "in-place" while the chain is being consumed.

    The instance is its own iterator: ``__iter__`` returns ``self`` and
    ``__next__`` pulls from the current underlying chain stored in ``data``.
    """

    def __init__(self, *args: Iterable):
        # chain(*args) walks the given iterables one after another, lazily.
        self.data = chain(*args)

    def extend(self, *iterables: Iterable) -> None:
        """Append ``iterables`` after whatever is still left in the chain."""
        self.data = chain(self.data, *iterables)

    def __iter__(self) -> Iterator:
        return self

    def __next__(self) -> Any:
        return next(self.data)

350 

351 

async def _async_chain(*iterables: Union[Iterable, AsyncIterable]) -> AsyncGenerator:
    """Yield, in order, every item of each given iterable.

    Each argument is passed through ``as_async_generator`` so that it can be
    consumed with ``async for``.
    """
    for source in iterables:
        async for item in as_async_generator(source):
            yield item

356 

357 

class MutableAsyncChain(AsyncIterable):
    """
    Counterpart of MutableChain for asynchronous iteration.

    The instance is its own async iterator: ``__aiter__`` returns ``self``
    and ``__anext__`` awaits the next item of the async generator stored in
    ``data``.
    """

    def __init__(self, *args: Union[Iterable, AsyncIterable]):
        self.data = _async_chain(*args)

    def extend(self, *iterables: Union[Iterable, AsyncIterable]) -> None:
        """Append ``iterables`` after whatever is still left in the chain."""
        appended = _async_chain(*iterables)
        self.data = _async_chain(self.data, appended)

    def __aiter__(self) -> AsyncIterator:
        return self

    async def __anext__(self) -> Any:
        item = await self.data.__anext__()
        return item