Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/croniter/croniter.py: 84%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

728 statements  

1#!/usr/bin/env python 

2import binascii 

3import calendar 

4import copy 

5import datetime 

6import math 

7import platform 

8import random 

9import re 

10import struct 

11import sys 

12import traceback as _traceback 

13from time import time 

14from typing import Any, Literal, Optional, Union 

15 

16from dateutil.relativedelta import relativedelta 

17from dateutil.tz import datetime_exists, tzutc 

18 

19ExpandedExpression = list[Union[int, Literal["*", "l"]]] 

20 

21 

def is_32bit() -> bool:
    """
    Report whether the running interpreter looks like a 32-bit build.

    Three independent probes are evaluated (pointer size, the platform
    architecture string, and ``sys.maxsize``); the result is True as soon
    as any one of them indicates 32 bits, False otherwise.
    """
    # Probe 1: size of a C pointer, in bits.
    pointer_says_32 = struct.calcsize("P") * 8 == 32

    # Probe 2: architecture string reported by the platform module.
    try:
        arch_name = platform.architecture()[0]
    except RuntimeError:
        arch_name = None
    arch_says_32 = bool(arch_name) and "32" in arch_name

    # Probe 3: largest native index value.
    maxsize_says_32 = sys.maxsize <= 2**32

    return pointer_says_32 or arch_says_32 or maxsize_says_32

50 

51 

# Import-time probe: on interpreters detected as 32-bit, try converting a
# timestamp beyond 2038. If that raises OverflowError, OVERFLOW32B_MODE is set
# to True so that timestamp conversion elsewhere can use a fallback path.
# On interpreters not detected as 32-bit the probe is skipped and the flag is False.
try:
    # https://github.com/python/cpython/issues/101069 detection
    if is_32bit():
        datetime.datetime.fromtimestamp(3999999999)
    OVERFLOW32B_MODE = False
except OverflowError:
    OVERFLOW32B_MODE = True

59 

60 

# UTC tzinfo used when comparing/converting aware datetimes.
UTC_DT = datetime.timezone.utc
# Timezone-aware datetime for UNIX timestamp 0.
EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)

# Lowercase month abbreviations mapped to their month number (1-12).
M_ALPHAS: dict[str, Union[int, str]] = {
    "jan": 1,
    "feb": 2,
    "mar": 3,
    "apr": 4,  # noqa: E241
    "may": 5,
    "jun": 6,
    "jul": 7,
    "aug": 8,  # noqa: E241
    "sep": 9,
    "oct": 10,
    "nov": 11,
    "dec": 12,
}
# Lowercase weekday abbreviations mapped to cron weekday numbers (sun=0 .. sat=6).
DOW_ALPHAS: dict[str, Union[int, str]] = {
    "sun": 0,
    "mon": 1,
    "tue": 2,
    "wed": 3,
    "thu": 4,
    "fri": 5,
    "sat": 6,
}

# Index of each field inside an expanded expression list. Seconds and year
# come after the five classic fields.
MINUTE_FIELD = 0
HOUR_FIELD = 1
DAY_FIELD = 2
MONTH_FIELD = 3
DOW_FIELD = 4
SECOND_FIELD = 5
YEAR_FIELD = 6

# Field-index tuples for the 5-, 6- and 7-field expression layouts.
UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD)
SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD)
YEAR_FIELDS = (
    MINUTE_FIELD,
    HOUR_FIELD,
    DAY_FIELD,
    MONTH_FIELD,
    DOW_FIELD,
    SECOND_FIELD,
    YEAR_FIELD,
)

107 

# "low-high" with an optional "/step" suffix; groups 1, 2 and 4 hold low, high, step.
step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
# A string made only of decimal digits.
only_int_re = re.compile(r"^\d+$")

# Days per month for a non-leap year (February is adjusted in _last_day_of_month).
DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
# Regex alternations of the known weekday / month abbreviations.
WEEKDAYS = "|".join(DOW_ALPHAS.keys())
MONTHS = "|".join(M_ALPHAS.keys())
# Either a run of digits or a single "*".
star_or_int_re = re.compile(r"^(\d+|\*)$")
# Special day-of-week forms: a prefix ending in "#" (group "he" holds the part
# before "#") or the letter "l", followed by digits captured as "last".
special_dow_re = re.compile(
    rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)"
    rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$"
)
# A literal "*" character.
re_star = re.compile("[*]")
# Hashed expressions: "h" or "r", optionally followed by "(begin-end)" and "/divisor".
hash_expression_re = re.compile(
    r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
)

# Field layouts, addressable either by name or by number of fields.
CRON_FIELDS = {
    "unix": UNIX_FIELDS,
    "second": SECOND_FIELDS,
    "year": YEAR_FIELDS,
    len(UNIX_FIELDS): UNIX_FIELDS,
    len(SECOND_FIELDS): SECOND_FIELDS,
    len(YEAR_FIELDS): YEAR_FIELDS,
}
UNIX_CRON_LEN = len(UNIX_FIELDS)
SECOND_CRON_LEN = len(SECOND_FIELDS)
YEAR_CRON_LEN = len(YEAR_FIELDS)
# retrocompat
# Set of accepted field counts (the integer keys of CRON_FIELDS).
VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)}
# Cache keyed by (expr_format, hash_id, second_at_beginning); read in
# croniter.__init__. Presumably filled during expansion -- the writer is not
# visible in this part of the file.
EXPRESSIONS: dict[tuple[str, Optional[bytes], bool], list[str]] = {}
# Sentinel meaning "argument not supplied" where None is itself a valid value.
MARKER = object()

139 

140 

def datetime_to_timestamp(d):
    """Return *d* as seconds elapsed since 1970-01-01 00:00:00.

    An aware datetime is first shifted by its UTC offset; a naive datetime
    is used as-is (i.e. interpreted as already being in UTC).
    """
    if d.tzinfo is None:
        naive_utc = d
    else:
        naive_utc = d.replace(tzinfo=None) - d.utcoffset()

    unix_epoch = datetime.datetime(1970, 1, 1)
    elapsed = naive_utc - unix_epoch
    return elapsed.total_seconds()

146 

147 

148def _is_leap(year: int) -> bool: 

149 return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0) 

150 

151 

def _last_day_of_month(year: int, month: int) -> int:
    """Return the number of days in *month* (1-12) of *year*, leap years included."""
    leap_february = month == 2 and _is_leap(year)
    # DAYS holds non-leap month lengths; only a leap-year February gets one extra day.
    return DAYS[month - 1] + (1 if leap_february else 0)

158 

159 

