Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/typing.py: 80%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1from __future__ import annotations 

2 

3import abc 

4from collections.abc import Callable, Hashable, Mapping, Sequence 

5from enum import Enum 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Literal, 

10 Protocol, 

11 TypeVar, 

12 Union, 

13 runtime_checkable, 

14) 

15 

16if TYPE_CHECKING: 

17 # IPython import is relatively slow. Avoid if not necessary 

18 # TODO import from typing (requires Python >=3.10) 

19 from typing import TypeAlias 

20 

21 from IPython.display import DisplayObject 

22 

# TypeVars bound to DaskCollection so that methods such as ``persist`` can be
# annotated as returning the same collection subtype they were called on.
CollType = TypeVar("CollType", bound="DaskCollection")
CollType_co = TypeVar("CollType_co", bound="DaskCollection", covariant=True)
# Callable applied to the computed key results to build the final in-memory
# value (see ``DaskCollection.__dask_postcompute__``).
PostComputeCallable = Callable


# A Dask graph key: a str, int, or float, or a (recursively nested) tuple thereof.
Key: TypeAlias = Union[str, int, float, tuple["Key", ...]]
# FIXME: This type is a little misleading. Low level graphs are often
# MutableMappings but HLGs are not
Graph: TypeAlias = Mapping[Key, Any]
# Potentially nested list of Dask keys
NestedKeys: TypeAlias = list[Union[Key, "NestedKeys"]]

34 

35 

class SchedulerGetCallable(Protocol):
    """Protocol defining the signature of a ``__dask_scheduler__`` callable."""

    def __call__(self, dsk: Graph, keys: Sequence[Key] | Key, **kwargs: Any) -> Any:
        """Execute a task graph and return the requested results.

        This is the signature of the default scheduler ``get`` function
        attached to a collection.

        Parameters
        ----------
        dsk :
            The task graph to execute.
        keys :
            One key, or a sequence of keys, identifying the desired outputs.
        **kwargs :
            Additional arguments forwarded to the scheduler.

        Returns
        -------
        Any
            Result(s) associated with `keys`

        """
        raise NotImplementedError("Inheriting class must implement this method.")

63 

64 

