Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/typing.py: 82%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

60 statements  

1from __future__ import annotations 

2 

3import abc 

4from collections.abc import Callable, Hashable, Mapping, Sequence 

5from enum import Enum 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Literal, 

10 Protocol, 

11 TypeAlias, 

12 TypeVar, 

13 Union, 

14 runtime_checkable, 

15) 

16 

17if TYPE_CHECKING: 

18 # IPython import is relatively slow. Avoid if not necessary 

19 from IPython.display import DisplayObject 

20 

# TypeVars bound to DaskCollection: ``CollType`` is used where a method
# returns the same collection type it was invoked on (e.g. ``persist``);
# ``CollType_co`` is the covariant variant used in return-only positions
# such as ``PostPersistCallable``.
CollType = TypeVar("CollType", bound="DaskCollection")
CollType_co = TypeVar("CollType_co", bound="DaskCollection", covariant=True)
# Finalizer returned by ``__dask_postcompute__``; left as a bare Callable
# because the exact signature varies per collection.
PostComputeCallable = Callable


# A Dask graph key: a str/int/float scalar, or an arbitrarily nested
# tuple of such scalars (the alias is recursive via the forward reference).
Key: TypeAlias = str | int | float | tuple["Key", ...]
# FIXME: This type is a little misleading. Low level graphs are often
# MutableMappings but HLGs are not
Graph: TypeAlias = Mapping[Key, Any]
# Potentially nested list of Dask keys
NestedKeys: TypeAlias = list[Union[Key, "NestedKeys"]]

32 

33 

class SchedulerGetCallable(Protocol):
    """Protocol for a ``__dask_scheduler__`` callable."""

    def __call__(
        self,
        dsk: Graph,
        keys: Sequence[Key] | Key,
        **kwargs: Any,
    ) -> Any:
        """Execute a graph; invoked as a collection's default scheduler.

        Parameters
        ----------
        dsk :
            The task graph.
        keys :
            Key(s) corresponding to the desired data.
        **kwargs :
            Additional arguments.

        Returns
        -------
        Any
            Result(s) associated with `keys`

        """
        raise NotImplementedError("Inheriting class must implement this method.")

61 

62 