def _is_successor(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> bool:
    """Tell whether *date* follows *previous_date* in the search direction.

    Both values are converted to UTC first. When ``is_prev`` is true the
    date must be strictly earlier, otherwise strictly later.
    """
    date_utc = date.astimezone(UTC_DT)
    previous_utc = previous_date.astimezone(UTC_DT)
    return date_utc < previous_utc if is_prev else date_utc > previous_utc

167 

168 

169def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta: 

170 """Calculate the timezone difference of the given dates.""" 

171 offset1 = date1.utcoffset() 

172 offset2 = date2.utcoffset() 

173 assert offset1 is not None 

174 assert offset2 is not None 

175 return offset2 - offset1 

176 

177 

def _add_tzinfo(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> tuple[datetime.datetime, bool]:
    """Add the tzinfo from the previous date to the given date.

    In case the new date is ambiguous, determine the correct date
    based on it being closer to the previous date but still a successor
    (after/before based on `is_prev`).

    In case the date does not exist, jump forward to the next existing date.

    Returns a ``(aware_date, exists)`` tuple. ``exists`` is False when the
    requested local time did not exist and the result was moved forward
    minute by minute to the first existing time; otherwise it is True.
    """
    # pytz-style tzinfo objects expose ``localize``; use it when available.
    localize = getattr(previous_date.tzinfo, "localize", None)
    if localize is not None:
        # pylint: disable-next=import-outside-toplevel
        import pytz

        try:
            # is_dst=None makes localize raise for non-existent/ambiguous times.
            result = localize(date, is_dst=None)
        except pytz.NonExistentTimeError:
            # Step forward one minute at a time until the local time exists.
            while True:
                date += datetime.timedelta(minutes=1)
                try:
                    result = localize(date, is_dst=None)
                except pytz.NonExistentTimeError:
                    continue
                break
            return result, False
        except pytz.AmbiguousTimeError:
            # Two candidates for the same wall-clock time; prefer the one
            # nearer to previous_date as long as it is still a successor.
            closer = localize(date, is_dst=not is_prev)
            farther = localize(date, is_dst=is_prev)
            # TODO: Check negative DST
            assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
            if _is_successor(closer, previous_date, is_prev):
                result = closer
            else:
                assert _is_successor(farther, previous_date, is_prev)
                result = farther
        return result, True

    # Non-pytz tzinfo: attach it directly and use ``fold`` to pick a side.
    result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo)
    if not datetime_exists(result):
        # Step forward one minute at a time until the local time exists.
        while not datetime_exists(result):
            result += datetime.timedelta(minutes=1)
        return result, False

    # result is closer to the previous date
    farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo)
    # Comparing the UTC offsets in the check for the date being ambiguous.
    if result.utcoffset() != farther.utcoffset():
        # TODO: Check negative DST
        assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
        if not _is_successor(result, previous_date, is_prev):
            assert _is_successor(farther, previous_date, is_prev)
            result = farther
    return result, True

233 

234 

class CroniterError(ValueError):
    """General top-level Croniter base exception (a ``ValueError`` subclass)."""

237 

238 

class CroniterBadTypeRangeError(TypeError):
    """Type error for range arguments.

    Derives from ``TypeError`` and is therefore *not* caught by
    ``except CroniterError``.
    """
    # NOTE(review): raise sites are not visible in this part of the file;
    # the intended use is inferred from the class name -- confirm.

241 

242 

class CroniterBadCronError(CroniterError):
    """Syntax, unknown value, or range error within a cron expression."""

245 

246 

class CroniterUnsupportedSyntaxError(CroniterBadCronError):
    """Valid cron syntax, but likely to produce inaccurate results"""

    # Extending CroniterBadCronError, which may be contradictory, but this allows
    # catching both errors with a single exception. From a user perspective
    # these will likely be handled the same way.

253 

254 

class CroniterBadDateError(CroniterError):
    """Unable to find next/prev timestamp match within the search window."""

257 

258 

class CroniterNotAlphaError(CroniterBadCronError):
    """Cron syntax contains an invalid day or month abbreviation."""

261 

262 

263class croniter: 

# Wrap-around size used when searching the month field.
MONTHS_IN_YEAR = 12

# This helps with expanding `*` fields into `lower-upper` ranges. Each item
# in this tuple maps to the corresponding field index
RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099))

# Per-field tables translating alphabetic tokens into values; indexed by
# field index. Fields with an empty dict accept no alphabetic tokens.
ALPHACONV: tuple[dict[str, Union[int, str]], ...] = (
    {},  # 0: min
    {},  # 1: hour
    {"l": "l"},  # 2: dom
    # 3: mon
    copy.deepcopy(M_ALPHAS),
    # 4: dow
    copy.deepcopy(DOW_ALPHAS),
    # 5: second
    {},
    # 6: year
    {},
)

# Per-field numeric aliases consulted by value_alias():
# day 0 -> 1, month 0 -> 1, day-of-week 7 -> 0.
LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {})

# Count of distinct values in each field's RANGES entry (e.g. 130 = 1970..2099).
# NOTE(review): consumers are not visible in this part of the file; presumably
# used to detect that a field lists every possible value -- confirm.
LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130)

287 

def __init__(
    self,
    expr_format: str,
    start_time: Optional[Union[datetime.datetime, float]] = None,
    ret_type: type = float,
    day_or: bool = True,
    max_years_between_matches: Optional[int] = None,
    is_prev: bool = False,
    hash_id: Optional[Union[bytes, str]] = None,
    implement_cron_bug: bool = False,
    second_at_beginning: bool = False,
    expand_from_start_time: bool = False,
) -> None:
    """Build an iterator over the times matched by a cron expression.

    Args:
        expr_format: The cron expression; handed to ``self.expand``.
        start_time: Starting point as a datetime or UNIX timestamp.
            Defaults to the current time.
        ret_type: Default type returned by the get/iterate methods.
        day_or: Stored and consulted by ``_calc_next`` when both the
            day-of-month and day-of-week fields are restricted.
        max_years_between_matches: Search window in years. Defaults to 50
            and is clamped to at least 1. Whether it was given explicitly
            is remembered separately.
        is_prev: Initial iteration direction (True means backwards).
        hash_id: Bytes or str identifier passed to ``self.expand``; a str
            is encoded as UTF-8.
        implement_cron_bug: Stored and consulted by ``_calc_next``.
        second_at_beginning: Passed to ``self.expand``.
        expand_from_start_time: When true, the start timestamp is passed
            to ``self.expand`` as ``from_timestamp``.

    Raises:
        TypeError: If ``hash_id`` is neither bytes nor str.
    """
    self._ret_type = ret_type
    self._day_or = day_or
    self._implement_cron_bug = implement_cron_bug
    self.second_at_beginning = bool(second_at_beginning)
    self._expand_from_start_time = expand_from_start_time

    # Normalise hash_id to bytes (or leave it as None).
    if hash_id is not None:
        if isinstance(hash_id, bytes):
            pass
        elif isinstance(hash_id, str):
            hash_id = hash_id.encode("UTF-8")
        else:
            raise TypeError("hash_id must be bytes or UTF-8 string")

    # Remember whether the caller chose the window; fall back to 50 years.
    self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None
    years_window = 50 if max_years_between_matches is None else max_years_between_matches
    self._max_years_between_matches = max(int(years_window), 1)

    initial_time = time() if start_time is None else start_time

    # Placeholders; set_current() below fills in the real values.
    self.tzinfo: Optional[datetime.tzinfo] = None
    self.start_time = 0.0
    self.dst_start_time = 0.0
    self.cur = 0.0
    self.set_current(initial_time, force=True)

    from_timestamp = self.dst_start_time if self._expand_from_start_time else None
    expansion = self.expand(
        expr_format,
        hash_id=hash_id,
        from_timestamp=from_timestamp,
        second_at_beginning=second_at_beginning,
    )
    self.expanded, self.nth_weekday_of_month = expansion
    self.fields = CRON_FIELDS[len(self.expanded)]
    self.expressions = EXPRESSIONS[(expr_format, hash_id, second_at_beginning)]
    self._is_prev = is_prev

