Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scrapy/utils/python.py: 30%
158 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-07 06:38 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-07 06:38 +0000
1"""
2This module contains essential stuff that should've come with Python itself ;)
3"""
4import collections.abc
5import gc
6import inspect
7import re
8import sys
9import weakref
10from functools import partial, wraps
11from itertools import chain
12from typing import (
13 Any,
14 AsyncGenerator,
15 AsyncIterable,
16 AsyncIterator,
17 Callable,
18 Dict,
19 Generator,
20 Iterable,
21 Iterator,
22 List,
23 Mapping,
24 Optional,
25 Pattern,
26 Tuple,
27 Union,
28 overload,
29)
31from scrapy.utils.asyncgen import as_async_generator
def flatten(x: Iterable) -> list:
    """flatten(sequence) -> list

    Build a single flat list holding every element of ``x``, descending
    recursively into each nested list-like element. The traversal itself is
    done by ``iflatten()``; this function only collects its output.

    Examples:
    >>> flatten([1, 2, [3,4], (5,6)])
    [1, 2, 3, 4, 5, 6]
    >>> flatten([[[1,2,3], (42,None)], [4,5], [6], 7, (8,9,10)])
    [1, 2, 3, 42, None, 4, 5, 6, 7, 8, 9, 10]
    >>> flatten(["foo", "bar"])
    ['foo', 'bar']
    >>> flatten(["foo", ["baz", 42], "bar"])
    ['foo', 'baz', 42, 'bar']
    """
    flat_items = iflatten(x)
    return list(flat_items)
def iflatten(x: Iterable) -> Iterable:
    """iflatten(sequence) -> iterator

    Lazy counterpart of ``flatten()``: yields the flattened elements one by
    one instead of collecting them into a list."""
    for element in x:
        if not is_listlike(element):
            # Leaf value (including str/bytes): emit it unchanged.
            yield element
            continue
        # Nested container: recurse and forward whatever it produces.
        yield from iflatten(element)
def is_listlike(x: Any) -> bool:
    """Tell whether ``x`` is an iterable other than ``str`` or ``bytes``.

    >>> is_listlike("foo")
    False
    >>> is_listlike(5)
    False
    >>> is_listlike(b"foo")
    False
    >>> is_listlike([b"foo"])
    True
    >>> is_listlike((b"foo",))
    True
    >>> is_listlike({})
    True
    >>> is_listlike(set())
    True
    >>> is_listlike((x for x in range(3)))
    True
    >>> is_listlike(range(5))
    True
    """
    # Text and binary strings are iterable but are treated as scalars here.
    if isinstance(x, (str, bytes)):
        return False
    return hasattr(x, "__iter__")
def unique(list_: Iterable, key: Callable[[Any], Any] = lambda x: x) -> list:
    """Return the items of ``list_`` without duplicates, in first-seen order.

    Two items are duplicates when ``key(item)`` gives equal values; the key
    values are stored in a set, so they must be hashable. By default the item
    itself is used as the key.
    """
    seen_keys = set()
    ordered = []
    for candidate in list_:
        marker = key(candidate)
        if marker not in seen_keys:
            seen_keys.add(marker)
            ordered.append(candidate)
    return ordered
def to_unicode(
    text: Union[str, bytes], encoding: Optional[str] = None, errors: str = "strict"
) -> str:
    """Decode ``text`` to ``str``.

    A ``str`` is returned unchanged. A ``bytes`` object is decoded with
    ``encoding`` (``"utf-8"`` when ``None``) and the ``errors`` policy.
    Any other type raises ``TypeError``.
    """
    if isinstance(text, str):
        return text
    if isinstance(text, bytes):
        codec = "utf-8" if encoding is None else encoding
        return text.decode(codec, errors)
    raise TypeError(
        "to_unicode must receive a bytes or str "
        f"object, got {type(text).__name__}"
    )
def to_bytes(
    text: Union[str, bytes], encoding: Optional[str] = None, errors: str = "strict"
) -> bytes:
    """Encode ``text`` to ``bytes``.

    A ``bytes`` object is returned unchanged. A ``str`` is encoded with
    ``encoding`` (``"utf-8"`` when ``None``) and the ``errors`` policy.
    Any other type raises ``TypeError``.
    """
    if isinstance(text, bytes):
        return text
    if isinstance(text, str):
        codec = "utf-8" if encoding is None else encoding
        return text.encode(codec, errors)
    raise TypeError(
        "to_bytes must receive a str or bytes " f"object, got {type(text).__name__}"
    )
