Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/tokenize.py: 45%
from __future__ import annotations

import copyreg
import dataclasses
import datetime
import decimal
import hashlib
import inspect
import pathlib
import pickle
import threading
import types
import uuid
from collections import OrderedDict
from collections.abc import Iterable
from contextvars import ContextVar
from functools import partial

import cloudpickle
from tlz import curry, identity
from tlz.functoolz import Compose

from dask import config
from dask.core import literal
from dask.hashing import hash_buffer_hex
from dask.utils import Dispatch


class TokenizationError(RuntimeError):
    pass


def _tokenize(*args: object, **kwargs: object) -> str:
    token: object = _normalize_seq_func(args)
    if kwargs:
        token = token, _normalize_seq_func(sorted(kwargs.items()))

    # Pass `usedforsecurity=False` to support FIPS builds of Python
    return hashlib.md5(str(token).encode(), usedforsecurity=False).hexdigest()


tokenize_lock = threading.RLock()
_SEEN: dict[int, tuple[int, object]] = {}
_ENSURE_DETERMINISTIC: ContextVar[bool | None] = ContextVar("_ENSURE_DETERMINISTIC")


def tokenize(
    *args: object, ensure_deterministic: bool | None = None, **kwargs: object
) -> str:
    """Deterministic token

    >>> tokenize([1, 2, '3'])  # doctest: +SKIP
    '06961e8de572e73c2e74b51348177918'

    >>> tokenize('Hello') == tokenize('Hello')
    True

    Parameters
    ----------
    args, kwargs:
        objects to tokenize
    ensure_deterministic: bool, optional
        If True, raise TokenizationError if the objects cannot be deterministically
        tokenized, i.e. if two identical objects would otherwise return different
        tokens. Defaults to the `tokenize.ensure-deterministic` configuration
        parameter.
    """
    global _SEEN
    with tokenize_lock:
        seen_before, _SEEN = _SEEN, {}
        token = None
        try:
            _ENSURE_DETERMINISTIC.get()
        except LookupError:
            token = _ENSURE_DETERMINISTIC.set(ensure_deterministic)
        try:
            return _tokenize(*args, **kwargs)
        finally:
            if token:
                _ENSURE_DETERMINISTIC.reset(token)
            _SEEN = seen_before


def _maybe_raise_nondeterministic(msg: str) -> None:
    try:
        val = _ENSURE_DETERMINISTIC.get()
    except LookupError:
        val = None
    if val or val is None and config.get("tokenize.ensure-deterministic"):
        raise TokenizationError(msg)


_IDENTITY_DISPATCH = (
    int,
    float,
    str,
    bytes,
    type(None),
    slice,
    complex,
    type(Ellipsis),
    decimal.Decimal,
    datetime.date,
    datetime.time,
    datetime.datetime,
    datetime.timedelta,
    pathlib.PurePath,
)
normalize_token = Dispatch()
normalize_token.register(
    _IDENTITY_DISPATCH,
    identity,
)
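# Illustrative sketch (editor's note, not part of the upstream module): other code
# can teach tokenization about new types by registering a normalizer on the
# ``normalize_token`` Dispatch above, exactly as the built-in handlers below do.
# ``MyType`` and its attributes here are hypothetical placeholders.
#
#     >>> @normalize_token.register(MyType)  # doctest: +SKIP
#     ... def normalize_mytype(obj):
#     ...     return "MyType", obj.field_a, obj.field_b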
@normalize_token.register((types.MappingProxyType, dict))
def normalize_dict(d):
    with tokenize_lock:
        if id(d) in _SEEN:
            return "__seen", _SEEN[id(d)][0]
        _SEEN[id(d)] = len(_SEEN), d
        try:
            return "dict", _normalize_seq_func(
                sorted(d.items(), key=lambda kv: str(kv[0]))
            )
        finally:
            _SEEN.pop(id(d), None)


@normalize_token.register(OrderedDict)
def normalize_ordered_dict(d):
    return _normalize_seq_func((type(d), list(d.items())))


@normalize_token.register(set)
def normalize_set(s):
    # Note: in some Python version / OS combinations, set order changes every
    # time you recreate the set (even within the same interpreter).
    # In most other cases, set ordering is consistent within the same interpreter.
    return "set", _normalize_seq_func(sorted(s, key=str))


def _normalize_seq_func(seq: Iterable[object]) -> tuple[object, ...]:
    def _inner_normalize_token(item):
        # Don't go through Dispatch. That's slow
        if isinstance(item, _IDENTITY_DISPATCH):
            return item
        return normalize_token(item)

    with tokenize_lock:
        if id(seq) in _SEEN:
            return "__seen", _SEEN[id(seq)][0]
        _SEEN[id(seq)] = len(_SEEN), seq
        try:
            return tuple(map(_inner_normalize_token, seq))
        finally:
            del _SEEN[id(seq)]
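# Illustrative sketch (editor's note, not upstream code): the ``_SEEN`` bookkeeping
# in ``normalize_dict`` and ``_normalize_seq_func`` replaces an already-visited
# container with a positional ("__seen", index) marker, so self-referential
# structures terminate instead of recursing forever, and still tokenize
# deterministically within one call shape.
#
#     >>> d = {}  # doctest: +SKIP
#     >>> d["self"] = d
#     >>> tokenize(d) == tokenize(d)
#     True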
@normalize_token.register((tuple, list))
def normalize_seq(seq):
    return type(seq).__name__, _normalize_seq_func(seq)


@normalize_token.register(literal)
def normalize_literal(lit):
    return "literal", normalize_token(lit())


@normalize_token.register(Compose)
def normalize_compose(func):
    return _normalize_seq_func((func.first,) + func.funcs)


@normalize_token.register((partial, curry))
def normalize_partial(func):
    return _normalize_seq_func((func.func, func.args, func.keywords))


@normalize_token.register((types.MethodType, types.MethodWrapperType))
def normalize_bound_method(meth):
    return normalize_token(meth.__self__), meth.__name__


@normalize_token.register(types.BuiltinFunctionType)
def normalize_builtin_function_or_method(func):
    # Note: BuiltinMethodType is BuiltinFunctionType
    self = getattr(func, "__self__", None)
    if self is not None and not inspect.ismodule(self):
        return normalize_bound_method(func)
    else:
        return normalize_object(func)


@normalize_token.register(object)
def normalize_object(o):
    method = getattr(o, "__dask_tokenize__", None)
    if method is not None and not isinstance(o, type):
        return method()

    if type(o) is object:
        return _normalize_pure_object(o)

    if isinstance(o, type):
        # _slotnames() caches ``__slotnames__`` on the class as a side effect.
        # Pickling an instance of a slotted class would do the same, so trigger
        # the caching now so the class's token is stable either way.
        copyreg._slotnames(o)

    if dataclasses.is_dataclass(o) and not isinstance(o, type):
        return _normalize_dataclass(o)

    try:
        return _normalize_pickle(o)
    except Exception:
        _maybe_raise_nondeterministic(
            f"Object {o!r} cannot be deterministically hashed. This likely "
            "indicates that the object cannot be serialized deterministically."
        )
        return uuid.uuid4().hex
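# Illustrative sketch (editor's note, not upstream code): ``normalize_object`` first
# honours a ``__dask_tokenize__`` method, which lets user classes opt in to
# deterministic hashing without relying on pickling. The class below is a
# hypothetical example.
#
#     >>> class Point:  # doctest: +SKIP
#     ...     def __init__(self, x, y):
#     ...         self.x, self.y = x, y
#     ...     def __dask_tokenize__(self):
#     ...         return ("Point", self.x, self.y)
#     >>> tokenize(Point(1, 2)) == tokenize(Point(1, 2))
#     True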
_seen_objects = set()


def _normalize_pure_object(o: object) -> tuple[str, int]:
    _maybe_raise_nondeterministic(
        "object() cannot be deterministically hashed. See "
        "https://docs.dask.org/en/latest/custom-collections.html#implementing-deterministic-hashing "
        "for more information."
    )
    # Idempotent, but not deterministic. Make sure that the id is not reused.
    _seen_objects.add(o)
    return "object", id(o)


def _normalize_pickle(o: object) -> tuple:
    buffers: list[pickle.PickleBuffer] = []
    pik: int | None = None
    pik2: int | None = None
    for _ in range(3):
        buffers.clear()
        try:
            out = pickle.dumps(o, protocol=5, buffer_callback=buffers.append)
            if b"__main__" in out:
                # Use `cloudpickle` for objects defined in `__main__`
                buffers.clear()
                out = cloudpickle.dumps(o, protocol=5, buffer_callback=buffers.append)
            pickle.loads(out, buffers=buffers)
            pik2 = hash_buffer_hex(out)
        except Exception:
            buffers.clear()
            try:
                out = cloudpickle.dumps(o, protocol=5, buffer_callback=buffers.append)
                pickle.loads(out, buffers=buffers)
                pik2 = hash_buffer_hex(out)
            except Exception:
                break
        if pik and pik2 and pik == pik2:
            break
        pik = pik2
    else:
        _maybe_raise_nondeterministic("Failed to tokenize deterministically")
    if pik is None:
        _maybe_raise_nondeterministic("Failed to tokenize deterministically")
        pik = int(uuid.uuid4())
    return pik, [hash_buffer_hex(buf) for buf in buffers]
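# Illustrative note (editor's sketch, not an upstream guarantee): the loop above
# pickles the object up to three times and only accepts the result once two
# consecutive dumps hash identically. If the dumps never agree, the mismatch is
# reported via ``_maybe_raise_nondeterministic``; if pickling fails outright, a
# random UUID is substituted instead. With the default
# ``tokenize.ensure-deterministic: False`` configuration this means unpicklable
# objects get a fresh token on every call:
#
#     >>> tokenize(threading.Lock()) == tokenize(threading.Lock())  # doctest: +SKIP
#     False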
def _normalize_dataclass(obj):
    fields = [
        (field.name, normalize_token(getattr(obj, field.name, None)))
        for field in dataclasses.fields(obj)
    ]
    params = obj.__dataclass_params__
    params = [(attr, getattr(params, attr)) for attr in params.__slots__]

    return normalize_object(type(obj)), params, fields


@normalize_token.register_lazy("pandas")
def register_pandas():
    import pandas as pd

    from dask.dataframe._compat import PANDAS_GE_210

    @normalize_token.register(pd.RangeIndex)
    def normalize_range_index(x):
        return type(x), x.start, x.stop, x.step, x.dtype, x.name

    @normalize_token.register(pd.Index)
    def normalize_index(ind):
        values = ind.array

        if isinstance(values, pd.arrays.ArrowExtensionArray):
            import pyarrow as pa

            # these are sensitive to fragmentation of the backing Arrow array.
            # Because common operations like DataFrame.getitem and DataFrame.setitem
            # result in fragmented Arrow arrays, we'll consolidate them here.

            if PANDAS_GE_210:
                # avoid combining chunks by using chunked_array
                values = pa.chunked_array([values._pa_array]).combine_chunks()
            else:
                values = pa.array(values)

        return type(ind), ind.name, normalize_token(values)

    @normalize_token.register(pd.MultiIndex)
    def normalize_index(ind):
        codes = ind.codes
        return (
            [ind.name]
            + [normalize_token(x) for x in ind.levels]
            + [normalize_token(x) for x in codes]
        )

    @normalize_token.register(pd.Categorical)
    def normalize_categorical(cat):
        return [normalize_token(cat.codes), normalize_token(cat.dtype)]

    @normalize_token.register(pd.arrays.PeriodArray)
    @normalize_token.register(pd.arrays.DatetimeArray)
    @normalize_token.register(pd.arrays.TimedeltaArray)
    def normalize_period_array(arr):
        return [normalize_token(arr.asi8), normalize_token(arr.dtype)]

    @normalize_token.register(pd.arrays.IntervalArray)
    def normalize_interval_array(arr):
        return [
            normalize_token(arr.left),
            normalize_token(arr.right),
            normalize_token(arr.closed),
        ]

    @normalize_token.register(pd.Series)
    def normalize_series(s):
        return [
            s.name,
            s.dtype,
            normalize_token(s._values),
            normalize_token(s.index),
        ]

    @normalize_token.register(pd.DataFrame)
    def normalize_dataframe(df):
        mgr = df._mgr
        data = list(mgr.arrays) + [df.columns, df.index]
        return list(map(normalize_token, data))

    @normalize_token.register(pd.arrays.ArrowExtensionArray)
    def normalize_extension_array(arr):
        try:
            return (type(arr), normalize_token(arr._pa_array))
        except AttributeError:
            return (type(arr), normalize_token(arr._data))

    @normalize_token.register(pd.api.extensions.ExtensionArray)
    def normalize_extension_array(arr):
        import numpy as np

        return normalize_token(np.asarray(arr))

    # Dtypes
    @normalize_token.register(pd.api.types.CategoricalDtype)
    def normalize_categorical_dtype(dtype):
        return [normalize_token(dtype.categories), normalize_token(dtype.ordered)]

    @normalize_token.register(pd.api.extensions.ExtensionDtype)
    def normalize_period_dtype(dtype):
        return normalize_token(dtype.name)

    @normalize_token.register(type(pd.NA))
    def normalize_na(na):
        return pd.NA

    @normalize_token.register(pd.offsets.BaseOffset)
    def normalize_offset(offset):
        return offset.freqstr


@normalize_token.register_lazy("numba")
def register_numba():
    import numba

    @normalize_token.register(numba.core.serialize.ReduceMixin)
    def normalize_numba_ufunc(obj):
        return normalize_token((obj._reduce_class(), obj._reduce_states()))


@normalize_token.register_lazy("pyarrow")
def register_pyarrow():
    import pyarrow as pa

    @normalize_token.register(pa.DataType)
    def normalize_datatype(dt):
        return pickle.dumps(dt, protocol=4)

    @normalize_token.register(pa.Table)
    def normalize_table(dt):
        return (
            "pa.Table",
            normalize_token(dt.schema),
            normalize_token(dt.columns),
        )

    @normalize_token.register(pa.ChunkedArray)
    def normalize_chunked_array(arr):
        return (
            "pa.ChunkedArray",
            normalize_token(arr.type),
            normalize_token(arr.chunks),
        )

    @normalize_token.register(pa.Array)
    def normalize_array(arr):
        buffers = arr.buffers()
        # pyarrow does something clever when (de)serializing an array that has
        # an empty validity map: The buffers for the deserialized array will
        # have `None` instead of the empty validity map.
        #
        # We'll replicate that behavior here to ensure we get consistent
        # tokenization.
        if len(buffers) and buffers[0] is not None and arr.null_count == 0:
            buffers[0] = None

        return (
            "pa.Array",
            normalize_token(arr.type),
            normalize_token(buffers),
        )

    @normalize_token.register(pa.Buffer)
    def normalize_buffer(buf):
        return ("pa.Buffer", hash_buffer_hex(buf))


@normalize_token.register_lazy("numpy")
def register_numpy():
    import numpy as np

    @normalize_token.register(np.ndarray)
    def normalize_array(x):
        if not x.shape:
            return (x.item(), x.dtype)
        if x.dtype.hasobject:
            try:
                try:
                    # string fast-path
                    data = hash_buffer_hex(
                        "-".join(x.flat).encode(
                            encoding="utf-8", errors="surrogatepass"
                        )
                    )
                except UnicodeDecodeError:
                    # bytes fast-path
                    data = hash_buffer_hex(b"-".join(x.flat))
            except (TypeError, UnicodeDecodeError):
                return normalize_object(x)
        else:
            try:
                data = hash_buffer_hex(x.ravel(order="K").view("i1"))
            except (BufferError, AttributeError, ValueError):
                data = hash_buffer_hex(x.copy().ravel(order="K").view("i1"))
        return (data, x.dtype, x.shape)

    @normalize_token.register(np.memmap)
    def normalize_mmap(mm):
        return hash_buffer_hex(np.ascontiguousarray(mm))

    @normalize_token.register(np.ufunc)
    def normalize_ufunc(func):
        try:
            return _normalize_pickle(func)
        except Exception:
            _maybe_raise_nondeterministic(
                f"Cannot tokenize numpy ufunc {func!r}. Please use functions "
                "of the dask.array.ufunc module instead. See also "
                "https://docs.dask.org/en/latest/array-numpy-compatibility.html"
            )
            return uuid.uuid4().hex

    @normalize_token.register(np.dtype)
    def normalize_dtype(dtype):
        return dtype.str


def _tokenize_deterministic(*args, **kwargs) -> str:
    # Utility to be strict about deterministic tokens
    return tokenize(*args, ensure_deterministic=True, **kwargs)
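# Illustrative usage (editor's sketch, not upstream tests): ``_tokenize_deterministic``
# is the strict variant; objects that cannot be tokenized deterministically raise
# instead of silently receiving a random token.
#
#     >>> _tokenize_deterministic(threading.Lock())  # doctest: +SKIP
#     Traceback (most recent call last):
#     ...
#     TokenizationError: ...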