337 

@classmethod
def _alphaconv(cls, index, key, expressions):
    """Translate an alphabetic token into its value for the given field.

    Args:
        index: Field index selecting the table in ``cls.ALPHACONV``.
        key: The token to look up (e.g. ``"jan"``, ``"mon"``, ``"l"``).
        expressions: The split cron expression; only used to build the
            error message.

    Returns:
        The value stored for ``key`` in the field's conversion table.

    Raises:
        CroniterNotAlphaError: If ``key`` is not known for that field. The
            original ``KeyError`` is attached as the explicit cause.
    """
    try:
        return cls.ALPHACONV[index][key]
    except KeyError as err:
        # Chain explicitly so the traceback shows the failed lookup as the
        # cause rather than as an error raised while handling another error.
        raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable") from err

344 

def get_next(self, ret_type=None, start_time=None, update_current=True):
    """Return the next matching time after the current position.

    Args:
        ret_type: Desired return type; forwarded to ``_get_next``.
        start_time: Optional new starting point; forwarded to ``_get_next``.
        update_current: Whether the iterator position is advanced.

    Raises:
        ValueError: If a truthy ``start_time`` is given while the instance
            was created with ``expand_from_start_time=True``.
    """
    conflicting_start = start_time and self._expand_from_start_time
    if conflicting_start:
        raise ValueError(
            "start_time is not supported when using expand_from_start_time = True."
        )
    forwarded = {
        "ret_type": ret_type,
        "start_time": start_time,
        "is_prev": False,
        "update_current": update_current,
    }
    return self._get_next(**forwarded)

353 

def get_prev(self, ret_type=None, start_time=None, update_current=True):
    """Return the closest matching time before the current position.

    All arguments are forwarded to ``_get_next`` with ``is_prev=True``.
    """
    forwarded = {
        "ret_type": ret_type,
        "start_time": start_time,
        "is_prev": True,
        "update_current": update_current,
    }
    return self._get_next(**forwarded)

358 

def get_current(self, ret_type=None):
    """Return the current position without advancing it.

    The position is returned as a datetime when ``ret_type`` (or the
    instance default) is a ``datetime.datetime`` subclass; otherwise the
    stored timestamp is returned unchanged.
    """
    wanted = ret_type or self._ret_type
    if not issubclass(wanted, datetime.datetime):
        return self.cur
    return self.timestamp_to_datetime(self.cur)

364 

def set_current(
    self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True
) -> float:
    """Move the iterator position to *start_time* and return the position.

    Nothing changes when ``start_time`` is None, or when ``force`` is false
    and a position is already set. A datetime argument also updates
    ``self.tzinfo`` and is converted to a timestamp before being stored.
    """
    if start_time is None or not (force or self.cur is None):
        return self.cur

    if isinstance(start_time, datetime.datetime):
        self.tzinfo = start_time.tzinfo
        start_time = self.datetime_to_timestamp(start_time)

    self.start_time = self.dst_start_time = self.cur = start_time
    return self.cur

377 

@staticmethod
def datetime_to_timestamp(d: datetime.datetime) -> float:
    """
    Converts a `datetime` object `d` into a UNIX timestamp.

    Thin static wrapper that delegates to the module-level
    `datetime_to_timestamp` function.
    """
    return datetime_to_timestamp(d)

384 

385 _datetime_to_timestamp = datetime_to_timestamp # retrocompat 

386 

def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime:
    """
    Converts a UNIX `timestamp` into a `datetime` object.

    When `tzinfo` is omitted, `self.tzinfo` is used. Passing `tzinfo=None`
    explicitly yields a naive datetime expressed in UTC even if
    `self.tzinfo` is set.
    """
    # MARKER distinguishes "not supplied" from an explicit None.
    target_tz = self.tzinfo if tzinfo is MARKER else tzinfo

    if OVERFLOW32B_MODE:
        # degraded mode to workaround Y2038
        # see https://github.com/python/cpython/issues/101069
        naive_utc = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp)
    else:
        aware_utc = datetime.datetime.fromtimestamp(timestamp, tz=tzutc())
        naive_utc = aware_utc.replace(tzinfo=None)

    if not target_tz:
        return naive_utc
    return naive_utc.replace(tzinfo=UTC_DT).astimezone(target_tz)

402 

403 _timestamp_to_datetime = timestamp_to_datetime # retrocompat 

404 

405 def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None): 

406 if update_current is None: 

407 update_current = True 

408 self.set_current(start_time, force=True) 

409 if is_prev is None: 

410 is_prev = self._is_prev 

411 self._is_prev = is_prev 

412 

413 ret_type = ret_type or self._ret_type 

414 

415 if not issubclass(ret_type, (float, datetime.datetime)): 

416 raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.") 

417 

418 result = self._calc_next(is_prev) 

419 timestamp = self.datetime_to_timestamp(result) 

420 if update_current: 

421 self.cur = timestamp 

422 if issubclass(ret_type, datetime.datetime): 

423 return result 

424 return timestamp 

425 

426 # iterator protocol, to enable direct use of croniter 

427 # objects in a loop, like "for dt in croniter("5 0 * * *'): ..." 

428 # or for combining multiple croniters into single 

429 # dates feed using 'itertools' module 

def all_next(self, ret_type=None, start_time=None, update_current=None):
    """
    Generate matching dates in forward order, one per iteration.

    Use this instead of plain iteration when a non-default `ret_type`
    is required. `start_time` only applies to the first value produced.
    When no further match can be found, the generator simply ends if
    `max_years_between_matches` was given explicitly; otherwise the
    `CroniterBadDateError` propagates.
    """
    pending_start = start_time
    try:
        while True:
            # Force forward direction before every step.
            self._is_prev = False
            upcoming = self._get_next(
                ret_type=ret_type, start_time=pending_start, update_current=update_current
            )
            yield upcoming
            pending_start = None
    except CroniterBadDateError:
        if not self._max_years_btw_matches_explicitly_set:
            raise

450 

def all_prev(self, ret_type=None, start_time=None, update_current=None):
    """
    Generate matching dates in backward order, one per iteration.

    `start_time` only applies to the first value produced. When no further
    match can be found, the generator simply ends if
    `max_years_between_matches` was given explicitly; otherwise the
    `CroniterBadDateError` propagates.
    """
    pending_start = start_time
    try:
        while True:
            # Force backward direction before every step.
            self._is_prev = True
            earlier = self._get_next(
                ret_type=ret_type, start_time=pending_start, update_current=update_current
            )
            yield earlier
            pending_start = None
    except CroniterBadDateError:
        if not self._max_years_btw_matches_explicitly_set:
            raise

466 

def iter(self, *args, **kwargs):
    """Return the generator method matching the current direction.

    The bound method itself is returned (``all_prev`` when iterating
    backwards, ``all_next`` otherwise); the caller is expected to call it.
    """
    # NOTE(review): *args/**kwargs are accepted but not used or forwarded --
    # confirm whether callers rely on calling the returned method themselves.
    if self._is_prev:
        return self.all_prev
    return self.all_next

469 

def __iter__(self):
    """Return the instance itself; it acts as its own iterator."""
    return self