def re_rsearch(
    pattern: Union[str, Pattern], text: str, chunk_size: int = 1024
) -> Optional[Tuple[int, int]]:
    """Search ``text`` for the last occurrence of ``pattern``.

    The ``re`` module has no reverse search, so the text is scanned in
    progressively larger suffixes: first the final ``chunk_size * 1024``
    characters, then the final ``2 * chunk_size * 1024`` characters, and so
    on, until a suffix contains a match or the whole text has been searched.
    This avoids scanning the entire text when the match is near the end.

    NOTE(review): each attempt only sees a suffix of ``text``, so a match
    that begins before the start of the current suffix may be reported from
    the part that falls inside it.

    :param pattern: a regular expression string or a compiled pattern.
    :param text: the text to search.
    :param chunk_size: step, in units of 1024 characters, by which the
        searched suffix grows. Must be positive.
    :return: ``(start, end)`` positions of the last match within the
        searched suffix, relative to the whole ``text``, or ``None`` when
        nothing matches.
    :raises ValueError: if ``chunk_size`` is not positive (a non-positive
        step would never reach the start of the text).
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    def _chunk_iter() -> Generator[Tuple[str, int], Any, None]:
        # Yield ever-longer suffixes of ``text`` with their start offset,
        # finishing with the full text at offset 0.
        step = chunk_size * 1024
        offset = len(text) - step
        while offset > 0:
            yield (text[offset:], offset)
            offset -= step
        yield (text, 0)

    if isinstance(pattern, str):
        pattern = re.compile(pattern)

    for chunk, offset in _chunk_iter():
        # Only the last match matters; keep one reference instead of a list.
        last_match = None
        for last_match in pattern.finditer(chunk):
            pass
        if last_match is not None:
            start, end = last_match.span()
            return offset + start, offset + end
    return None
def memoizemethod_noargs(method: Callable) -> Callable:
    """Decorator that caches a method's result per instance.

    Results are kept in a ``WeakKeyDictionary`` keyed by the instance, so the
    cache does not keep instances alive. Only the instance is used as the
    cache key: arguments passed on later calls are ignored once a result is
    stored.
    """
    cache: weakref.WeakKeyDictionary[Any, Any] = weakref.WeakKeyDictionary()

    @wraps(method)
    def new_method(self: Any, *args: Any, **kwargs: Any) -> Any:
        if self in cache:
            return cache[self]
        result = method(self, *args, **kwargs)
        cache[self] = result
        return result

    return new_method
# Byte values below 32 (control characters), except NUL, tab, line feed and
# carriage return, which are tolerated in text.
_BINARYCHARS = set(range(32)) - {0, 9, 10, 13}


def binary_is_text(data: bytes) -> bool:
    """Return ``True`` when ``data`` (a ``bytes`` object) contains none of
    the control bytes listed in ``_BINARYCHARS``.

    Raises ``TypeError`` if ``data`` is not ``bytes``.
    """
    if isinstance(data, bytes):
        # Iterating bytes yields ints, which is what _BINARYCHARS holds.
        return _BINARYCHARS.isdisjoint(data)
    raise TypeError(f"data must be bytes, got '{type(data).__name__}'")
def get_func_args(func: Callable, stripself: bool = False) -> List[str]:
    """Return the list of argument names of a callable object.

    Names come from ``inspect.signature(func)``. For a ``functools.partial``
    the signature already omits positionally bound parameters; parameters
    bound by keyword are still listed by the signature, so they are filtered
    out here.

    :param func: the callable to inspect.
    :param stripself: when true, drop a leading ``"self"`` from the result.
    :return: the argument names, or an empty list when ``inspect.signature``
        raises ``ValueError`` for ``func``.
    :raises TypeError: if ``func`` is not callable.
    """
    if not callable(func):
        raise TypeError(f"func must be callable, got '{type(func).__name__}'")

    try:
        sig = inspect.signature(func)
    except ValueError:
        return []

    if isinstance(func, partial):
        # Compare names only against keyword bindings. Checking them against
        # the positional *values* in ``func.args`` would wrongly hide a
        # parameter whenever a bound value equals its name.
        bound_keywords = func.keywords or {}
        args = [name for name in sig.parameters if name not in bound_keywords]
    else:
        args = list(sig.parameters)

    if stripself and args and args[0] == "self":
        args = args[1:]
    return args
def get_spec(func: Callable) -> Tuple[List[str], Dict[str, Any]]:
    """Return an ``(args, kwargs)`` tuple describing a callable.

    ``args`` lists the positional parameters that have no default value and
    ``kwargs`` maps each defaulted positional parameter to its default.
    Functions and methods are inspected directly; any other object is
    inspected through its ``__call__`` attribute.

    >>> import re
    >>> get_spec(re.match)
    (['pattern', 'string'], {'flags': 0})

    >>> class Test:
    ...     def __call__(self, val):
    ...         pass
    ...     def method(self, val, flags=0):
    ...         pass

    >>> get_spec(Test)
    (['self', 'val'], {})

    >>> get_spec(Test.method)
    (['self', 'val'], {'flags': 0})

    >>> get_spec(Test().method)
    (['self', 'val'], {'flags': 0})
    """

    if inspect.isfunction(func) or inspect.ismethod(func):
        target = func
    elif hasattr(func, "__call__"):
        target = func.__call__
    else:
        raise TypeError(f"{type(func)} is not callable")
    spec = inspect.getfullargspec(target)

    names = spec.args
    default_values: Tuple[Any, ...] = spec.defaults if spec.defaults else ()

    # Defaults always belong to the trailing positional parameters.
    split_at = len(names) - len(default_values)
    required = names[:split_at]
    optional = dict(zip(names[split_at:], default_values))
    return required, optional
def equal_attributes(
    obj1: Any, obj2: Any, attributes: Optional[List[Union[str, Callable]]]
) -> bool:
    """Tell whether two objects agree on every entry of ``attributes``.

    Each entry is either an attribute name, looked up with ``getattr``, or a
    callable (such as an ``itemgetter``) applied to both objects. Returns
    ``False`` when ``attributes`` is empty or ``None``.
    """
    # With nothing to compare, the objects are not considered equal.
    if not attributes:
        return False

    # Distinct sentinels: an attribute missing on either object never matches.
    missing1, missing2 = object(), object()
    for attr in attributes:
        if callable(attr):
            left, right = attr(obj1), attr(obj2)
        else:
            left = getattr(obj1, attr, missing1)
            right = getattr(obj2, attr, missing2)
        if left != right:
            return False
    return True
@overload
def without_none_values(iterable: Mapping) -> dict:
    ...


@overload
def without_none_values(iterable: Iterable) -> Iterable:
    ...


def without_none_values(iterable: Union[Mapping, Iterable]) -> Union[dict, Iterable]:
    """Return a copy of ``iterable`` with the ``None`` entries left out.

    For a mapping the result is a plain ``dict`` without the pairs whose
    value is ``None``. For any other iterable the result has the same type as
    the input, built by passing the remaining items to that type.
    """
    if not isinstance(iterable, collections.abc.Mapping):
        kept = (item for item in iterable if item is not None)
        # the iterable __init__ must take another iterable
        return type(iterable)(kept)  # type: ignore[call-arg]
    return {key: value for key, value in iterable.items() if value is not None}
def global_object_name(obj: Any) -> str:
    """
    Build the dotted name of ``obj`` from its ``__module__`` and ``__name__``.

    >>> from scrapy import Request
    >>> global_object_name(Request)
    'scrapy.http.request.Request'
    """
    module_name = obj.__module__
    object_name = obj.__name__
    return f"{module_name}.{object_name}"
def garbage_collect() -> None:
    """Run the garbage collector: twice on PyPy, once on other interpreters."""
    # Collecting weakreferences can take two collections on PyPy.
    passes = 2 if hasattr(sys, "pypy_version_info") else 1
    for _ in range(passes):
        gc.collect()
class MutableChain(Iterable):
    """
    Iterator that chains several iterables like ``itertools.chain`` and can
    have more iterables appended while it is being consumed.
    """

    def __init__(self, *args: Iterable):
        # Current chain of everything still to be produced.
        self.data = chain(*args)

    def extend(self, *iterables: Iterable) -> None:
        """Append ``iterables`` after whatever is still pending."""
        self.data = chain(self.data, *iterables)

    def __iter__(self) -> Iterator:
        return self

    def __next__(self) -> Any:
        return next(self.data)
async def _async_chain(*iterables: Union[Iterable, AsyncIterable]) -> AsyncGenerator:
    """Yield the items of each of ``iterables`` in turn.

    Every input is passed through ``as_async_generator`` so it can be
    consumed with ``async for``.
    """
    for source in iterables:
        wrapped = as_async_generator(source)
        async for item in wrapped:
            yield item
class MutableAsyncChain(AsyncIterable):
    """
    Async counterpart of MutableChain: an async iterator over several
    iterables that can have more appended while it is being consumed.
    """

    def __init__(self, *args: Union[Iterable, AsyncIterable]):
        # Async generator producing everything still to be yielded.
        self.data = _async_chain(*args)

    def extend(self, *iterables: Union[Iterable, AsyncIterable]) -> None:
        """Append ``iterables`` after whatever is still pending."""
        pending = self.data
        appended = _async_chain(*iterables)
        self.data = _async_chain(pending, appended)

    def __aiter__(self) -> AsyncIterator:
        return self

    async def __anext__(self) -> Any:
        next_item = await self.data.__anext__()
        return next_item