class PostPersistCallable(Protocol[CollType_co]):
    """Protocol defining the signature of a ``__dask_postpersist__`` callable."""

    def __call__(
        self, dsk: Graph, *args: Any, rename: Mapping[str, str] | None = None
    ) -> CollType_co:
        """Rebuild an equivalent collection on top of a persisted graph.

        Parameters
        ----------
        dsk: Mapping
            A mapping which contains at least the output keys returned
            by __dask_keys__().
        *args : Any
            Extra positional arguments; must be the empty tuple when no
            extra arguments are needed.
        rename : Mapping[str, str], optional
            If defined, it indicates that output keys may be changing
            too; e.g. if the previous output of :meth:`__dask_keys__`
            was ``[("a", 0), ("a", 1)]``, after calling
            ``rebuild(dsk, *extra_args, rename={"a": "b"})``
            it must become ``[("b", 0), ("b", 1)]``.
            The ``rename`` mapping may not contain the collection
            name(s); in such case the associated keys do not change.
            It may contain replacements for unexpected names, which
            must be ignored.

        Returns
        -------
        Collection
            An equivalent Dask collection with the same keys as
            computed through a different graph.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

103 

104 

@runtime_checkable
class DaskCollection(Protocol):
    """Protocol defining the interface of a Dask collection."""

    @abc.abstractmethod
    def __dask_graph__(self) -> Graph:
        """The Dask task graph.

        The core Dask collections (Array, DataFrame, Bag, and Delayed)
        use a :py:class:`~dask.highlevelgraph.HighLevelGraph` to
        represent the collection task graph. It is also possible to
        represent the task graph as a low level graph using a Python
        dictionary.

        Returns
        -------
        Mapping
            The Dask task graph. If the instance returns a
            :py:class:`dask.highlevelgraph.HighLevelGraph` then the
            :py:func:`__dask_layers__` method must be implemented, as
            defined by the :py:class:`~dask.typing.HLGDaskCollection`
            protocol.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_keys__(self) -> NestedKeys:
        """The output keys of the task graph.

        Note that there are additional constraints on keys for a Dask
        collection than those described in the :doc:`task graph
        specification documentation <spec>`. These additional
        constraints are described below.

        All keys must either be non-empty strings or tuples where the first element is a
        non-empty string, followed by zero or more arbitrary str, bytes, int, float, or
        tuples thereof. The non-empty string is commonly known as the *collection name*.
        All collections embedded in the dask package have exactly one name, but this is
        not a requirement.

        These are all valid outputs:

        - ``[]``
        - ``["x", "y"]``
        - ``[[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]]``

        Returns
        -------
        list
            A possibly nested list of keys that represent the outputs
            of the graph. After computation, the results will be
            returned in the same layout, with the keys replaced with
            their corresponding outputs.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postcompute__(self) -> tuple[PostComputeCallable, tuple]:
        """Finalizer function and optional arguments to construct final result.

        Upon computation each key in the collection will have an in
        memory result, the postcompute function combines each key's
        result into a final in memory representation. For example,
        dask.array.Array concatenates the arrays at each chunk into a
        final in-memory array.

        Returns
        -------
        PostComputeCallable
            Callable that receives the sequence of the results of each
            final key along with optional arguments. An example signature
            would be ``finalize(results: Sequence[Any], *args)``.
        tuple[Any, ...]
            Optional arguments passed to the function following the
            key results (the `*args` part of the
            ``PostComputeCallable``). If no additional arguments are to
            be passed then this must be an empty tuple.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postpersist__(self) -> tuple[PostPersistCallable, tuple]:
        """Rebuilder function and optional arguments to construct a persisted collection.

        See also the documentation for :py:class:`dask.typing.PostPersistCallable`.

        Returns
        -------
        PostPersistCallable
            Callable that rebuilds the collection. The signature
            should be
            ``rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)``
            (as defined by the
            :py:class:`~dask.typing.PostPersistCallable` protocol).
            The callable should return an equivalent Dask collection
            with the same keys as `self`, but with results that are
            computed through a different graph. In the case of
            :py:func:`dask.persist`, the new graph will have just the
            output keys and the values already computed.
        tuple[Any, ...]
            Optional arguments passed to the rebuild callable. If no
            additional arguments are to be passed then this must be an
            empty tuple.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_tokenize__(self) -> Hashable:
        """Value that must fully represent the object."""
        raise NotImplementedError("Inheriting class must implement this method.")

    __dask_optimize__: Any
    """Given a graph and keys, return a new optimized graph.

    This method can be either a ``staticmethod`` or a ``classmethod``,
    but not an ``instancemethod``. For example implementations see the
    definitions of ``__dask_optimize__`` in the core Dask collections:
    ``dask.array.Array``, ``dask.dataframe.DataFrame``, etc.

    Note that graphs and keys are merged before calling
    ``__dask_optimize__``; as such, the graph and keys passed to
    this method may represent more than one collection sharing the
    same optimize method.

    Parameters
    ----------
    dsk : Graph
        The merged graphs from all collections sharing the same
        ``__dask_optimize__`` method.
    keys : Sequence[Key]
        A list of the outputs from ``__dask_keys__`` from all
        collections sharing the same ``__dask_optimize__`` method.
    **kwargs : Any
        Extra keyword arguments forwarded from the call to
        ``compute`` or ``persist``. Can be used or ignored as
        needed.

    Returns
    -------
    MutableMapping
        The optimized Dask graph.

    """

    # FIXME: It is currently not possible to define a staticmethod from a callback protocol
    # Also, the definition in `is_dask_collection` cannot be satisfied by a
    # protocol / typing check
    # staticmethod[SchedulerGetCallable]
    __dask_scheduler__: staticmethod
    """The default scheduler ``get`` to use for this object.

    Usually attached to the class as a staticmethod, e.g.:

    >>> import dask.threaded
    >>> class MyCollection:
    ...     # Use the threaded scheduler by default
    ...     __dask_scheduler__ = staticmethod(dask.threaded.get)

    """

    @abc.abstractmethod
    def compute(self, **kwargs: Any) -> Any:
        """Compute this dask collection.

        This turns a lazy Dask collection into its in-memory
        equivalent. For example a Dask array turns into a NumPy array
        and a Dask dataframe turns into a Pandas dataframe. The entire
        dataset must fit into memory before calling this operation.

        Parameters
        ----------
        scheduler : string, optional
            Which scheduler to use like "threads", "synchronous" or
            "processes". If not provided, the default is to check the
            global settings first, and then fall back to the
            collection defaults.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before
            computation. Otherwise the graph is run as is. This can be
            useful for debugging.
        kwargs :
            Extra keywords to forward to the scheduler function.

        Returns
        -------
        The collection's computed result.

        See Also
        --------
        dask.compute

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def persist(self: CollType, **kwargs: Any) -> CollType:
        """Persist this dask collection into memory

        This turns a lazy Dask collection into a Dask collection with
        the same metadata, but now with the results fully computed or
        actively computing in the background.

        The action of function differs significantly depending on the
        active task scheduler. If the task scheduler supports
        asynchronous computing, such as is the case of the
        dask.distributed scheduler, then persist will return
        *immediately* and the return value's task graph will contain
        Dask Future objects. However if the task scheduler only
        supports blocking computation then the call to persist will
        *block* and the return value's task graph will contain
        concrete Python results.

        This function is particularly useful when using distributed
        systems, because the results will be kept in distributed
        memory, rather than returned to the local process as with
        compute.

        Parameters
        ----------
        scheduler : string, optional
            Which scheduler to use like "threads", "synchronous" or
            "processes". If not provided, the default is to check the
            global settings first, and then fall back to the
            collection defaults.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before
            computation. Otherwise the graph is run as is. This can be
            useful for debugging.
        **kwargs
            Extra keywords to forward to the scheduler function.

        Returns
        -------
        New dask collections backed by in-memory data

        See Also
        --------
        dask.persist

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def visualize(
        self,
        filename: str = "mydask",
        format: str | None = None,
        optimize_graph: bool = False,
        **kwargs: Any,
    ) -> DisplayObject | None:
        """Render the computation of this object's task graph using graphviz.

        Requires ``graphviz`` to be installed.

        Parameters
        ----------
        filename : str or None, optional
            The name of the file to write to disk. If the provided
            `filename` doesn't include an extension, '.png' will be
            used by default. If `filename` is None, no file will be
            written, and we communicate with dot using only pipes.
        format : {'png', 'pdf', 'dot', 'svg', 'jpeg', 'jpg'}, optional
            Format in which to write output file. Default is 'png'.
        optimize_graph : bool, optional
            If True, the graph is optimized before rendering.
            Otherwise, the graph is displayed as is. Default is False.
        color: {None, 'order'}, optional
            Options to color nodes. Provide ``cmap=`` keyword for
            additional colormap
        **kwargs
            Additional keyword arguments to forward to ``to_graphviz``.

        Examples
        --------
        >>> x.visualize(filename='dask.pdf')  # doctest: +SKIP
        >>> x.visualize(filename='dask.pdf', color='order')  # doctest: +SKIP

        Returns
        -------
        result : IPython.display.Image, IPython.display.SVG, or None
            See dask.dot.dot_graph for more information.

        See Also
        --------
        dask.visualize
        dask.dot.dot_graph

        Notes
        -----
        For more information on optimization see here:

        https://docs.dask.org/en/latest/optimize.html

        """
        raise NotImplementedError("Inheriting class must implement this method.")

404 

405 

@runtime_checkable
class HLGDaskCollection(DaskCollection, Protocol):
    """Protocol defining a Dask collection that uses HighLevelGraphs.

    This protocol is nearly identical to
    :py:class:`~dask.typing.DaskCollection`, with the addition of the
    ``__dask_layers__`` method (required for collections backed by
    high level graphs).

    """

    @abc.abstractmethod
    def __dask_layers__(self) -> Sequence[str]:
        """Names of the HighLevelGraph layers."""
        raise NotImplementedError("Inheriting class must implement this method.")

421 

422 

423class _NoDefault(Enum): 

424 """typing-aware constant to detect when the user omits a parameter and you can't use 

425 None. 

426 

427 Copied from pandas._libs.lib._NoDefault. 

428 

429 Usage 

430 ----- 

431 from dask.typing import NoDefault, no_default 

432 

433 def f(x: int | None | NoDefault = no_default) -> int: 

434 if x is no_default: 

435 ... 

436 """ 

437 

438 no_default = "NO_DEFAULT" 

439 

440 def __repr__(self) -> str: 

441 return "<no_default>" 

442 

443 

# Module-level singleton; callers test for omission with ``x is no_default``.
no_default = _NoDefault.no_default
# Type of the sentinel, for annotating parameters that may receive it.
NoDefault = Literal[_NoDefault.no_default]