472 

473 __next__ = next = _get_next 

474 

def _calc_next(self, is_prev: bool) -> datetime.datetime:
    """Find the nearest match from the current position.

    When both day-of-month and day-of-week are restricted and ``day_or``
    is enabled, the two fields are searched separately (each with the
    other replaced by ``*``) and the match nearest to the current position
    is returned. Otherwise a single search with all fields applied is done.

    Args:
        is_prev: True to search backwards, False to search forwards.
    """
    current = self.timestamp_to_datetime(self.cur)
    # Work on copies: the searches below rewrite entries of these containers.
    fields = self.expanded[:]
    nth_weekdays = self.nth_weekday_of_month.copy()

    # exception to support day of month and day of week as defined in cron
    both_days_restricted = fields[DAY_FIELD][0] != "*" and fields[DOW_FIELD][0] != "*"
    if both_days_restricted and self._day_or:
        # If requested, handle a bug in vixie cron/ISC cron where day_of_month and
        # day_of_week form an intersection (AND) instead of a union (OR) if either
        # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html)
        emulate_cron_bug = self._implement_cron_bug and (
            re_star.match(self.expressions[DAY_FIELD])
            or re_star.match(self.expressions[DOW_FIELD])
        )
        # When emulating the bug, skip the union below and fall through to
        # the single (intersecting) search at the end.
        if not emulate_cron_bug:
            saved_dow = fields[DOW_FIELD]
            fields[DOW_FIELD] = ["*"]
            by_day_of_month = self._calc(current, fields, nth_weekdays, is_prev)

            fields[DOW_FIELD] = saved_dow
            fields[DAY_FIELD] = ["*"]
            by_day_of_week = self._calc(current, fields, nth_weekdays, is_prev)

            if is_prev:
                return by_day_of_month if by_day_of_month > by_day_of_week else by_day_of_week
            return by_day_of_month if by_day_of_month < by_day_of_week else by_day_of_week

    return self._calc(current, fields, nth_weekdays, is_prev)

506 

def _calc(
    self,
    now: datetime.datetime,
    expanded: list[ExpandedExpression],
    nth_weekday_of_month: dict[int, set[int]],
    is_prev: bool,
) -> datetime.datetime:
    """Search for the nearest time matching *expanded*, starting from *now*.

    The search works on a naive copy of *now* and repeatedly runs a chain of
    per-field adjusters (year, month, day, weekday, hour, minute, second).
    Each adjuster either leaves the candidate untouched or moves it to the
    nearest value allowed for its field; after any move the chain restarts.
    When a full pass makes no change the candidate matches. If *now* is
    timezone-aware, the tzinfo is then re-attached and DST transitions are
    handled.

    Args:
        now: Reference time (naive or aware).
        expanded: Allowed values per field, indexed by the ``*_FIELD`` constants.
        nth_weekday_of_month: Weekday -> set of "nth" selectors; when
            non-empty the nth-weekday adjuster replaces the plain weekday one.
            May be mutated (a ``"*"`` key is expanded in place).
        is_prev: True to search backwards, False to search forwards.

    Returns:
        The matching datetime; naive if *now* is naive, otherwise aware.

    Raises:
        CroniterBadDateError: If no match is found within
            ``self._max_years_between_matches`` years, or the year field
            has no remaining candidate.
    """
    if is_prev:
        nearest_diff_method = self._get_prev_nearest_diff
        offset = relativedelta(microseconds=-1)
    else:
        nearest_diff_method = self._get_next_nearest_diff
        # Smallest step of the expression: seconds when a seconds field
        # is present, minutes otherwise.
        if len(expanded) > UNIX_CRON_LEN:
            offset = relativedelta(seconds=1)
        else:
            offset = relativedelta(minutes=1)
    # Calculate the next cron time in local time a.k.a. timezone unaware time.
    unaware_time = now.replace(tzinfo=None) + offset
    if len(expanded) > UNIX_CRON_LEN:
        unaware_time = unaware_time.replace(microsecond=0)
    else:
        unaware_time = unaware_time.replace(second=0, microsecond=0)

    # month/year track the candidate's month and year and are refreshed
    # in the main loop after every adjustment; current_year stays fixed.
    month = unaware_time.month
    year = current_year = unaware_time.year

    # Each proc_* helper returns (changed, d):
    #   True  -> d was moved, restart the chain
    #   False -> field already satisfied
    #   None  -> no candidate exists at all (only proc_year does this)
    # In relativedelta calls, plural keywords (years=, days=, ...) are
    # relative shifts and singular ones (month=, hour=, ...) set absolute values.

    def proc_year(d):
        if len(expanded) == YEAR_CRON_LEN:
            try:
                expanded[YEAR_FIELD].index("*")
            except ValueError:
                # use None as range_val to indicate no loop
                diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
                if diff_year is None:
                    return None, d
                if diff_year != 0:
                    if is_prev:
                        d += relativedelta(
                            years=diff_year, month=12, day=31, hour=23, minute=59, second=59
                        )
                    else:
                        d += relativedelta(
                            years=diff_year, month=1, day=1, hour=0, minute=0, second=0
                        )
                    return True, d
        return False, d

    def proc_month(d):
        try:
            expanded[MONTH_FIELD].index("*")
        except ValueError:
            diff_month = nearest_diff_method(
                d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR
            )
            reset_day = 1

            if diff_month is not None and diff_month != 0:
                if is_prev:
                    # Going backwards: land on the last second of the target month.
                    d += relativedelta(months=diff_month)
                    reset_day = _last_day_of_month(d.year, d.month)
                    d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(
                        months=diff_month, day=reset_day, hour=0, minute=0, second=0
                    )
                return True, d
        return False, d

    def proc_day_of_month(d):
        try:
            expanded[DAY_FIELD].index("*")
        except ValueError:
            days = _last_day_of_month(year, month)
            # "l" means last day of month: already satisfied on that day.
            if "l" in expanded[DAY_FIELD] and days == d.day:
                return False, d

            if is_prev:
                # NOTE(review): DAYS holds non-leap month lengths, so a
                # leap-year February counts as 28 here -- confirm intended.
                days_in_prev_month = DAYS[(month - 2) % self.MONTHS_IN_YEAR]
                diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
            else:
                diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)

            if diff_day is not None and diff_day != 0:
                if is_prev:
                    d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                return True, d
        return False, d

    def proc_day_of_week(d):
        try:
            expanded[DOW_FIELD].index("*")
        except ValueError:
            # isoweekday() % 7 maps Sunday (7) to 0, matching cron numbering.
            diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
            if diff_day_of_week is not None and diff_day_of_week != 0:
                if is_prev:
                    d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
                return True, d
        return False, d

    def proc_day_of_week_nth(d):
        # Expand a "*" entry onto all seven weekdays, then drop it.
        if "*" in nth_weekday_of_month:
            s = nth_weekday_of_month["*"]
            for i in range(0, 7):
                if i in nth_weekday_of_month:
                    nth_weekday_of_month[i].update(s)
                else:
                    nth_weekday_of_month[i] = s
            del nth_weekday_of_month["*"]

        # Collect the days of d's month selected by every (weekday, nth)
        # pair that lie in the search direction relative to d.day.
        candidates = []
        for wday, nth in nth_weekday_of_month.items():
            c = self._get_nth_weekday_of_month(d.year, d.month, wday)
            for n in nth:
                if n == "l":
                    candidate = c[-1]
                elif len(c) < n:
                    # The month has fewer than n occurrences of this weekday.
                    continue
                else:
                    candidate = c[n - 1]
                if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                    candidates.append(candidate)

        if not candidates:
            # Nothing left in this month: move to the adjacent month's edge.
            if is_prev:
                d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
            else:
                days = _last_day_of_month(year, month)
                d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
            return True, d

        candidates.sort()
        diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
        if diff_day != 0:
            if is_prev:
                d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
            else:
                d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
            return True, d
        return False, d

    def proc_hour(d):
        try:
            expanded[HOUR_FIELD].index("*")
        except ValueError:
            diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
            if diff_hour is not None and diff_hour != 0:
                if is_prev:
                    d += relativedelta(hours=diff_hour, minute=59, second=59)
                else:
                    d += relativedelta(hours=diff_hour, minute=0, second=0)
                return True, d
        return False, d

    def proc_minute(d):
        try:
            expanded[MINUTE_FIELD].index("*")
        except ValueError:
            diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
            if diff_min is not None and diff_min != 0:
                if is_prev:
                    d += relativedelta(minutes=diff_min, second=59)
                else:
                    d += relativedelta(minutes=diff_min, second=0)
                return True, d
        return False, d

    def proc_second(d):
        if len(expanded) > UNIX_CRON_LEN:
            try:
                expanded[SECOND_FIELD].index("*")
            except ValueError:
                diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
                if diff_sec is not None and diff_sec != 0:
                    d += relativedelta(seconds=diff_sec)
                    return True, d
        else:
            # No seconds field: results always fall on second 0.
            d += relativedelta(second=0)
        return False, d

    # Adjusters run from the coarsest field to the finest.
    procs = [
        proc_year,
        proc_month,
        proc_day_of_month,
        (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
        proc_hour,
        proc_minute,
        proc_second,
    ]

    while abs(year - current_year) <= self._max_years_between_matches:
        next = False
        stop = False
        for proc in procs:
            (changed, unaware_time) = proc(unaware_time)
            # `None` can be set mostly for year processing
            # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
            if changed is None:
                stop = True
                break
            if changed:
                month, year = unaware_time.month, unaware_time.year
                next = True
                break
        if stop:
            break
        if next:
            continue

        # A full pass changed nothing: unaware_time matches every field.
        unaware_time = unaware_time.replace(microsecond=0)
        if now.tzinfo is None:
            return unaware_time

        # Add timezone information back and handle DST changes
        aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

        if not exists and (
            not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD]
        ):
            # The calculated local date does not exist and moving the time forward
            # to the next valid time isn't the correct solution. Search for the
            # next matching cron time that exists.
            while not exists:
                unaware_time = self._calc(
                    unaware_time, expanded, nth_weekday_of_month, is_prev
                )
                aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

        offset_delta = _timezone_delta(now, aware_time)
        if not offset_delta:
            # There was no DST change.
            return aware_time

        # There was a DST change. So check if there is a alternative cron time
        # for the other UTC offset.
        alternative_unaware_time = now.replace(tzinfo=None) + offset_delta
        alternative_unaware_time = self._calc(
            alternative_unaware_time, expanded, nth_weekday_of_month, is_prev
        )
        alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev)

        if not _is_successor(alternative_aware_time, now, is_prev):
            # The alternative time is an ancestor of now. Thus it is not an alternative.
            return aware_time

        # Prefer whichever candidate is nearer to `now` in the search direction.
        if _is_successor(aware_time, alternative_aware_time, is_prev):
            return alternative_aware_time

        return aware_time

    # Reached when the search window is exhausted or proc_year returned None.
    if is_prev:
        raise CroniterBadDateError("failed to find prev date")
    raise CroniterBadDateError("failed to find next date")

