Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/cycler/__init__.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

205 statements  

1""" 

2Cycler 

3====== 

4 

5Cycling through combinations of values, producing dictionaries. 

6 

7You can add cyclers:: 

8 

9 from cycler import cycler 

10 cc = (cycler(color=list('rgb')) + 

11 cycler(linestyle=['-', '--', '-.'])) 

12 for d in cc: 

13 print(d) 

14 

15Results in:: 

16 

17 {'color': 'r', 'linestyle': '-'} 

18 {'color': 'g', 'linestyle': '--'} 

19 {'color': 'b', 'linestyle': '-.'} 

20 

21 

22You can multiply cyclers:: 

23 

24 from cycler import cycler 

25 cc = (cycler(color=list('rgb')) * 

26 cycler(linestyle=['-', '--', '-.'])) 

27 for d in cc: 

28 print(d) 

29 

30Results in:: 

31 

32 {'color': 'r', 'linestyle': '-'} 

33 {'color': 'r', 'linestyle': '--'} 

34 {'color': 'r', 'linestyle': '-.'} 

35 {'color': 'g', 'linestyle': '-'} 

36 {'color': 'g', 'linestyle': '--'} 

37 {'color': 'g', 'linestyle': '-.'} 

38 {'color': 'b', 'linestyle': '-'} 

39 {'color': 'b', 'linestyle': '--'} 

40 {'color': 'b', 'linestyle': '-.'} 

41""" 

42 

43 

44from __future__ import annotations 

45 

46from collections.abc import Hashable, Iterable, Generator 

47import copy 

48from functools import reduce 

49from itertools import product, cycle 

50from operator import mul, add 

51# Dict, List, Union required for runtime cast calls 

52from typing import TypeVar, Generic, Callable, Union, Dict, List, Any, overload, cast 

53 

54__version__ = "0.12.1" 

55 

56K = TypeVar("K", bound=Hashable) 

57L = TypeVar("L", bound=Hashable) 

58V = TypeVar("V") 

59U = TypeVar("U") 

60 

61 

62def _process_keys( 

63 left: Cycler[K, V] | Iterable[dict[K, V]], 

64 right: Cycler[K, V] | Iterable[dict[K, V]] | None, 

65) -> set[K]: 

66 """ 

67 Helper function to compose cycler keys. 

68 

69 Parameters 

70 ---------- 

71 left, right : iterable of dictionaries or None 

72 The cyclers to be composed. 

73 

74 Returns 

75 ------- 

76 keys : set 

77 The keys in the composition of the two cyclers. 

78 """ 

79 l_peek: dict[K, V] = next(iter(left)) if left != [] else {} 

80 r_peek: dict[K, V] = next(iter(right)) if right is not None else {} 

81 l_key: set[K] = set(l_peek.keys()) 

82 r_key: set[K] = set(r_peek.keys()) 

83 if l_key & r_key: 

84 raise ValueError("Can not compose overlapping cycles") 

85 return l_key | r_key 

86 

87 

def concat(left: Cycler[K, V], right: Cycler[K, U]) -> Cycler[K, V | U]:
    r"""
    Join two `Cycler`\s end to end, in the manner of `itertools.chain`.

    Both cyclers must have exactly the same set of keys.

    Examples
    --------
    >>> num = cycler('a', range(3))
    >>> let = cycler('a', 'abc')
    >>> num.concat(let)
    cycler('a', [0, 1, 2, 'a', 'b', 'c'])

    Returns
    -------
    `Cycler`
        The concatenated cycler.

    Raises
    ------
    ValueError
        If the key sets of the two cyclers differ.
    """
    left_keys, right_keys = left.keys, right.keys
    if left_keys != right_keys:
        shared = left_keys & right_keys
        unshared = left_keys ^ right_keys
        raise ValueError(
            "Keys do not match:\n"
            f"\tIntersection: {shared!r}\n"
            f"\tDisjoint: {unshared!r}"
        )

    # The casts only widen the value type for type checkers; at runtime
    # these are the plain per-key lists returned by ``by_key``.
    left_values = cast(Dict[K, List[Union[V, U]]], left.by_key())
    right_values = cast(Dict[K, List[Union[V, U]]], right.by_key())

    # One single-key cycler per key, then zipped back together with ``+``.
    per_key = (
        _cycler(key, left_values[key] + right_values[key]) for key in left_keys
    )
    return reduce(add, per_key)

