Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/dask/typing.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1from __future__ import annotations 

2 

3import abc 

4from collections.abc import Callable, Hashable, Mapping, Sequence 

5from enum import Enum 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Literal, 

10 Protocol, 

11 TypeVar, 

12 Union, 

13 runtime_checkable, 

14) 

15 

16if TYPE_CHECKING: 

17 # IPython import is relatively slow. Avoid if not necessary 

18 from IPython.display import DisplayObject 

19 

20 # TODO import from typing (requires Python >=3.10) 

21 from typing_extensions import TypeAlias 

22 

# TypeVars bound to the DaskCollection protocol; ``CollType`` preserves the
# concrete collection type through generic methods such as ``persist``, and
# the covariant ``CollType_co`` is used for read-only (return) positions.
CollType = TypeVar("CollType", bound="DaskCollection")
CollType_co = TypeVar("CollType_co", bound="DaskCollection", covariant=True)
# Signature of a ``__dask_postcompute__`` finalizer; left as a bare
# ``Callable`` because the exact parameters vary per collection.
PostComputeCallable = Callable


# A low-level graph key: str, bytes, int, float, or a (possibly nested)
# tuple of these.
Key: TypeAlias = Union[str, bytes, int, float, tuple["Key", ...]]
# FIXME: This type is a little misleading. Low level graphs are often
# MutableMappings but HLGs are not
Graph: TypeAlias = Mapping[Key, Any]
# Potentially nested list of Dask keys
NestedKeys: TypeAlias = list[Union[Key, "NestedKeys"]]

34 

35 

class SchedulerGetCallable(Protocol):
    """Signature of the ``__dask_scheduler__`` callable of a collection."""

    def __call__(
        self,
        dsk: Graph,
        keys: Sequence[Key] | Key,
        **kwargs: Any,
    ) -> Any:
        """Execute a task graph as the collection's default scheduler.

        Parameters
        ----------
        dsk :
            The task graph to execute.
        keys :
            The key or keys whose results are requested.
        **kwargs :
            Additional scheduler-specific arguments.

        Returns
        -------
        Any
            The result(s) corresponding to ``keys``.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

63 

64 

class PostPersistCallable(Protocol[CollType_co]):
    """Signature of the ``__dask_postpersist__`` rebuild callable."""

    def __call__(
        self,
        dsk: Graph,
        *args: Any,
        rename: Mapping[str, str] | None = None,
    ) -> CollType_co:
        """Rebuild an equivalent collection from a (persisted) graph.

        Parameters
        ----------
        dsk: Mapping
            A mapping containing at least the output keys returned by
            :meth:`__dask_keys__`.
        *args : Any
            Extra positional arguments; must be an empty tuple when no
            extra arguments are needed.
        rename : Mapping[str, str], optional
            When given, indicates that the output keys may change name;
            e.g. if :meth:`__dask_keys__` previously returned
            ``[("a", 0), ("a", 1)]``, then after
            ``rebuild(dsk, *extra_args, rename={"a": "b"})`` it must
            return ``[("b", 0), ("b", 1)]``. Collection names absent from
            ``rename`` keep their keys unchanged, and unexpected names in
            ``rename`` must be ignored.

        Returns
        -------
        Collection
            An equivalent Dask collection with the same keys, computed
            through a different graph.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

103 

104 