763 

764 @staticmethod 

765 def _get_next_nearest_diff(x, to_check, range_val): 

766 """ 

767 `range_val` is the range of a field. 

768 If no available time, we can move to next loop(like next month). 

769 `range_val` can also be set to `None` to indicate that there is no loop. 

770 ( Currently, should only used for `year` field ) 

771 """ 

772 for i, d in enumerate(to_check): 

773 if range_val is not None: 

774 if d == "l": 

775 # if 'l' then it is the last day of month 

776 # => its value of range_val 

777 d = range_val 

778 elif d > range_val: 

779 continue 

780 if d >= x: 

781 return d - x 

782 # When range_val is None and x not exists in to_check, 

783 # `None` will be returned to suggest no more available time 

784 if range_val is None: 

785 return None 

786 return to_check[0] - x + range_val 

787 

788 @staticmethod 

789 def _get_prev_nearest_diff(x, to_check, range_val): 

790 """ 

791 `range_val` is the range of a field. 

792 If no available time, we can move to previous loop(like previous month). 

793 Range_val can also be set to `None` to indicate that there is no loop. 

794 ( Currently should only used for `year` field ) 

795 """ 

796 candidates = to_check[:] 

797 candidates.reverse() 

798 for d in candidates: 

799 if d != "l" and d <= x: 

800 return d - x 

801 if "l" in candidates: 

802 return -x 

803 # When range_val is None and x not exists in to_check, 

804 # `None` will be returned to suggest no more available time 

805 if range_val is None: 

806 return None 

807 candidate = candidates[0] 

808 for c in candidates: 

809 # fixed: c < range_val 

810 # this code will reject all 31 day of month, 12 month, 59 second, 

811 # 23 hour and so on. 

812 # if candidates has just a element, this will not harmful. 

813 # but candidates have multiple elements, then values equal to 

814 # range_val will rejected. 

815 if c <= range_val: 

816 candidate = c 

817 break 

818 # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error 

819 # return 2021-03-02 06:00:00 

820 if candidate > range_val: 

821 return -range_val 

822 return candidate - x - range_val 

823 

824 @staticmethod 

825 def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]: 

826 """For a given year/month return a list of days in nth-day-of-month order. 

827 The last weekday of the month is always [-1]. 

828 """ 

829 w = (day_of_week + 6) % 7 

830 c = calendar.Calendar(w).monthdayscalendar(year, month) 

831 if c[0][0] == 0: 

832 c.pop(0) 

833 return tuple(i[0] for i in c) 

834 

@classmethod
def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN):
    """Map a numeric alias to its canonical value for the given field.

    Aliases come from ``cls.LOWMAP`` (for example day-of-week 7 -> 0).
    Depending on the number of fields in the expression, some fields are
    exempt and keep their value unchanged.

    Args:
        val: The numeric value to normalise.
        field_index: Index of the field the value belongs to.
        len_expressions: Number of fields in the expression, or a
            list/dict/tuple/set whose length is that number.

    Returns:
        The aliased value, or ``val`` itself when no alias applies.
    """
    if isinstance(len_expressions, (list, dict, tuple, set)):
        field_count = len(len_expressions)
    else:
        field_count = len_expressions

    aliases = cls.LOWMAP[field_index]
    if val not in aliases:
        return val

    # do not support 0 as a month either for classical 5 fields cron,
    # 6fields second repeat form or 7 fields year form
    # but still let conversion happen if day field is shifted
    if field_count == UNIX_CRON_LEN:
        exempt_fields = [DAY_FIELD, MONTH_FIELD]
    elif field_count == SECOND_CRON_LEN:
        exempt_fields = [MONTH_FIELD, DOW_FIELD]
    elif field_count == YEAR_CRON_LEN:
        exempt_fields = [DAY_FIELD, MONTH_FIELD, DOW_FIELD]
    else:
        exempt_fields = []

    if field_index in exempt_fields:
        return val
    return aliases[val]