class PostPersistCallable(Protocol[CollType_co]):
    """Protocol for a ``__dask_postpersist__`` rebuild callable."""

    def __call__(
        self,
        dsk: Graph,
        *args: Any,
        rename: Mapping[str, str] | None = None,
    ) -> CollType_co:
        """Rebuild a persisted collection from a graph.

        Parameters
        ----------
        dsk: Mapping
            A mapping containing at least the output keys returned by
            __dask_keys__().
        *args : Any
            Additional optional arguments. Must be an empty tuple when
            no extra arguments are necessary.
        rename : Mapping[str, str], optional
            When given, indicates that output keys may change as well;
            e.g. if the previous output of :meth:`__dask_keys__` was
            ``[("a", 0), ("a", 1)]``, then after calling
            ``rebuild(dsk, *extra_args, rename={"a": "b"})`` it must
            become ``[("b", 0), ("b", 1)]``. The ``rename`` mapping may
            omit the collection name(s), in which case the associated
            keys do not change. It may also contain replacements for
            unexpected names, which must be ignored.

        Returns
        -------
        Collection
            An equivalent Dask collection with the same keys as
            computed through a different graph.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

101 

102 

@runtime_checkable
class DaskCollection(Protocol):
    """Protocol describing the interface every Dask collection implements."""

    @abc.abstractmethod
    def __dask_graph__(self) -> Graph:
        """Return the task graph backing this collection.

        The collections shipped with Dask (Array, DataFrame, Bag, and
        Delayed) are backed by a
        :py:class:`~dask.highlevelgraph.HighLevelGraph`, but a plain
        Python dictionary acting as a low level graph is equally valid.

        Returns
        -------
        Mapping
            The Dask task graph. When the returned value is a
            :py:class:`dask.highlevelgraph.HighLevelGraph`, the
            collection must also implement :py:func:`__dask_layers__`
            as described by the
            :py:class:`~dask.typing.HLGDaskCollection` protocol.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_keys__(self) -> NestedKeys:
        """Return the output keys of the task graph.

        Keys of a Dask collection must satisfy constraints beyond those
        described in the :doc:`task graph specification documentation
        <spec>`: every key is either a non-empty string, or a tuple
        whose first element is a non-empty string followed by zero or
        more arbitrary str, bytes, int, float values or tuples thereof.
        The leading non-empty string is commonly known as the
        *collection name*. Every collection embedded in the dask
        package has exactly one name, though this is not a requirement
        in general.

        Examples of valid outputs:

        - ``[]``
        - ``["x", "y"]``
        - ``[[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]]``

        Returns
        -------
        list
            A possibly nested list of keys naming the outputs of the
            graph. After computation the results come back in the same
            layout, with each key replaced by its corresponding output.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postcompute__(self) -> tuple[PostComputeCallable, tuple]:
        """Return the finalizer (plus optional arguments) that builds the final result.

        After computation every key in the collection holds an
        in-memory result; the postcompute callable folds those per-key
        results into the collection's final in-memory form.
        dask.array.Array, for instance, concatenates the arrays of the
        individual chunks into one final in-memory array.

        Returns
        -------
        PostComputeCallable
            Callable invoked with the sequence of results of the final
            keys plus any optional arguments. An example signature
            would be ``finalize(results: Sequence[Any], *args)``.
        tuple[Any, ...]
            Optional arguments passed after the key results (the
            ``*args`` part of the ``PostComputeCallable``). Must be an
            empty tuple when no additional arguments are to be passed.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postpersist__(self) -> tuple[PostPersistCallable, tuple]:
        """Return the rebuilder (plus optional arguments) for a persisted collection.

        See also the documentation for
        :py:class:`dask.typing.PostPersistCallable`.

        Returns
        -------
        PostPersistCallable
            Callable that rebuilds the collection; its signature should
            be
            ``rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)``
            (as defined by the
            :py:class:`~dask.typing.PostPersistCallable` protocol). It
            should return an equivalent Dask collection with the same
            keys as `self`, whose results are computed through a
            different graph. In the case of :py:func:`dask.persist`,
            the new graph holds just the output keys with their values
            already computed.
        tuple[Any, ...]
            Optional arguments passed to the rebuild callable. Must be
            an empty tuple when no additional arguments are to be
            passed.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_tokenize__(self) -> Hashable:
        """Return a value that fully represents this object."""
        raise NotImplementedError("Inheriting class must implement this method.")

    __dask_optimize__: Any
    """Given a graph and keys, return a new optimized graph.

    Defined as either a ``staticmethod`` or a ``classmethod`` — never
    an instance method. See the ``__dask_optimize__`` definitions on
    the core Dask collections (``dask.array.Array``,
    ``dask.dataframe.DataFrame``, etc.) for example implementations.

    Graphs and keys are merged before ``__dask_optimize__`` is invoked,
    so a single call may cover more than one collection sharing the
    same optimize method.

    Parameters
    ----------
    dsk : Graph
        The merged graphs of every collection sharing this
        ``__dask_optimize__`` method.
    keys : Sequence[Key]
        A list of the ``__dask_keys__`` outputs of every collection
        sharing this ``__dask_optimize__`` method.
    **kwargs : Any
        Extra keyword arguments forwarded from the call to ``compute``
        or ``persist``; may be used or ignored as needed.

    Returns
    -------
    MutableMapping
        The optimized Dask graph.

    """

    # FIXME: It is currently not possible to define a staticmethod from a callback protocol
    # Also, the definition in `is_dask_collection` cannot be satisfied by a
    # protocol / typing check
    # staticmethod[SchedulerGetCallable]
    __dask_scheduler__: staticmethod
    """The default scheduler ``get`` used by this object.

    Usually attached to the class as a staticmethod, e.g.:

    >>> import dask.threaded
    >>> class MyCollection:
    ...     # Use the threaded scheduler by default
    ...     __dask_scheduler__ = staticmethod(dask.threaded.get)

    """

    @abc.abstractmethod
    def compute(self, **kwargs: Any) -> Any:
        """Compute this dask collection.

        Turns the lazy collection into its concrete in-memory
        equivalent — a Dask array becomes a NumPy array, a Dask
        dataframe becomes a Pandas dataframe, and so on. The entire
        dataset must fit into memory before calling this operation.

        Parameters
        ----------
        scheduler : string, optional
            Which scheduler to use, such as "threads", "synchronous" or
            "processes". When not provided, the global settings are
            checked first, falling back to the collection defaults.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before
            computation; otherwise the graph is run as is. This can be
            useful for debugging.
        kwargs :
            Extra keywords forwarded to the scheduler function.

        Returns
        -------
        The collection's computed result.

        See Also
        --------
        dask.compute

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def persist(self: CollType, **kwargs: Any) -> CollType:
        """Persist this dask collection into memory

        Produces a Dask collection with the same metadata whose results
        are fully computed, or actively computing in the background.

        Behavior differs significantly depending on the active task
        scheduler. With a scheduler that supports asynchronous
        computation, such as the dask.distributed scheduler, persist
        returns *immediately* and the return value's task graph
        contains Dask Future objects. With a scheduler that only
        supports blocking computation, the call to persist will *block*
        and the return value's task graph will contain concrete Python
        results.

        Persisting is particularly useful on distributed systems,
        because the results are kept in distributed memory rather than
        returned to the local process as with compute.

        Parameters
        ----------
        scheduler : string, optional
            Which scheduler to use, such as "threads", "synchronous" or
            "processes". When not provided, the global settings are
            checked first, falling back to the collection defaults.
        optimize_graph : bool, optional
            If True [default], the graph is optimized before
            computation; otherwise the graph is run as is. This can be
            useful for debugging.
        **kwargs
            Extra keywords forwarded to the scheduler function.

        Returns
        -------
        New dask collections backed by in-memory data

        See Also
        --------
        dask.persist

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def visualize(
        self,
        filename: str = "mydask",
        format: str | None = None,
        optimize_graph: bool = False,
        **kwargs: Any,
    ) -> DisplayObject | None:
        """Render this object's task graph with graphviz.

        Requires ``graphviz`` to be installed.

        Parameters
        ----------
        filename : str or None, optional
            Name of the file written to disk. A '.png' extension is
            used by default when the provided `filename` has none. When
            `filename` is None no file is written and communication
            with dot happens purely over pipes.
        format : {'png', 'pdf', 'dot', 'svg', 'jpeg', 'jpg'}, optional
            Format in which to write the output file. Default is 'png'.
        optimize_graph : bool, optional
            If True, the graph is optimized before rendering;
            otherwise the graph is displayed as is. Default is False.
        color: {None, 'order'}, optional
            Options to color nodes. Provide the ``cmap=`` keyword for
            an additional colormap.
        **kwargs
            Additional keyword arguments forwarded to ``to_graphviz``.

        Examples
        --------
        >>> x.visualize(filename='dask.pdf')  # doctest: +SKIP
        >>> x.visualize(filename='dask.pdf', color='order')  # doctest: +SKIP

        Returns
        -------
        result : IPython.display.Image, IPython.display.SVG, or None
            See dask.dot.dot_graph for more information.

        See Also
        --------
        dask.visualize
        dask.dot.dot_graph

        Notes
        -----
        For more information on optimization see here:

        https://docs.dask.org/en/latest/optimize.html

        """
        raise NotImplementedError("Inheriting class must implement this method.")

402 

403 

@runtime_checkable
class HLGDaskCollection(DaskCollection, Protocol):
    """Protocol for Dask collections backed by HighLevelGraphs.

    Nearly identical to :py:class:`~dask.typing.DaskCollection`, except
    that implementations must additionally provide the
    ``__dask_layers__`` method, which is required for collections
    backed by high level graphs.

    """

    @abc.abstractmethod
    def __dask_layers__(self) -> Sequence[str]:
        """Names of the HighLevelGraph layers."""
        raise NotImplementedError("Inheriting class must implement this method.")

419 

420 

421class _NoDefault(Enum): 

422 """typing-aware constant to detect when the user omits a parameter and you can't use 

423 None. 

424 

425 Copied from pandas._libs.lib._NoDefault. 

426 

427 Usage 

428 ----- 

429 from dask.typing import NoDefault, no_default 

430 

431 def f(x: int | None | NoDefault = no_default) -> int: 

432 if x is no_default: 

433 ... 

434 """ 

435 

436 no_default = "NO_DEFAULT" 

437 

438 def __repr__(self) -> str: 

439 return "<no_default>" 

440 

441 

# Module-level sentinel instance; compare with ``x is no_default``.
no_default = _NoDefault.no_default
# Annotation type for parameters accepting the sentinel, e.g.
# ``def f(x: int | None | NoDefault = no_default) -> int`` (see the
# usage example in ``_NoDefault``).
NoDefault = Literal[_NoDefault.no_default]