Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/tokenize.py: 45%


from __future__ import annotations

import copyreg
import dataclasses
import datetime
import decimal
import hashlib
import inspect
import pathlib
import pickle
import threading
import types
import uuid
from collections import OrderedDict
from collections.abc import Iterable
from contextvars import ContextVar
from functools import partial

import cloudpickle
from tlz import curry, identity
from tlz.functoolz import Compose

from dask import config
from dask.core import literal
from dask.hashing import hash_buffer_hex
from dask.utils import Dispatch


class TokenizationError(RuntimeError):
    pass


def _tokenize(*args: object, **kwargs: object) -> str:
    token: object = _normalize_seq_func(args)
    if kwargs:
        token = token, _normalize_seq_func(sorted(kwargs.items()))

    # Pass `usedforsecurity=False` to support FIPS builds of Python
    return hashlib.md5(str(token).encode(), usedforsecurity=False).hexdigest()


tokenize_lock = threading.RLock()
_SEEN: dict[int, tuple[int, object]] = {}
_ENSURE_DETERMINISTIC: ContextVar[bool | None] = ContextVar("_ENSURE_DETERMINISTIC")
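
# Module-level state shared by the tokenization machinery:
#   - ``tokenize_lock`` serializes access so concurrent ``tokenize()`` calls
#     cannot interleave their bookkeeping,
#   - ``_SEEN`` maps ``id(obj)`` to a position while a single token is being
#     computed, which is how cycles and repeated containers are detected,
#   - ``_ENSURE_DETERMINISTIC`` carries the strictness flag for the duration
#     of one ``tokenize()`` call.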


def tokenize(
    *args: object, ensure_deterministic: bool | None = None, **kwargs: object
) -> str:
    """Deterministic token

    >>> tokenize([1, 2, '3'])  # doctest: +SKIP
    '06961e8de572e73c2e74b51348177918'

    >>> tokenize('Hello') == tokenize('Hello')
    True

    Parameters
    ----------
    args, kwargs:
        objects to tokenize
    ensure_deterministic: bool, optional
        If True, raise a TokenizationError if the objects cannot be
        deterministically tokenized, i.e. if two identical objects could
        produce different tokens.
        Defaults to the `tokenize.ensure-deterministic` configuration parameter.
    """
    global _SEEN
    with tokenize_lock:
        seen_before, _SEEN = _SEEN, {}
        token = None
        try:
            _ENSURE_DETERMINISTIC.get()
        except LookupError:
            token = _ENSURE_DETERMINISTIC.set(ensure_deterministic)
        try:
            return _tokenize(*args, **kwargs)
        finally:
            if token:
                _ENSURE_DETERMINISTIC.reset(token)
            _SEEN = seen_before
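
# Usage sketch (illustrative): because keyword arguments are sorted before
# hashing (see ``_tokenize``), the token does not depend on the order in
# which they are passed.
#
#     >>> tokenize(1, x="a", y="b") == tokenize(1, y="b", x="a")
#     True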


def _maybe_raise_nondeterministic(msg: str) -> None:
    try:
        val = _ENSURE_DETERMINISTIC.get()
    except LookupError:
        val = None
    if val or val is None and config.get("tokenize.ensure-deterministic"):
        raise TokenizationError(msg)
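
# Precedence note: an explicit ``ensure_deterministic`` argument passed to
# ``tokenize()`` (stored in ``_ENSURE_DETERMINISTIC``) wins; only when it is
# None does the ``tokenize.ensure-deterministic`` configuration key decide
# whether a TokenizationError is raised.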


_IDENTITY_DISPATCH = (
    int,
    float,
    str,
    bytes,
    type(None),
    slice,
    complex,
    type(Ellipsis),
    decimal.Decimal,
    datetime.date,
    datetime.time,
    datetime.datetime,
    datetime.timedelta,
    pathlib.PurePath,
)
normalize_token = Dispatch()
normalize_token.register(
    _IDENTITY_DISPATCH,
    identity,
)
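
# Third-party types can participate in tokenization either by implementing a
# ``__dask_tokenize__`` method (see ``normalize_object`` below) or by
# registering a normalizer. Illustrative sketch, where ``MyType`` and
# ``stable_fields()`` are placeholders for a user-defined class and a method
# returning hashable, reproducible state:
#
#     @normalize_token.register(MyType)
#     def normalize_mytype(obj):
#         return type(obj).__name__, normalize_token(obj.stable_fields())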


@normalize_token.register((types.MappingProxyType, dict))
def normalize_dict(d):
    with tokenize_lock:
        if id(d) in _SEEN:
            return "__seen", _SEEN[id(d)][0]
        _SEEN[id(d)] = len(_SEEN), d
        try:
            return "dict", _normalize_seq_func(
                sorted(d.items(), key=lambda kv: str(kv[0]))
            )
        finally:
            _SEEN.pop(id(d), None)


@normalize_token.register(OrderedDict)
def normalize_ordered_dict(d):
    return _normalize_seq_func((type(d), list(d.items())))


@normalize_token.register(set)
def normalize_set(s):
    # Note: in some Python version / OS combinations, set order changes every
    # time you recreate the set (even within the same interpreter).
    # In most other cases, set ordering is consistent within the same interpreter.
    return "set", _normalize_seq_func(sorted(s, key=str))


def _normalize_seq_func(seq: Iterable[object]) -> tuple[object, ...]:
    def _inner_normalize_token(item):
        # Don't go through Dispatch. That's slow
        if isinstance(item, _IDENTITY_DISPATCH):
            return item
        return normalize_token(item)

    with tokenize_lock:
        if id(seq) in _SEEN:
            return "__seen", _SEEN[id(seq)][0]
        _SEEN[id(seq)] = len(_SEEN), seq
        try:
            return tuple(map(_inner_normalize_token, seq))
        finally:
            del _SEEN[id(seq)]


@normalize_token.register((tuple, list))
def normalize_seq(seq):
    return type(seq).__name__, _normalize_seq_func(seq)
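
# The ``_SEEN`` bookkeeping above lets self-referential containers tokenize:
# the second time a container is encountered it is replaced by a
# ``("__seen", position)`` marker instead of recursing forever. Illustrative:
#
#     >>> cycle = []
#     >>> cycle.append(cycle)
#     >>> isinstance(tokenize(cycle), str)
#     True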


@normalize_token.register(literal)
def normalize_literal(lit):
    return "literal", normalize_token(lit())


@normalize_token.register(Compose)
def normalize_compose(func):
    return _normalize_seq_func((func.first,) + func.funcs)


@normalize_token.register((partial, curry))
def normalize_partial(func):
    return _normalize_seq_func((func.func, func.args, func.keywords))


@normalize_token.register((types.MethodType, types.MethodWrapperType))
def normalize_bound_method(meth):
    return normalize_token(meth.__self__), meth.__name__


@normalize_token.register(types.BuiltinFunctionType)
def normalize_builtin_function_or_method(func):
    # Note: BuiltinMethodType is BuiltinFunctionType
    self = getattr(func, "__self__", None)
    if self is not None and not inspect.ismodule(self):
        return normalize_bound_method(func)
    else:
        return normalize_object(func)
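
# Callable wrappers are normalized structurally: ``partial``/``curry`` objects
# reduce to (func, args, keywords), so equal partials built independently
# share a token. Illustrative:
#
#     >>> from functools import partial
#     >>> tokenize(partial(max, 0)) == tokenize(partial(max, 0))
#     True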


@normalize_token.register(object)
def normalize_object(o):
    method = getattr(o, "__dask_tokenize__", None)
    if method is not None and not isinstance(o, type):
        return method()

    if type(o) is object:
        return _normalize_pure_object(o)

    if isinstance(o, type):
        copyreg._slotnames(o)

    if dataclasses.is_dataclass(o) and not isinstance(o, type):
        return _normalize_dataclass(o)

    try:
        return _normalize_pickle(o)
    except Exception:
        _maybe_raise_nondeterministic(
            f"Object {o!r} cannot be deterministically hashed. This likely "
            "indicates that the object cannot be serialized deterministically."
        )
        return uuid.uuid4().hex
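
# Fallback order used above for arbitrary objects: a ``__dask_tokenize__``
# hook, then special-casing of bare ``object()`` instances, then dataclasses,
# and finally a pickle/cloudpickle round-trip (classes additionally get
# ``copyreg._slotnames`` called up front, presumably so lazily cached slot
# names do not perturb the pickled payload). Only when all of that fails is a
# random token produced, or a TokenizationError raised in strict mode.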


_seen_objects = set()


def _normalize_pure_object(o: object) -> tuple[str, int]:
    _maybe_raise_nondeterministic(
        "object() cannot be deterministically hashed. See "
        "https://docs.dask.org/en/latest/custom-collections.html#implementing-deterministic-hashing "
        "for more information."
    )
    # Idempotent, but not deterministic. Make sure that the id is not reused.
    _seen_objects.add(o)
    return "object", id(o)


def _normalize_pickle(o: object) -> tuple:
    buffers: list[pickle.PickleBuffer] = []
    pik: int | None = None
    pik2: int | None = None
    for _ in range(3):
        buffers.clear()
        try:
            out = pickle.dumps(o, protocol=5, buffer_callback=buffers.append)
            if b"__main__" in out:
                # Use `cloudpickle` for objects defined in `__main__`
                buffers.clear()
                out = cloudpickle.dumps(o, protocol=5, buffer_callback=buffers.append)
            pickle.loads(out, buffers=buffers)
            pik2 = hash_buffer_hex(out)
        except Exception:
            buffers.clear()
            try:
                out = cloudpickle.dumps(o, protocol=5, buffer_callback=buffers.append)
                pickle.loads(out, buffers=buffers)
                pik2 = hash_buffer_hex(out)
            except Exception:
                break
        if pik and pik2 and pik == pik2:
            break
        pik = pik2
    else:
        _maybe_raise_nondeterministic("Failed to tokenize deterministically")
    if pik is None:
        _maybe_raise_nondeterministic("Failed to tokenize deterministically")
        pik = int(uuid.uuid4())
    return pik, [hash_buffer_hex(buf) for buf in buffers]
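
# The loop above pickles the object up to three times and only trusts the
# result once two consecutive payloads hash identically; objects defined in
# ``__main__`` (or objects plain pickle rejects) are routed through
# cloudpickle. If no stable payload is found, strict mode raises and relaxed
# mode falls back to a random UUID-based token.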


def _normalize_dataclass(obj):
    fields = [
        (field.name, normalize_token(getattr(obj, field.name, None)))
        for field in dataclasses.fields(obj)
    ]
    params = obj.__dataclass_params__
    params = [(attr, getattr(params, attr)) for attr in params.__slots__]

    return normalize_object(type(obj)), params, fields
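
# The normalizers below are registered lazily: ``Dispatch.register_lazy``
# defers each block until an object from the corresponding package (pandas,
# numba, pyarrow, numpy) is first tokenized, so importing ``dask.tokenize``
# does not import those libraries.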


@normalize_token.register_lazy("pandas")
def register_pandas():
    import pandas as pd

    # use dask._pandas_compat to avoid importing dask.dataframe here
    from dask._pandas_compat import PANDAS_GE_210

    @normalize_token.register(pd.RangeIndex)
    def normalize_range_index(x):
        return type(x), x.start, x.stop, x.step, x.dtype, x.name

    @normalize_token.register(pd.Index)
    def normalize_index(ind):
        values = ind.array

        if isinstance(values, pd.arrays.ArrowExtensionArray):
            import pyarrow as pa

            # these are sensitive to fragmentation of the backing Arrow array.
            # Because common operations like DataFrame.getitem and DataFrame.setitem
            # result in fragmented Arrow arrays, we'll consolidate them here.

            if PANDAS_GE_210:
                # avoid combining chunks by using chunked_array
                values = pa.chunked_array([values._pa_array]).combine_chunks()
            else:
                values = pa.array(values)

        return type(ind), ind.name, normalize_token(values)

    @normalize_token.register(pd.MultiIndex)
    def normalize_index(ind):
        codes = ind.codes
        return (
            [ind.name]
            + [normalize_token(x) for x in ind.levels]
            + [normalize_token(x) for x in codes]
        )

    @normalize_token.register(pd.Categorical)
    def normalize_categorical(cat):
        return [normalize_token(cat.codes), normalize_token(cat.dtype)]

    @normalize_token.register(pd.arrays.PeriodArray)
    @normalize_token.register(pd.arrays.DatetimeArray)
    @normalize_token.register(pd.arrays.TimedeltaArray)
    def normalize_period_array(arr):
        return [normalize_token(arr.asi8), normalize_token(arr.dtype)]

    @normalize_token.register(pd.arrays.IntervalArray)
    def normalize_interval_array(arr):
        return [
            normalize_token(arr.left),
            normalize_token(arr.right),
            normalize_token(arr.closed),
        ]

    @normalize_token.register(pd.Series)
    def normalize_series(s):
        return [
            s.name,
            s.dtype,
            normalize_token(s._values),
            normalize_token(s.index),
        ]

    @normalize_token.register(pd.DataFrame)
    def normalize_dataframe(df):
        mgr = df._mgr
        data = list(mgr.arrays) + [df.columns, df.index]
        return list(map(normalize_token, data))

    @normalize_token.register(pd.arrays.ArrowExtensionArray)
    def normalize_extension_array(arr):
        try:
            return (type(arr), normalize_token(arr._pa_array))
        except AttributeError:
            return (type(arr), normalize_token(arr._data))

    @normalize_token.register(pd.api.extensions.ExtensionArray)
    def normalize_extension_array(arr):
        import numpy as np

        return normalize_token(np.asarray(arr))

    # Dtypes
    @normalize_token.register(pd.api.types.CategoricalDtype)
    def normalize_categorical_dtype(dtype):
        return [normalize_token(dtype.categories), normalize_token(dtype.ordered)]

    @normalize_token.register(pd.api.extensions.ExtensionDtype)
    def normalize_period_dtype(dtype):
        return normalize_token(dtype.name)

    @normalize_token.register(type(pd.NA))
    def normalize_na(na):
        return pd.NA

    @normalize_token.register(pd.offsets.BaseOffset)
    def normalize_offset(offset):
        return offset.freqstr


@normalize_token.register_lazy("numba")
def register_numba():
    import numba

    @normalize_token.register(numba.core.serialize.ReduceMixin)
    def normalize_numba_ufunc(obj):
        return normalize_token((obj._reduce_class(), obj._reduce_states()))


@normalize_token.register_lazy("pyarrow")
def register_pyarrow():
    import pyarrow as pa

    @normalize_token.register(pa.DataType)
    def normalize_datatype(dt):
        return pickle.dumps(dt, protocol=4)

    @normalize_token.register(pa.Table)
    def normalize_table(dt):
        return (
            "pa.Table",
            normalize_token(dt.schema),
            normalize_token(dt.columns),
        )

    @normalize_token.register(pa.ChunkedArray)
    def normalize_chunked_array(arr):
        return (
            "pa.ChunkedArray",
            normalize_token(arr.type),
            normalize_token(arr.chunks),
        )

    @normalize_token.register(pa.Array)
    def normalize_array(arr):
        # pyarrow does something clever when (de)serializing an array that has
        # an empty validity map: the buffers for the deserialized array will
        # have `None` instead of the empty validity map.
        #
        # We'll replicate that behavior here to ensure we get consistent
        # tokenization.
        buffers = arr.buffers()
        if len(buffers) and buffers[0] is not None and arr.null_count == 0:
            buffers[0] = None

        return (
            "pa.Array",
            normalize_token(arr.type),
            normalize_token(buffers),
        )

    @normalize_token.register(pa.Buffer)
    def normalize_buffer(buf):
        return ("pa.Buffer", hash_buffer_hex(buf))


@normalize_token.register_lazy("numpy")
def register_numpy():
    import numpy as np

    @normalize_token.register(np.ndarray)
    def normalize_array(x):
        if not x.shape:
            return (x.item(), x.dtype)
        if x.dtype.hasobject:
            try:
                try:
                    # string fast-path
                    data = hash_buffer_hex(
                        "-".join(x.flat).encode(
                            encoding="utf-8", errors="surrogatepass"
                        )
                    )
                except UnicodeDecodeError:
                    # bytes fast-path
                    data = hash_buffer_hex(b"-".join(x.flat))
            except (TypeError, UnicodeDecodeError):
                return normalize_object(x)
        else:
            try:
                data = hash_buffer_hex(x.ravel(order="K").view("i1"))
            except (BufferError, AttributeError, ValueError):
                data = hash_buffer_hex(x.copy().ravel(order="K").view("i1"))
        return (data, x.dtype, x.shape)

    @normalize_token.register(np.memmap)
    def normalize_mmap(mm):
        return hash_buffer_hex(np.ascontiguousarray(mm))

    @normalize_token.register(np.ufunc)
    def normalize_ufunc(func):
        try:
            return _normalize_pickle(func)
        except Exception:
            _maybe_raise_nondeterministic(
                f"Cannot tokenize numpy ufunc {func!r}. Please use functions "
                "of the dask.array.ufunc module instead. See also "
                "https://docs.dask.org/en/latest/array-numpy-compatibility.html"
            )
            return uuid.uuid4().hex

    @normalize_token.register(np.dtype)
    def normalize_dtype(dtype):
        return dtype.str


def _tokenize_deterministic(*args, **kwargs) -> str:
    # Utility to be strict about deterministic tokens
    return tokenize(*args, ensure_deterministic=True, **kwargs)

489 return tokenize(*args, ensure_deterministic=True, **kwargs)