852 

853 @classmethod 

854 def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None): 

855 # Split the expression in components, and normalize L -> l, MON -> mon, 

856 # etc. Keep expr_format untouched so we can use it in the exception 

857 # messages. 

858 expr_aliases = { 

859 "@midnight": ("0 0 * * *", "h h(0-2) * * * h"), 

860 "@hourly": ("0 * * * *", "h * * * * h"), 

861 "@daily": ("0 0 * * *", "h h * * * h"), 

862 "@weekly": ("0 0 * * 0", "h h * * h h"), 

863 "@monthly": ("0 0 1 * *", "h h h * * h"), 

864 "@yearly": ("0 0 1 1 *", "h h h h * h"), 

865 "@annually": ("0 0 1 1 *", "h h h h * h"), 

866 } 

867 

868 efl = expr_format.lower() 

869 hash_id_expr = 1 if hash_id is not None else 0 

870 try: 

871 efl = expr_aliases[efl][hash_id_expr] 

872 except KeyError: 

873 pass 

874 

875 expressions = efl.split() 

876 

877 if len(expressions) not in VALID_LEN_EXPRESSION: 

878 raise CroniterBadCronError( 

879 "Exactly 5, 6 or 7 columns has to be specified for iterator expression." 

880 ) 

881 

882 if len(expressions) > UNIX_CRON_LEN and second_at_beginning: 

883 # move second to it's own(6th) field to process by same logical 

884 expressions.insert(SECOND_FIELD, expressions.pop(0)) 

885 

886 expanded = [] 

887 nth_weekday_of_month = {} 

888 

889 for field_index, expr in enumerate(expressions): 

890 for expanderid, expander in EXPANDERS.items(): 

891 expr = expander(cls).expand( 

892 efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp 

893 ) 

894 

895 if "?" in expr: 

896 if expr != "?": 

897 raise CroniterBadCronError( 

898 f"[{expr_format}] is not acceptable." 

899 f" Question mark can not used with other characters" 

900 ) 

901 if field_index not in [DAY_FIELD, DOW_FIELD]: 

902 raise CroniterBadCronError( 

903 f"[{expr_format}] is not acceptable. " 

904 f"Question mark can only used in day_of_month or day_of_week" 

905 ) 

906 # currently just trade `?` as `*` 

907 expr = "*" 

908 

909 e_list = expr.split(",") 

910 res = [] 

911 

912 while len(e_list) > 0: 

913 e = e_list.pop() 

914 nth = None 

915 

916 if field_index == DOW_FIELD: 

917 # Handle special case in the dow expression: 2#3, l3 

918 special_dow_rem = special_dow_re.match(str(e)) 

919 if special_dow_rem: 

920 g = special_dow_rem.groupdict() 

921 he, last = g.get("he", ""), g.get("last", "") 

922 if he: 

923 e = he 

924 try: 

925 nth = int(last) 

926 assert 5 >= nth >= 1 

927 except (KeyError, ValueError, AssertionError): 

928 raise CroniterBadCronError( 

929 f"[{expr_format}] is not acceptable." 

930 f" Invalid day_of_week value: '{nth}'" 

931 ) 

932 elif last: 

933 e = last 

934 nth = g["pre"] # 'l' 

935 

936 # Before matching step_search_re, normalize "*" to "{min}-{max}". 

937 # Example: in the minute field, "*/5" normalizes to "0-59/5" 

938 t = re.sub( 

939 r"^\*(\/.+)$", 

940 r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]), 

941 str(e), 

942 ) 

943 m = step_search_re.search(t) 

944 

945 if not m: 

946 # Before matching step_search_re, 

947 # normalize "{start}/{step}" to "{start}-{max}/{step}". 

948 # Example: in the minute field, "10/5" normalizes to "10-59/5" 

949 t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e)) 

950 m = step_search_re.search(t) 

951 

952 if m: 

953 # early abort if low/high are out of bounds 

954 (low, high, step) = m.group(1), m.group(2), m.group(4) or 1 

955 if field_index == DAY_FIELD and high == "l": 

956 high = "31" 

957 

958 if not only_int_re.search(low): 

959 low = str(cls._alphaconv(field_index, low, expressions)) 

960 

961 if not only_int_re.search(high): 

962 high = str(cls._alphaconv(field_index, high, expressions)) 

963 

964 # normally, it's already guarded by the RE that should not accept 

965 # not-int values. 

966 if not only_int_re.search(str(step)): 

967 raise CroniterBadCronError( 

968 f"[{expr_format}] step '{step}'" 

969 f" in field {field_index} is not acceptable" 

970 ) 

971 step = int(step) 

972 

973 for band in low, high: 

974 if not only_int_re.search(str(band)): 

975 raise CroniterBadCronError( 

976 f"[{expr_format}] bands '{low}-{high}'" 

977 f" in field {field_index} are not acceptable" 

978 ) 

979 

980 low, high = ( 

981 cls.value_alias(int(_val), field_index, expressions) 

982 for _val in (low, high) 

983 ) 

984 

985 if max(low, high) > max( 

986 cls.RANGES[field_index][0], cls.RANGES[field_index][1] 

987 ): 

988 raise CroniterBadCronError(f"{expr_format} is out of bands") 

989 

990 if from_timestamp: 

991 low = cls._get_low_from_current_date_number( 

992 field_index, int(step), int(from_timestamp) 

993 ) 

994 

995 # Handle when the second bound of the range is in backtracking order: 

996 # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH 

997 if low > high: 

998 whole_field_range = list( 

999 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1) 

1000 ) 

1001 # Add FirstBound -> ENDRANGE, respecting step 

1002 rng = list(range(low, cls.RANGES[field_index][1] + 1, step)) 

1003 # Then 0 -> SecondBound, but skipping the first n occurrences according to step

1004 # EG to respect such expressions : Apr-Jan/3 

1005 to_skip = 0 

1006 if rng: 

1007 already_skipped = list(reversed(whole_field_range)).index(rng[-1]) 

1008 curpos = whole_field_range.index(rng[-1]) 

1009 if ((curpos + step) > len(whole_field_range)) and ( 

1010 already_skipped < step 

1011 ): 

1012 to_skip = step - already_skipped 

1013 rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step)) 

1014 # if we include a range type: Jan-Jan, or Sun-Sun, 

1015 # it means the whole cycle (all days of the week, all months of the year, etc.)

1016 elif low == high: 

1017 rng = list( 

1018 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step) 

1019 ) 

1020 else: 

1021 try: 

1022 rng = list(range(low, high + 1, step)) 

1023 except ValueError as exc: 

1024 raise CroniterBadCronError(f"invalid range: {exc}") 

1025 

1026 if field_index == DOW_FIELD and nth and nth != "l": 