@runtime_checkable
class DaskCollection(Protocol):
    """Protocol describing the interface every Dask collection implements."""

    @abc.abstractmethod
    def __dask_graph__(self) -> Graph:
        """Return the task graph backing this collection.

        The core Dask collections (Array, DataFrame, Bag, and Delayed)
        represent their graphs with a
        :py:class:`~dask.highlevelgraph.HighLevelGraph`, but a plain
        Python dictionary (a low level graph) is also acceptable.

        Returns
        -------
        Mapping
            The task graph. When the returned value is a
            :py:class:`dask.highlevelgraph.HighLevelGraph`, the
            :py:func:`__dask_layers__` method must also be implemented,
            as described by the
            :py:class:`~dask.typing.HLGDaskCollection` protocol.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_keys__(self) -> NestedKeys:
        """Return the output keys of this collection's task graph.

        Keys of a Dask collection must satisfy constraints beyond those
        in the :doc:`task graph specification documentation <spec>`:
        each key is either a non-empty string, or a tuple whose first
        element is a non-empty string (the *collection name*) followed
        by zero or more str, bytes, int, float, or tuples thereof.
        Collections shipped with dask use exactly one name each, though
        that is not required.

        Examples of valid outputs:

        - ``[]``
        - ``["x", "y"]``
        - ``[[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]``

        Returns
        -------
        list
            A possibly nested list of keys representing the graph's
            outputs. After computation, results come back in the same
            layout with each key replaced by its corresponding output.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postcompute__(self) -> tuple[PostComputeCallable, tuple]:
        """Return the finalizer (and its extra args) for the final result.

        After computation every key holds an in-memory result; the
        postcompute function combines those per-key results into the
        collection's final in-memory form. For example,
        dask.array.Array concatenates the chunk arrays into a single
        in-memory array.

        Returns
        -------
        PostComputeCallable
            Callable receiving the sequence of final-key results plus
            optional arguments; an example signature is
            ``finalize(results: Sequence[Any], *args)``.
        tuple[Any, ...]
            Optional arguments passed after the key results (the
            ``*args`` part of the ``PostComputeCallable``). Must be an
            empty tuple when no extra arguments are needed.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_postpersist__(self) -> tuple[PostPersistCallable, tuple]:
        """Return the rebuild callable (and its extra args) for persisting.

        See also :py:class:`dask.typing.PostPersistCallable` for the
        full contract.

        Returns
        -------
        PostPersistCallable
            Callable that rebuilds the collection; its signature should
            be
            ``rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)``
            as defined by the
            :py:class:`~dask.typing.PostPersistCallable` protocol. It
            should return an equivalent collection with the same keys
            as ``self``, backed by a different graph; for
            :py:func:`dask.persist` the new graph contains just the
            output keys with their values already computed.
        tuple[Any, ...]
            Optional arguments for the rebuild callable; must be an
            empty tuple when no extra arguments are needed.

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_tokenize__(self) -> Hashable:
        """Return a hashable value that fully represents this object."""
        raise NotImplementedError("Inheriting class must implement this method.")

    __dask_optimize__: Any
    """Given a graph and keys, return a new optimized graph.

    Must be a ``staticmethod`` or ``classmethod``, never an instance
    method; see the ``__dask_optimize__`` definitions on the core Dask
    collections (``dask.array.Array``, ``dask.dataframe.DataFrame``,
    etc.) for example implementations.

    Graphs and keys are merged before this method is called, so the
    arguments may span several collections that share the same
    optimize method.

    Parameters
    ----------
    dsk : Graph
        The merged graphs of all collections sharing this
        ``__dask_optimize__`` method.
    keys : Sequence[Key]
        The merged ``__dask_keys__`` outputs of those collections.
    **kwargs : Any
        Keyword arguments forwarded from the ``compute`` or
        ``persist`` call; may be used or ignored as needed.

    Returns
    -------
    MutableMapping
        The optimized Dask graph.

    """

    __dask_scheduler__: staticmethod[SchedulerGetCallable]
    """Default scheduler ``get`` function for this collection.

    Usually attached to the class as a staticmethod, e.g.:

    >>> import dask.threaded
    >>> class MyCollection:
    ...     # Use the threaded scheduler by default
    ...     __dask_scheduler__ = staticmethod(dask.threaded.get)

    """

    @abc.abstractmethod
    def compute(self, **kwargs: Any) -> Any:
        """Compute this Dask collection, returning its in-memory equivalent.

        A lazy collection becomes concrete: a Dask array turns into a
        NumPy array, a Dask dataframe into a Pandas dataframe, and so
        on. The entire result must fit in memory.

        Parameters
        ----------
        scheduler : string, optional
            Which scheduler to use, such as "threads", "synchronous"
            or "processes". When omitted, the global settings are
            checked first, then the collection defaults.
        optimize_graph : bool, optional
            If True [default], optimize the graph before running it;
            running the graph as-is can help with debugging.
        kwargs :
            Extra keywords forwarded to the scheduler function.

        Returns
        -------
        The collection's computed result.

        See Also
        --------
        dask.compute

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def persist(self: CollType, **kwargs: Any) -> CollType:
        """Persist this dask collection into memory

        Returns a Dask collection with the same metadata whose results
        are fully computed or actively computing in the background.

        Behavior depends strongly on the active task scheduler. With a
        scheduler that supports asynchronous computing, such as the
        dask.distributed scheduler, persist returns *immediately* and
        the result's task graph contains Dask Future objects. With a
        scheduler that only supports blocking computation, the call
        *blocks* and the result's task graph contains concrete Python
        results.

        This is particularly useful on distributed systems: results
        stay in distributed memory rather than being returned to the
        local process as with compute.

        Parameters
        ----------
        scheduler : string, optional
            Which scheduler to use, such as "threads", "synchronous"
            or "processes". When omitted, the global settings are
            checked first, then the collection defaults.
        optimize_graph : bool, optional
            If True [default], optimize the graph before running it;
            running the graph as-is can help with debugging.
        **kwargs
            Extra keywords forwarded to the scheduler function.

        Returns
        -------
        New dask collections backed by in-memory data

        See Also
        --------
        dask.persist

        """
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def visualize(
        self,
        filename: str = "mydask",
        format: str | None = None,
        optimize_graph: bool = False,
        **kwargs: Any,
    ) -> DisplayObject | None:
        """Render this object's task graph using graphviz.

        Requires ``graphviz`` to be installed.

        Parameters
        ----------
        filename : str or None, optional
            Name of the file written to disk; '.png' is appended when
            the name has no extension. If None, no file is written and
            communication with dot happens only over pipes.
        format : {'png', 'pdf', 'dot', 'svg', 'jpeg', 'jpg'}, optional
            Output file format. Default is 'png'.
        optimize_graph : bool, optional
            If True, render the optimized graph; otherwise render the
            graph as is. Default is False.
        color: {None, 'order'}, optional
            Node coloring options. Provide ``cmap=`` keyword for an
            additional colormap.
        **kwargs
            Additional keyword arguments forwarded to ``to_graphviz``.

        Examples
        --------
        >>> x.visualize(filename='dask.pdf')  # doctest: +SKIP
        >>> x.visualize(filename='dask.pdf', color='order')  # doctest: +SKIP

        Returns
        -------
        result : IPython.display.Image, IPython.display.SVG, or None
            See dask.dot.dot_graph for more information.

        See Also
        --------
        dask.visualize
        dask.dot.dot_graph

        Notes
        -----
        For more information on optimization see here:

        https://docs.dask.org/en/latest/optimize.html

        """
        raise NotImplementedError("Inheriting class must implement this method.")

400 

401 

@runtime_checkable
class HLGDaskCollection(DaskCollection, Protocol):
    """Protocol for Dask collections backed by HighLevelGraphs.

    Identical to :py:class:`~dask.typing.DaskCollection` apart from
    additionally requiring the ``__dask_layers__`` method, which
    collections backed by high level graphs must provide.

    """

    @abc.abstractmethod
    def __dask_layers__(self) -> Sequence[str]:
        """Return the names of the HighLevelGraph layers."""
        raise NotImplementedError("Inheriting class must implement this method.")

417 

418 

419class _NoDefault(Enum): 

420 """typing-aware constant to detect when the user omits a parameter and you can't use 

421 None. 

422 

423 Copied from pandas._libs.lib._NoDefault. 

424 

425 Usage 

426 ----- 

427 from dask.typing import NoDefault, no_default 

428 

429 def f(x: int | None | NoDefault = no_default) -> int: 

430 if x is no_default: 

431 ... 

432 """ 

433 

434 no_default = "NO_DEFAULT" 

435 

436 def __repr__(self) -> str: 

437 return "<no_default>" 

438 

439 

# Singleton sentinel instance used as a default argument value.
no_default = _NoDefault.no_default
# Type of ``no_default``; annotate parameters that accept the sentinel with
# ``int | None | NoDefault``-style unions.
NoDefault = Literal[_NoDefault.no_default]