117 

118 

class Cycler(Generic[K, V]):
    """
    Composable cycles.

    This class has compositions methods:

    ``+``
      for 'inner' products (zip)

    ``+=``
      in-place ``+``

    ``*``
      for outer products (`itertools.product`) and integer multiplication

    ``*=``
      in-place ``*``

    and supports basic slicing via ``[]``.

    Parameters
    ----------
    left, right : Cycler or None
        The 'left' and 'right' cyclers.
    op : func or None
        Function which composes the 'left' and 'right' cyclers.
    """

    def __call__(self):
        """Return an `itertools.cycle` iterator over this cycler."""
        return cycle(self)

    def __init__(
        self,
        left: Cycler[K, V] | Iterable[dict[K, V]] | None,
        right: Cycler[K, V] | None = None,
        op: Any = None,
    ):
        """
        Semi-private init.

        Do not use this directly, use `cycler` function instead.

        Parameters
        ----------
        left : Cycler, iterable of dict, or None
            A `Cycler` is rebuilt from its own parts, any other iterable
            has each of its dictionaries shallow-copied into a list, and
            ``None`` gives an empty list.
        right : Cycler or None
            Rebuilt from its own parts when it is a `Cycler`; any other
            value is stored as ``None``.
        op : callable or None
            Stored as given; used by iteration to combine both sides.
        """
        if isinstance(left, Cycler):
            self._left: Cycler[K, V] | list[dict[K, V]] = Cycler(
                left._left, left._right, left._op
            )
        elif left is not None:
            # Need to copy the dictionary or else that will be a residual
            # mutable that could lead to strange errors
            self._left = [copy.copy(v) for v in left]
        else:
            self._left = []

        if isinstance(right, Cycler):
            self._right: Cycler[K, V] | None = Cycler(
                right._left, right._right, right._op
            )
        else:
            self._right = None

        self._keys: set[K] = _process_keys(self._left, self._right)
        self._op: Any = op

    def __contains__(self, k):
        """Return whether *k* is one of this cycler's keys."""
        return k in self._keys

    @property
    def keys(self) -> set[K]:
        """The keys this Cycler knows about."""
        # Hand out a copy so callers cannot mutate the internal key set.
        return set(self._keys)

    def change_key(self, old: K, new: K) -> None:
        """
        Change a key in this cycler to a new name.
        Modification is performed in-place.

        Does nothing if the old key is the same as the new key.
        Raises a ValueError if the new key is already a key.
        Raises a KeyError if the old key isn't a key.
        """
        if old == new:
            return
        if new in self._keys:
            raise ValueError(
                f"Can't replace {old} with {new}, {new} is already a key"
            )
        if old not in self._keys:
            raise KeyError(
                f"Can't replace {old} with {new}, {old} is not a key"
            )

        self._keys.remove(old)
        self._keys.add(new)

        if self._right is not None and old in self._right.keys:
            self._right.change_key(old, new)

        # self._left should always be non-None
        # if self._keys is non-empty.
        elif isinstance(self._left, Cycler):
            self._left.change_key(old, new)
        else:
            # Rename only the matching key and keep every other item, so
            # entries that carry several keys are not truncated to one.
            self._left = [
                {(new if k == old else k): v for k, v in entry.items()}
                for entry in self._left
            ]

    @classmethod
    def _from_iter(cls, label: K, itr: Iterable[V]) -> Cycler[K, V]:
        """
        Class method to create 'base' Cycler objects
        that do not have a 'right' or 'op' and for which
        the 'left' object is not another Cycler.

        Parameters
        ----------
        label : hashable
            The property key.

        itr : iterable
            Finite length iterable of the property values.

        Returns
        -------
        `Cycler`
            New 'base' cycler.
        """
        ret: Cycler[K, V] = cls(None)
        ret._left = [{label: v} for v in itr]
        ret._keys = {label}
        return ret

    def __getitem__(self, key: slice) -> Cycler[K, V]:
        """
        Return a new cycler holding the slice *key* of every key's values.

        Raises
        ------
        ValueError
            If *key* is not a `slice`.
        """
        # TODO : maybe add numpy style fancy slicing
        if isinstance(key, slice):
            trans = self.by_key()
            return reduce(add, (_cycler(k, v[key]) for k, v in trans.items()))
        else:
            raise ValueError("Can only use slices with Cycler.__getitem__")

    def __iter__(self) -> Generator[dict[K, V], None, None]:
        """
        Yield one new dictionary per entry of the cycle.

        Raises
        ------
        TypeError
            If a right side is set but no composing operation is.
        """
        if self._right is None:
            for left in self._left:
                # Yield a copy so consumers cannot alter the stored entries.
                yield dict(left)
        else:
            if self._op is None:
                raise TypeError(
                    "Operation cannot be None when both left and right are defined"
                )
            for a, b in self._op(self._left, self._right):
                out = {}
                out.update(a)
                out.update(b)
                yield out

    def __add__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
        """
        Pair-wise combine two equal length cyclers (zip).

        Parameters
        ----------
        other : Cycler

        Raises
        ------
        ValueError
            If the two cyclers differ in length.
        """
        if len(self) != len(other):
            raise ValueError(
                f"Can only add equal length cycles, not {len(self)} and {len(other)}"
            )
        return Cycler(
            cast(Cycler[Union[K, L], Union[V, U]], self),
            cast(Cycler[Union[K, L], Union[V, U]], other),
            zip
        )

    @overload
    def __mul__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
        ...

    @overload
    def __mul__(self, other: int) -> Cycler[K, V]:
        ...

    def __mul__(self, other):
        """
        Outer product of two cyclers (`itertools.product`) or integer
        multiplication.

        Parameters
        ----------
        other : Cycler or int
        """
        if isinstance(other, Cycler):
            return Cycler(
                cast(Cycler[Union[K, L], Union[V, U]], self),
                cast(Cycler[Union[K, L], Union[V, U]], other),
                product
            )
        elif isinstance(other, int):
            # Repeat each key's value list ``other`` times, then re-zip.
            trans = self.by_key()
            return reduce(
                add, (_cycler(k, v * other) for k, v in trans.items())
            )
        else:
            return NotImplemented

    @overload
    def __rmul__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
        ...

    @overload
    def __rmul__(self, other: int) -> Cycler[K, V]:
        ...

    def __rmul__(self, other):
        """Reflected ``*``; evaluates ``self * other``."""
        return self * other

    def __len__(self) -> int:
        """
        Return the number of entries in one pass over the cycler.

        Without a right side this is the length of the left side. Otherwise
        it is the smaller of the two lengths for ``zip`` and their product
        for `itertools.product`.
        """
        op_dict: dict[Callable, Callable[[int, int], int]] = {zip: min, product: mul}
        if self._right is None:
            return len(self._left)
        l_len = len(self._left)
        r_len = len(self._right)
        return op_dict[self._op](l_len, r_len)

    # iadd and imul do not expand the type as the returns must be consistent with
    # self, thus they flag as inconsistent with add/mul
    def __iadd__(self, other: Cycler[K, V]) -> Cycler[K, V]:  # type: ignore[misc]
        """
        In-place pair-wise combine two equal length cyclers (zip).

        Parameters
        ----------
        other : Cycler

        Raises
        ------
        TypeError
            If *other* is not a `Cycler`.
        """
        if not isinstance(other, Cycler):
            raise TypeError("Cannot += with a non-Cycler object")
        # True shallow copy of self is fine since this is in-place
        old_self = copy.copy(self)
        self._keys = _process_keys(old_self, other)
        self._left = old_self
        self._op = zip
        self._right = Cycler(other._left, other._right, other._op)
        return self

    def __imul__(self, other: Cycler[K, V] | int) -> Cycler[K, V]:  # type: ignore[misc]
        """
        In-place outer product of two cyclers (`itertools.product`).

        Parameters
        ----------
        other : Cycler

        Raises
        ------
        TypeError
            If *other* is not a `Cycler`; this includes integers.
        """
        if not isinstance(other, Cycler):
            raise TypeError("Cannot *= with a non-Cycler object")
        # True shallow copy of self is fine since this is in-place
        old_self = copy.copy(self)
        self._keys = _process_keys(old_self, other)
        self._left = old_self
        self._op = product
        self._right = Cycler(other._left, other._right, other._op)
        return self

    def __eq__(self, other: object) -> bool:
        """
        Return True when *other* is a `Cycler` with the same length, the
        same keys and equal entries in the same order.
        """
        if not isinstance(other, Cycler):
            return False
        if len(self) != len(other):
            return False
        if self.keys ^ other.keys:
            return False
        return all(a == b for a, b in zip(self, other))

    # Instances define __eq__ and are mutable, so they are not hashable.
    __hash__ = None  # type: ignore

    def __repr__(self) -> str:
        """Return a string representation showing every key of the cycler."""
        op_map = {zip: "+", product: "*"}
        if self._right is not None:
            op = op_map.get(self._op, "?")
            return f"({self._left!r} {op} {self._right!r})"

        if isinstance(self._left, Cycler):
            # With no right side, iteration just replays the wrapped
            # cycler, so show it in full instead of one arbitrary key.
            return repr(self._left)

        labels = sorted(self._keys, key=repr)
        if not labels:
            # No key to build a ``cycler(label, values)`` form from.
            return f"Cycler({list(self)!r})"

        parts = [
            f"cycler({lab!r}, {[v[lab] for v in self]!r})" for lab in labels
        ]
        if len(parts) == 1:
            return parts[0]
        return "(" + " + ".join(parts) + ")"

    def _repr_html_(self) -> str:
        """Return an HTML table with one column per key and one row per entry."""
        # a table showing the value of each key through a full cycle
        sorted_keys = sorted(self.keys, key=repr)
        parts = ["<table>"]
        parts.extend(f"<th>{key!r}</th>" for key in sorted_keys)
        for d in self:
            parts.append("<tr>")
            parts.extend(f"<td>{d[k]!r}</td>" for k in sorted_keys)
            parts.append("</tr>")
        parts.append("</table>")
        return "".join(parts)

    def by_key(self) -> dict[K, list[V]]:
        """
        Values by key.

        This returns the transposed values of the cycler.  Iterating
        over a `Cycler` yields dicts with a single value for each key,
        this method returns a `dict` of `list` which are the values
        for the given key.

        The returned value can be used to create an equivalent `Cycler`
        using only `+`.

        Returns
        -------
        transpose : dict
            dict of lists of the values for each key.
        """

        # TODO : sort out if this is a bottle neck, if there is a better way
        # and if we care.

        keys = self.keys
        out: dict[K, list[V]] = {k: list() for k in keys}

        for d in self:
            for k in keys:
                out[k].append(d[k])
        return out

    # for back compatibility
    _transpose = by_key

    def simplify(self) -> Cycler[K, V]:
        """
        Simplify the cycler into a sum (but no products) of cyclers.

        Returns
        -------
        simple : Cycler
        """
        # TODO: sort out if it is worth the effort to make sure this is
        # balanced.  Currently it is
        # (((a + b) + c) + d) vs
        # ((a + b) + (c + d))
        # I would believe that there is some performance implications
        trans = self.by_key()
        return reduce(add, (_cycler(k, v) for k, v in trans.items()))

    # Expose the module-level ``concat`` function as a method.
    concat = concat