1027 rng = [f"{item}#{nth}" for item in rng] 

1028 e_list += [a for a in rng if a not in e_list] 

1029 else: 

1030 if t.startswith("-"): 

1031 raise CroniterBadCronError( 

1032 f"[{expr_format}] is not acceptable, negative numbers not allowed" 

1033 ) 

1034 if not star_or_int_re.search(t): 

1035 t = cls._alphaconv(field_index, t, expressions) 

1036 

1037 try: 

1038 t = int(t) 

1039 except ValueError: 

1040 pass 

1041 

1042 t = cls.value_alias(t, field_index, expressions) 

1043 

1044 if t not in ["*", "l"] and ( 

1045 int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1] 

1046 ): 

1047 raise CroniterBadCronError( 

1048 f"[{expr_format}] is not acceptable, out of range" 

1049 ) 

1050 

1051 res.append(t) 

1052 

1053 if field_index == DOW_FIELD and nth: 

1054 if t not in nth_weekday_of_month: 

1055 nth_weekday_of_month[t] = set() 

1056 nth_weekday_of_month[t].add(nth) 

1057 

1058 res = set(res) 

1059 res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i) 

1060 if len(res) == cls.LEN_MEANS_ALL[field_index]: 

1061 # Make sure the wildcard is used in the correct way (avoid over-optimization) 

1062 if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or ( 

1063 field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD] 

1064 ): 

1065 pass 

1066 else: 

1067 res = ["*"] 

1068 

1069 expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res) 

1070 

1071 # Check to make sure the dow combo in use is supported 

1072 if nth_weekday_of_month: 

1073 dow_expanded_set = set(expanded[DOW_FIELD]) 

1074 dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys()) 

1075 dow_expanded_set.discard("*") 

1076 # Skip: if it's all weeks instead of wildcard 

1077 if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]: 

1078 raise CroniterUnsupportedSyntaxError( 

1079 f"day-of-week field does not support mixing literal values and nth" 

1080 f" day of week syntax. Cron: '{expr_format}'" 

1081 f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}" 

1082 ) 

1083 

1084 EXPRESSIONS[(expr_format, hash_id, second_at_beginning)] = expressions 

1085 return expanded, nth_weekday_of_month 

1086 

@classmethod
def expand(
    cls,
    expr_format: str,
    hash_id: Optional[Union[bytes, str]] = None,
    second_at_beginning: bool = False,
    from_timestamp: Optional[float] = None,
) -> tuple[list[ExpandedExpression], dict[int, set[int]]]:
    """
    Expand a cron expression format into a normalized format of
    list[list[int | 'l' | '*']]. The outer list holds one entry per field
    of the expression, and each sub-list holds the allowed values for
    that field.

    A tuple is returned, the first value being the expanded expression
    list, and the second being a `nth_weekday_of_month` mapping.

    All arguments are forwarded unchanged to ``_expand``.

    Raises:
        CroniterBadCronError: when ``_expand`` raises a ``ValueError`` that
            is not already a ``CroniterError``; the message is the formatted
            traceback and the original exception is attached as the cause.
            ``CroniterError`` instances propagate untouched.

    Examples:

    # Every minute
    >>> croniter.expand('* * * * *')
    ([['*'], ['*'], ['*'], ['*'], ['*']], {})

    # Every day at midnight (minute 0 of hour 0)
    >>> croniter.expand('0 0 * * *')
    ([[0], [0], ['*'], ['*'], ['*']], {})

    # Minutes 0-5 and 10 of every hour, monday through friday
    >>> croniter.expand('0-5,10 * * * mon-fri')
    ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})

    Note that some special values such as nth day of week are expanded to a
    special mapping format for later processing:

    # Every minute on the 3rd tuesday of the month
    >>> croniter.expand('* * * * 2#3')
    ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})

    # Every hour on the last day of the month
    >>> croniter.expand('0 * l * *')
    ([[0], ['*'], ['l'], ['*'], ['*']], {})

    # Every 15 seconds during minute 0 of hour 0 (sixth field is seconds)
    >>> croniter.expand('0 0 * * * */15')
    ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
    """
    try:
        return cls._expand(
            expr_format,
            hash_id=hash_id,
            second_at_beginning=second_at_beginning,
            from_timestamp=from_timestamp,
        )
    except ValueError as exc:
        # The isinstance test implies croniter's own errors can arrive here as
        # ValueErrors; they are already meaningful, so let them through as-is.
        if isinstance(exc, CroniterError):
            raise
        # Any other ValueError is reported as a bad cron expression. The
        # traceback text stays in the message (as before) and the original
        # exception is now chained explicitly as the cause.
        raise CroniterBadCronError(_traceback.format_exc()) from exc

1145 

@classmethod
def _get_low_from_current_date_number(cls, field_index, step, from_timestamp):
    """
    Return the lowest value of a stepped range that keeps the range aligned
    with the date/time component found at ``from_timestamp`` (read in UTC).

    The result is congruent, modulo ``step``, to the current minute, hour,
    day, month or day-of-week number, so stepping from it lands on the
    current value.

    Args:
        field_index: which cron field is being expanded (minute, hour,
            day-of-month, month or day-of-week).
        step: the positive step of the range.
        from_timestamp: POSIX timestamp the range is anchored on.

    Raises:
        ValueError: if ``step`` is not positive, or if ``field_index`` is
            not one of the five supported fields.
    """
    # A zero step would otherwise surface as ZeroDivisionError from the
    # modulo below, which expand() does not translate into a cron error.
    if step < 1:
        raise ValueError(f"step must be a positive integer, got {step}")

    dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT)
    if field_index == MINUTE_FIELD:
        return dt.minute % step
    if field_index == HOUR_FIELD:
        return dt.hour % step
    if field_index == DAY_FIELD:
        # Days are 1-based: shift to 0-based for the modulo, then back.
        return ((dt.day - 1) % step) + 1
    if field_index == MONTH_FIELD:
        # Months are 1-based as well. A plain ``dt.month % step`` yields 0
        # whenever the month is a multiple of step, which is not a valid
        # month number, so use the same shift as for days.
        return ((dt.month - 1) % step) + 1
    if field_index == DOW_FIELD:
        # datetime.weekday() is Monday=0..Sunday=6; +1 gives Monday=1..Sunday=7.
        return (dt.weekday() + 1) % step

    raise ValueError("Can't get current date number for index larger than 4")

1161 

@classmethod
def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False):
    """
    Tell whether ``expression`` can be expanded without a croniter error.

    A truthy ``hash_id`` must be ``bytes`` or ``str``; a ``str`` is encoded
    with ``encoding`` before being handed to ``expand``.

    Returns:
        True when ``expand`` succeeds, False when it raises ``CroniterError``.

    Raises:
        TypeError: if a truthy ``hash_id`` is neither ``bytes`` nor ``str``.
    """
    if hash_id:
        if isinstance(hash_id, str):
            hash_id = hash_id.encode(encoding)
        elif not isinstance(hash_id, bytes):
            raise TypeError("hash_id must be bytes or UTF-8 string")

    try:
        cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning)
    except CroniterError:
        return False
    else:
        return True

1174 