465 

466 

@overload
def cycler(arg: Cycler[K, V]) -> Cycler[K, V]:
    ...


@overload
def cycler(**kwargs: Iterable[V]) -> Cycler[str, V]:
    ...


@overload
def cycler(label: K, itr: Iterable[V]) -> Cycler[K, V]:
    ...


def cycler(*args, **kwargs):
    """
    Build a `Cycler` from one positional argument, from two positional
    arguments, or from keyword arguments.

    cycler(arg)
    cycler(label1=itr1[, label2=iter2[, ...]])
    cycler(label, itr)

    Form 1 copies the given `Cycler`.

    Form 2 combines one single-key `Cycler` per keyword with ``+``, so
    all of the iterables are cycled simultaneously, as if through zip().

    Form 3 builds a `Cycler` from a label and an iterable.  Use it when
    the label cannot be written as a keyword argument (e.g., an integer
    or a name that has a space in it).

    Parameters
    ----------
    arg : Cycler
        Copy constructor for Cycler (does a shallow copy of iterables).
    label : name
        The property key. In the 2-arg form of the function,
        the label can be any hashable object. In the keyword argument
        form of the function, it must be a valid python identifier.
    itr : iterable
        Finite length iterable of the property values.
        Can be a single-property `Cycler` that would
        be like a key change, but as a shallow copy.

    Returns
    -------
    cycler : Cycler
        New `Cycler` for the given property

    Raises
    ------
    TypeError
        If positional and keyword arguments are mixed, if no argument is
        given, if more than two positional arguments are given, or if a
        lone positional argument is not a `Cycler`.
    """
    if args and kwargs:
        raise TypeError(
            "cycler() can only accept positional OR keyword arguments -- not both."
        )

    n_positional = len(args)

    if n_positional > 2:
        raise TypeError(
            "Only a single Cycler can be accepted as the lone "
            "positional argument. Use keyword arguments instead."
        )

    if n_positional == 2:
        # (label, itr) form.
        return _cycler(*args)

    if n_positional == 1:
        # Copy-constructor form.
        (source,) = args
        if isinstance(source, Cycler):
            return Cycler(source)
        raise TypeError(
            "If only one positional argument given, it must "
            "be a Cycler instance."
        )

    if not kwargs:
        raise TypeError("Must have at least a positional OR keyword arguments")

    # Keyword form: one single-key cycler per keyword, zipped with ``+``.
    return reduce(add, (_cycler(k, v) for k, v in kwargs.items()))

544 

545 

def _cycler(label: K, itr: Iterable[V]) -> Cycler[K, V]:
    """
    Build a single-key `Cycler` from a property name and its values.

    Parameters
    ----------
    label : hashable
        The property key.
    itr : iterable
        Finite length iterable of the property values.  A `Cycler` with
        exactly one key is also accepted; its values are reused under
        *label*.

    Returns
    -------
    cycler : Cycler
        New `Cycler` for the given property

    Raises
    ------
    ValueError
        If *itr* is a `Cycler` whose number of keys is not one.
    """
    if not isinstance(itr, Cycler):
        return Cycler._from_iter(label, itr)

    source_keys = itr.keys
    if len(source_keys) != 1:
        raise ValueError("Can not create Cycler from a multi-property Cycler")

    (source_label,) = source_keys
    # A lazy generator is enough here: _from_iter() materialises its
    # input into a new list anyway.
    values = (entry[source_label] for entry in itr)
    return Cycler._from_iter(label, values)