@classmethod
def match(cls, cron_expression, testdate, day_or=True, second_at_beginning=False):
    """
    Check a single date against ``cron_expression``.

    This is ``match_range`` applied to a window whose start and end are both
    ``testdate``; the remaining arguments are passed through positionally.
    """
    window_start = window_end = testdate
    return cls.match_range(
        cron_expression,
        window_start,
        window_end,
        day_or,
        second_at_beginning,
    )

1178 

@classmethod
def match_range(
    cls, cron_expression, from_datetime, to_datetime, day_or=True, second_at_beginning=False
):
    """
    Tell whether ``cron_expression`` fires within [from_datetime, to_datetime].

    An iterator is positioned at ``to_datetime`` and asked for its previous
    occurrence; the window matches when that occurrence lies closer to the
    probe time than the window length plus one precision unit (1 second if
    the expanded expression has more fields than a plain unix cron line,
    60 seconds otherwise).

    Returns:
        True on a match, False otherwise, including when looking up the
        previous occurrence raises ``CroniterBadDateError``.
    """
    iterator = cls(
        cron_expression,
        to_datetime,
        ret_type=datetime.datetime,
        day_or=day_or,
        second_at_beginning=second_at_beginning,
    )

    probe = iterator.get_current(datetime.datetime)
    if probe.microsecond == 0:
        # NOTE(review): presumably nudged forward so that an occurrence falling
        # exactly on to_datetime still counts as "previous" -- confirm against
        # get_prev()'s contract.
        probe = probe + relativedelta(microseconds=1)
    iterator.set_current(probe, force=True)

    try:
        previous = iterator.get_prev()
    except CroniterBadDateError:
        return False

    tolerance = 60 if len(iterator.expanded) <= UNIX_CRON_LEN else 1
    window = (to_datetime - from_datetime).total_seconds() + tolerance
    return abs(probe - previous).total_seconds() < window

1201 

1202 

def croniter_range(
    start,
    stop,
    expr_format,
    ret_type=None,
    day_or=True,
    exclude_ends=False,
    _croniter=None,
    second_at_beginning=False,
    expand_from_start_time=False,
):
    """
    Yield every time between start and stop that matches the cron expression.

    Think of it as the builtin range(start, stop, step) for datetimes, where
    the "step" is a cron expression. The bounds themselves are yielded when
    they match, unless 'exclude_ends=True' is passed. When start is later than
    stop the matches are produced in reverse chronological order.

    Numeric bounds are read as timestamps (converted through UTC to naive
    datetimes) and, unless 'ret_type' says otherwise, results are then
    produced as floats; otherwise results are datetimes.

    Raises CroniterBadTypeRangeError when start and stop are of unrelated types.
    """
    iterator_cls = _croniter or croniter

    # Cheap identity test first; the isinstance checks only run on a mismatch.
    if type(start) is not type(stop):
        if not (isinstance(start, type(stop)) or isinstance(stop, type(start))):
            raise CroniterBadTypeRangeError(
                f"The start and stop must be same type. {type(start)} != {type(stop)}"
            )

    given_as_numbers = isinstance(start, (float, int))
    if given_as_numbers:
        start, stop = (
            datetime.datetime.fromtimestamp(ts, tzutc()).replace(tzinfo=None)
            for ts in (start, stop)
        )
    if ret_type is None:
        ret_type = float if given_as_numbers else datetime.datetime

    # Direction is unaffected by the one-microsecond widening below.
    ascending = start < stop
    if not exclude_ends:
        # Widen the window by a microsecond on each side so the strict
        # comparisons in the loop still let matching end points through.
        one_us = relativedelta(microseconds=1)
        if ascending:
            start, stop = start - one_us, stop + one_us
        else:
            start, stop = start + one_us, stop - one_us

    ic = iterator_cls(
        expr_format,
        start,
        ret_type=datetime.datetime,
        day_or=day_or,
        max_years_between_matches=abs(stop.year - start.year) + 1,
        second_at_beginning=second_at_beginning,
        expand_from_start_time=expand_from_start_time,
    )

    advance = ic.get_next if ascending else ic.get_prev
    try:
        candidate = advance()
        while (candidate < stop) if ascending else (candidate > stop):
            yield (ic.get_current(float) if ret_type is float else candidate)
            candidate = advance()
    except CroniterBadDateError:
        # No further match inside the allowed year span: end the iteration.
        return

1280 

1281 

class HashExpander:
    """
    Rewrites hashed ("h") and random ("r") cron sub-expressions into plain
    numeric ones, using the value ranges of the cron class it is given.
    """

    def __init__(self, cronit):
        # Only ``RANGES`` is read from this object.
        self.cron = cronit

    def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
        """
        Pick an integer inside [range_begin, range_end] for field ``idx``.

        Missing bounds default to the field's own range. With hash_type "r"
        the pick is random; otherwise it is derived from the CRC32 of
        ``hash_id``, shifted right by ``idx`` bits.
        """
        upper = self.cron.RANGES[idx][1] if range_end is None else range_end
        lower = self.cron.RANGES[idx][0] if range_begin is None else range_begin

        if hash_type == "r":
            seed = random.randint(0, 0xFFFFFFFF)
        else:
            seed = binascii.crc32(hash_id) & 0xFFFFFFFF

        span = upper - lower + 1
        return lower + (seed >> idx) % span

    def match(self, efl, idx, expr, hash_id=None, **kw):
        """Return the result of matching ``expr`` against ``hash_expression_re``."""
        return hash_expression_re.match(expr)

    def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
        """
        Turn a hashed/random expression into its ordinary cron form.

        ``match`` may carry an already computed match result; the default
        empty string means "compute it here". When there is no match the
        expression is returned untouched.

        Raises CroniterBadCronError for a hashed form without ``hash_id``,
        for a range whose end is not greater than its begin, and for a
        zero divisor.
        """
        if match == "":
            match = self.match(efl, idx, expr, hash_id, **kw)
        if not match:
            return expr

        groups = match.groupdict()
        kind = groups["hash_type"]
        if kind == "h" and hash_id is None:
            raise CroniterBadCronError("Hashed definitions must include hash_id")

        bounded = bool(groups["range_begin"] and groups["range_end"])
        if bounded and int(groups["range_begin"]) >= int(groups["range_end"]):
            raise CroniterBadCronError("Range end must be greater than range begin")

        divisor = groups["divisor"]
        if divisor:
            every = int(divisor)
            if every == 0:
                raise CroniterBadCronError(f"Bad expression: {expr}")

            if bounded:
                # H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
                first = int(groups["range_begin"])
                last = int(groups["range_end"])
            else:
                # H/15 -> 7-59/15 (i.e. 7,22,37,52)
                first = self.cron.RANGES[idx][0]
                last = self.cron.RANGES[idx][1]

            # The starting offset is drawn from the first ``every`` slots.
            offset = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_begin=first,
                range_end=every - 1 + first,
            )
            return f"{offset}-{last}/{every}"

        if bounded:
            # H(0-29) -> 12
            picked = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_end=int(groups["range_end"]),
                range_begin=int(groups["range_begin"]),
            )
            return str(picked)

        # H -> 32
        return str(self.do(idx, hash_type=kind, hash_id=hash_id))

1356 

1357 

# Registry of expression pre-processors, keyed by name. While expanding a cron
# line, every field is passed through each registered expander's ``expand``.
EXPANDERS = {
    "hash": HashExpander,
}