Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/croniter/croniter.py: 84%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

735 statements  

1#!/usr/bin/env python 

2import binascii 

3import calendar 

4import copy 

5import datetime 

6import math 

7import platform 

8import random 

9import re 

10import struct 

11import sys 

12import traceback as _traceback 

13from time import time 

14from typing import Any, Literal, Optional, Union 

15 

16from dateutil.relativedelta import relativedelta 

17from dateutil.tz import datetime_exists, tzutc 

18 

19ExpandedExpression = list[Union[int, Literal["*", "l"]]] 

20 

21 

22def is_32bit() -> bool: 

23 """ 

24 Detect if Python is running in 32-bit mode. 

25 Returns True if running on 32-bit Python, False for 64-bit. 

26 """ 

27 # Method 1: Check pointer size 

28 bits = struct.calcsize("P") * 8 

29 

30 # Method 2: Check platform architecture string 

31 try: 

32 architecture = platform.architecture()[0] 

33 except RuntimeError: 

34 architecture = None 

35 

36 # Method 3: Check maxsize 

37 is_small_maxsize = sys.maxsize <= 2**32 

38 

39 # Evaluate all available methods 

40 is_32 = False 

41 

42 if bits == 32: 

43 is_32 = True 

44 elif architecture and "32" in architecture: 

45 is_32 = True 

46 elif is_small_maxsize: 

47 is_32 = True 

48 

49 return is_32 

50 

51 

52try: 

53 # https://github.com/python/cpython/issues/101069 detection 

54 if is_32bit(): 

55 datetime.datetime.fromtimestamp(3999999999) 

56 OVERFLOW32B_MODE = False 

57except OverflowError: 

58 OVERFLOW32B_MODE = True 

59 

60 

61UTC_DT = datetime.timezone.utc 

62EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT) 

63 

64# fmt: off 

65M_ALPHAS: dict[str, Union[int, str]] = { 

66 "jan": 1, "feb": 2, "mar": 3, "apr": 4, # noqa: E241 

67 "may": 5, "jun": 6, "jul": 7, "aug": 8, # noqa: E241 

68 "sep": 9, "oct": 10, "nov": 11, "dec": 12, 

69} 

70DOW_ALPHAS: dict[str, Union[int, str]] = { 

71 "sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6 

72} 

73# fmt: on 

74 

75MINUTE_FIELD = 0 

76HOUR_FIELD = 1 

77DAY_FIELD = 2 

78MONTH_FIELD = 3 

79DOW_FIELD = 4 

80SECOND_FIELD = 5 

81YEAR_FIELD = 6 

82 

83UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD) 

84SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD) 

85YEAR_FIELDS = ( 

86 MINUTE_FIELD, 

87 HOUR_FIELD, 

88 DAY_FIELD, 

89 MONTH_FIELD, 

90 DOW_FIELD, 

91 SECOND_FIELD, 

92 YEAR_FIELD, 

93) 

94 

95step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$") 

96only_int_re = re.compile(r"^\d+$") 

97 

98DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31) 

99WEEKDAYS = "|".join(DOW_ALPHAS.keys()) 

100MONTHS = "|".join(M_ALPHAS.keys()) 

101star_or_int_re = re.compile(r"^(\d+|\*)$") 

102special_dow_re = re.compile( 

103 rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)" 

104 rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$" 

105) 

106re_star = re.compile("[*]") 

107hash_expression_re = re.compile( 

108 r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$" 

109) 

110 

111CRON_FIELDS = { 

112 "unix": UNIX_FIELDS, 

113 "second": SECOND_FIELDS, 

114 "year": YEAR_FIELDS, 

115 len(UNIX_FIELDS): UNIX_FIELDS, 

116 len(SECOND_FIELDS): SECOND_FIELDS, 

117 len(YEAR_FIELDS): YEAR_FIELDS, 

118} 

119UNIX_CRON_LEN = len(UNIX_FIELDS) 

120SECOND_CRON_LEN = len(SECOND_FIELDS) 

121YEAR_CRON_LEN = len(YEAR_FIELDS) 

122# retrocompat 

123VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)} 

124TIMESTAMP_TO_DT_CACHE: dict[tuple[float, str], datetime.datetime] = {} 

125EXPRESSIONS: dict[tuple[str, Optional[bytes], bool], list[str]] = {} 

126MARKER = object() 

127 

128 

129def datetime_to_timestamp(d): 

130 if d.tzinfo is not None: 

131 d = d.replace(tzinfo=None) - d.utcoffset() 

132 

133 return (d - datetime.datetime(1970, 1, 1)).total_seconds() 

134 

135 

136def _is_leap(year: int) -> bool: 

137 return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0) 

138 

139 

140def _last_day_of_month(year: int, month: int) -> int: 

141 """Calculate the last day of the given month (honor leap years).""" 

142 last_day = DAYS[month - 1] 

143 if month == 2 and _is_leap(year): 

144 last_day += 1 

145 return last_day 

146 

147 

148def _is_successor( 

149 date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool 

150) -> bool: 

151 """Check if the given date is a successor (after/before) of the previous date.""" 

152 if is_prev: 

153 return date.astimezone(UTC_DT) < previous_date.astimezone(UTC_DT) 

154 return date.astimezone(UTC_DT) > previous_date.astimezone(UTC_DT) 

155 

156 

157def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta: 

158 """Calculate the timezone difference of the given dates.""" 

159 offset1 = date1.utcoffset() 

160 offset2 = date2.utcoffset() 

161 assert offset1 is not None 

162 assert offset2 is not None 

163 return offset2 - offset1 

164 

165 

166def _add_tzinfo( 

167 date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool 

168) -> tuple[datetime.datetime, bool]: 

169 """Add the tzinfo from the previous date to the given date. 

170 

171 In case the new date is ambiguous, determine the correct date 

172 based on it being closer to the previous date but still a successor 

173 (after/before based on `is_prev`). 

174 

175 In case the date does not exist, jump forward to the next existing date. 

176 """ 

177 localize = getattr(previous_date.tzinfo, "localize", None) 

178 if localize is not None: 

179 # pylint: disable-next=import-outside-toplevel 

180 import pytz 

181 

182 try: 

183 result = localize(date, is_dst=None) 

184 except pytz.NonExistentTimeError: 

185 while True: 

186 date += datetime.timedelta(minutes=1) 

187 try: 

188 result = localize(date, is_dst=None) 

189 except pytz.NonExistentTimeError: 

190 continue 

191 break 

192 return result, False 

193 except pytz.AmbiguousTimeError: 

194 closer = localize(date, is_dst=not is_prev) 

195 farther = localize(date, is_dst=is_prev) 

196 # TODO: Check negative DST 

197 assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev 

198 if _is_successor(closer, previous_date, is_prev): 

199 result = closer 

200 else: 

201 assert _is_successor(farther, previous_date, is_prev) 

202 result = farther 

203 return result, True 

204 

205 result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo) 

206 if not datetime_exists(result): 

207 while not datetime_exists(result): 

208 result += datetime.timedelta(minutes=1) 

209 return result, False 

210 

211 # result is closer to the previous date 

212 farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo) 

213 # Comparing the UTC offsets in the check for the date being ambiguous. 

214 if result.utcoffset() != farther.utcoffset(): 

215 # TODO: Check negative DST 

216 assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev 

217 if not _is_successor(result, previous_date, is_prev): 

218 assert _is_successor(farther, previous_date, is_prev) 

219 result = farther 

220 return result, True 

221 

222 

223class CroniterError(ValueError): 

224 """General top-level Croniter base exception""" 

225 

226 

227class CroniterBadTypeRangeError(TypeError): 

228 """.""" 

229 

230 

231class CroniterBadCronError(CroniterError): 

232 """Syntax, unknown value, or range error within a cron expression""" 

233 

234 

235class CroniterUnsupportedSyntaxError(CroniterBadCronError): 

236 """Valid cron syntax, but likely to produce inaccurate results""" 

237 

238 # Extending CroniterBadCronError, which may be contridatory, but this allows 

239 # catching both errors with a single exception. From a user perspective 

240 # these will likely be handled the same way. 

241 

242 

243class CroniterBadDateError(CroniterError): 

244 """Unable to find next/prev timestamp match""" 

245 

246 

247class CroniterNotAlphaError(CroniterBadCronError): 

248 """Cron syntax contains an invalid day or month abbreviation""" 

249 

250 

251class croniter: 

252 MONTHS_IN_YEAR = 12 

253 

254 # This helps with expanding `*` fields into `lower-upper` ranges. Each item 

255 # in this tuple maps to the corresponding field index 

256 RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099)) 

257 

258 ALPHACONV: tuple[dict[str, Union[int, str]], ...] = ( 

259 {}, # 0: min 

260 {}, # 1: hour 

261 {"l": "l"}, # 2: dom 

262 # 3: mon 

263 copy.deepcopy(M_ALPHAS), 

264 # 4: dow 

265 copy.deepcopy(DOW_ALPHAS), 

266 # 5: second 

267 {}, 

268 # 6: year 

269 {}, 

270 ) 

271 

272 LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {}) 

273 

274 LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130) 

275 

276 def __init__( 

277 self, 

278 expr_format, 

279 start_time=None, 

280 ret_type=float, 

281 day_or=True, 

282 max_years_between_matches=None, 

283 is_prev=False, 

284 hash_id=None, 

285 implement_cron_bug=False, 

286 second_at_beginning=None, 

287 expand_from_start_time=False, 

288 ): 

289 self._ret_type = ret_type 

290 self._day_or = day_or 

291 self._implement_cron_bug = implement_cron_bug 

292 self.second_at_beginning = bool(second_at_beginning) 

293 self._expand_from_start_time = expand_from_start_time 

294 

295 if hash_id: 

296 if not isinstance(hash_id, (bytes, str)): 

297 raise TypeError("hash_id must be bytes or UTF-8 string") 

298 if not isinstance(hash_id, bytes): 

299 hash_id = hash_id.encode("UTF-8") 

300 

301 self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None 

302 if not self._max_years_btw_matches_explicitly_set: 

303 max_years_between_matches = 50 

304 self._max_years_between_matches = max(int(max_years_between_matches), 1) 

305 

306 if start_time is None: 

307 start_time = time() 

308 

309 self.tzinfo = None 

310 

311 self.start_time = None 

312 self.dst_start_time = None 

313 self.cur = None 

314 self.set_current(start_time, force=False) 

315 

316 self.expanded, self.nth_weekday_of_month = self.expand( 

317 expr_format, 

318 hash_id=hash_id, 

319 from_timestamp=self.dst_start_time if self._expand_from_start_time else None, 

320 second_at_beginning=second_at_beginning, 

321 ) 

322 self.fields = CRON_FIELDS[len(self.expanded)] 

323 self.expressions = EXPRESSIONS[(expr_format, hash_id, second_at_beginning)] 

324 self._is_prev = is_prev 

325 

326 @classmethod 

327 def _alphaconv(cls, index, key, expressions): 

328 try: 

329 return cls.ALPHACONV[index][key] 

330 except KeyError: 

331 raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable") 

332 

333 def get_next(self, ret_type=None, start_time=None, update_current=True): 

334 if start_time and self._expand_from_start_time: 

335 raise ValueError( 

336 "start_time is not supported when using expand_from_start_time = True." 

337 ) 

338 return self._get_next( 

339 ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current 

340 ) 

341 

342 def get_prev(self, ret_type=None, start_time=None, update_current=True): 

343 return self._get_next( 

344 ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current 

345 ) 

346 

347 def get_current(self, ret_type=None): 

348 ret_type = ret_type or self._ret_type 

349 if issubclass(ret_type, datetime.datetime): 

350 return self.timestamp_to_datetime(self.cur) 

351 return self.cur 

352 

353 def set_current( 

354 self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True 

355 ) -> Optional[float]: 

356 if (force or (self.cur is None)) and start_time is not None: 

357 if isinstance(start_time, datetime.datetime): 

358 self.tzinfo = start_time.tzinfo 

359 start_time = self.datetime_to_timestamp(start_time) 

360 

361 self.start_time = start_time 

362 self.dst_start_time = start_time 

363 self.cur = start_time 

364 return self.cur 

365 

366 @staticmethod 

367 def datetime_to_timestamp(d: datetime.datetime) -> float: 

368 """ 

369 Converts a `datetime` object `d` into a UNIX timestamp. 

370 """ 

371 return datetime_to_timestamp(d) 

372 

373 _datetime_to_timestamp = datetime_to_timestamp # retrocompat 

374 

375 def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime: 

376 """ 

377 Converts a UNIX `timestamp` into a `datetime` object. 

378 """ 

379 if tzinfo is MARKER: # allow to give tzinfo=None even if self.tzinfo is set 

380 tzinfo = self.tzinfo 

381 key = (timestamp, repr(tzinfo)) 

382 try: 

383 return TIMESTAMP_TO_DT_CACHE[key] 

384 except KeyError: 

385 pass 

386 if OVERFLOW32B_MODE: 

387 # degraded mode to workaround Y2038 

388 # see https://github.com/python/cpython/issues/101069 

389 result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp) 

390 else: 

391 result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None) 

392 if tzinfo: 

393 result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo) 

394 TIMESTAMP_TO_DT_CACHE[key] = result 

395 return result 

396 

397 _timestamp_to_datetime = timestamp_to_datetime # retrocompat 

398 

399 def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None): 

400 if update_current is None: 

401 update_current = True 

402 self.set_current(start_time, force=True) 

403 if is_prev is None: 

404 is_prev = self._is_prev 

405 self._is_prev = is_prev 

406 

407 ret_type = ret_type or self._ret_type 

408 

409 if not issubclass(ret_type, (float, datetime.datetime)): 

410 raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.") 

411 

412 result = self._calc_next(is_prev) 

413 timestamp = self.datetime_to_timestamp(result) 

414 if update_current: 

415 self.cur = timestamp 

416 if issubclass(ret_type, datetime.datetime): 

417 return result 

418 return timestamp 

419 

420 # iterator protocol, to enable direct use of croniter 

421 # objects in a loop, like "for dt in croniter("5 0 * * *'): ..." 

422 # or for combining multiple croniters into single 

423 # dates feed using 'itertools' module 

424 def all_next(self, ret_type=None, start_time=None, update_current=None): 

425 """ 

426 Returns a generator yielding consecutive dates. 

427 

428 May be used instead of an implicit call to __iter__ whenever a 

429 non-default `ret_type` needs to be specified. 

430 """ 

431 # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could 

432 # be used instead 

433 try: 

434 while True: 

435 self._is_prev = False 

436 yield self._get_next( 

437 ret_type=ret_type, start_time=start_time, update_current=update_current 

438 ) 

439 start_time = None 

440 except CroniterBadDateError: 

441 if self._max_years_btw_matches_explicitly_set: 

442 return 

443 raise 

444 

445 def all_prev(self, ret_type=None, start_time=None, update_current=None): 

446 """ 

447 Returns a generator yielding previous dates. 

448 """ 

449 try: 

450 while True: 

451 self._is_prev = True 

452 yield self._get_next( 

453 ret_type=ret_type, start_time=start_time, update_current=update_current 

454 ) 

455 start_time = None 

456 except CroniterBadDateError: 

457 if self._max_years_btw_matches_explicitly_set: 

458 return 

459 raise 

460 

461 def iter(self, *args, **kwargs): 

462 return self.all_prev if self._is_prev else self.all_next 

463 

464 def __iter__(self): 

465 return self 

466 

467 __next__ = next = _get_next 

468 

469 def _calc_next(self, is_prev: bool) -> datetime.datetime: 

470 current = self.timestamp_to_datetime(self.cur) 

471 expanded = self.expanded[:] 

472 nth_weekday_of_month = self.nth_weekday_of_month.copy() 

473 

474 # exception to support day of month and day of week as defined in cron 

475 if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or: 

476 # If requested, handle a bug in vixie cron/ISC cron where day_of_month and 

477 # day_of_week form an intersection (AND) instead of a union (OR) if either 

478 # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html) 

479 if self._implement_cron_bug and ( 

480 re_star.match(self.expressions[DAY_FIELD]) 

481 or re_star.match(self.expressions[DOW_FIELD]) 

482 ): 

483 # To produce a schedule identical to the cron bug, we'll bypass the code 

484 # that makes a union of DOM and DOW, and instead skip to the code that 

485 # does an intersect instead 

486 pass 

487 else: 

488 bak = expanded[DOW_FIELD] 

489 expanded[DOW_FIELD] = ["*"] 

490 t1 = self._calc(current, expanded, nth_weekday_of_month, is_prev) 

491 expanded[DOW_FIELD] = bak 

492 expanded[DAY_FIELD] = ["*"] 

493 

494 t2 = self._calc(current, expanded, nth_weekday_of_month, is_prev) 

495 if is_prev: 

496 return t1 if t1 > t2 else t2 

497 return t1 if t1 < t2 else t2 

498 

499 return self._calc(current, expanded, nth_weekday_of_month, is_prev) 

500 

501 def _calc( 

502 self, 

503 now: datetime.datetime, 

504 expanded: list[ExpandedExpression], 

505 nth_weekday_of_month: dict[int, set[int]], 

506 is_prev: bool, 

507 ) -> datetime.datetime: 

508 if is_prev: 

509 nearest_diff_method = self._get_prev_nearest_diff 

510 offset = relativedelta(microseconds=-1) 

511 else: 

512 nearest_diff_method = self._get_next_nearest_diff 

513 if len(expanded) > UNIX_CRON_LEN: 

514 offset = relativedelta(seconds=1) 

515 else: 

516 offset = relativedelta(minutes=1) 

517 # Calculate the next cron time in local time a.k.a. timezone unaware time. 

518 unaware_time = now.replace(tzinfo=None) + offset 

519 if len(expanded) > UNIX_CRON_LEN: 

520 unaware_time = unaware_time.replace(microsecond=0) 

521 else: 

522 unaware_time = unaware_time.replace(second=0, microsecond=0) 

523 

524 month = unaware_time.month 

525 year = current_year = unaware_time.year 

526 

527 def proc_year(d): 

528 if len(expanded) == YEAR_CRON_LEN: 

529 try: 

530 expanded[YEAR_FIELD].index("*") 

531 except ValueError: 

532 # use None as range_val to indicate no loop 

533 diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None) 

534 if diff_year is None: 

535 return None, d 

536 if diff_year != 0: 

537 if is_prev: 

538 d += relativedelta( 

539 years=diff_year, month=12, day=31, hour=23, minute=59, second=59 

540 ) 

541 else: 

542 d += relativedelta( 

543 years=diff_year, month=1, day=1, hour=0, minute=0, second=0 

544 ) 

545 return True, d 

546 return False, d 

547 

548 def proc_month(d): 

549 try: 

550 expanded[MONTH_FIELD].index("*") 

551 except ValueError: 

552 diff_month = nearest_diff_method( 

553 d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR 

554 ) 

555 reset_day = 1 

556 

557 if diff_month is not None and diff_month != 0: 

558 if is_prev: 

559 d += relativedelta(months=diff_month) 

560 reset_day = _last_day_of_month(d.year, d.month) 

561 d += relativedelta(day=reset_day, hour=23, minute=59, second=59) 

562 else: 

563 d += relativedelta( 

564 months=diff_month, day=reset_day, hour=0, minute=0, second=0 

565 ) 

566 return True, d 

567 return False, d 

568 

569 def proc_day_of_month(d): 

570 try: 

571 expanded[DAY_FIELD].index("*") 

572 except ValueError: 

573 days = _last_day_of_month(year, month) 

574 if "l" in expanded[DAY_FIELD] and days == d.day: 

575 return False, d 

576 

577 if is_prev: 

578 days_in_prev_month = DAYS[(month - 2) % self.MONTHS_IN_YEAR] 

579 diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month) 

580 else: 

581 diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days) 

582 

583 if diff_day is not None and diff_day != 0: 

584 if is_prev: 

585 d += relativedelta(days=diff_day, hour=23, minute=59, second=59) 

586 else: 

587 d += relativedelta(days=diff_day, hour=0, minute=0, second=0) 

588 return True, d 

589 return False, d 

590 

591 def proc_day_of_week(d): 

592 try: 

593 expanded[DOW_FIELD].index("*") 

594 except ValueError: 

595 diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7) 

596 if diff_day_of_week is not None and diff_day_of_week != 0: 

597 if is_prev: 

598 d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59) 

599 else: 

600 d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0) 

601 return True, d 

602 return False, d 

603 

604 def proc_day_of_week_nth(d): 

605 if "*" in nth_weekday_of_month: 

606 s = nth_weekday_of_month["*"] 

607 for i in range(0, 7): 

608 if i in nth_weekday_of_month: 

609 nth_weekday_of_month[i].update(s) 

610 else: 

611 nth_weekday_of_month[i] = s 

612 del nth_weekday_of_month["*"] 

613 

614 candidates = [] 

615 for wday, nth in nth_weekday_of_month.items(): 

616 c = self._get_nth_weekday_of_month(d.year, d.month, wday) 

617 for n in nth: 

618 if n == "l": 

619 candidate = c[-1] 

620 elif len(c) < n: 

621 continue 

622 else: 

623 candidate = c[n - 1] 

624 if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate): 

625 candidates.append(candidate) 

626 

627 if not candidates: 

628 if is_prev: 

629 d += relativedelta(days=-d.day, hour=23, minute=59, second=59) 

630 else: 

631 days = _last_day_of_month(year, month) 

632 d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0) 

633 return True, d 

634 

635 candidates.sort() 

636 diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day 

637 if diff_day != 0: 

638 if is_prev: 

639 d += relativedelta(days=diff_day, hour=23, minute=59, second=59) 

640 else: 

641 d += relativedelta(days=diff_day, hour=0, minute=0, second=0) 

642 return True, d 

643 return False, d 

644 

645 def proc_hour(d): 

646 try: 

647 expanded[HOUR_FIELD].index("*") 

648 except ValueError: 

649 diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24) 

650 if diff_hour is not None and diff_hour != 0: 

651 if is_prev: 

652 d += relativedelta(hours=diff_hour, minute=59, second=59) 

653 else: 

654 d += relativedelta(hours=diff_hour, minute=0, second=0) 

655 return True, d 

656 return False, d 

657 

658 def proc_minute(d): 

659 try: 

660 expanded[MINUTE_FIELD].index("*") 

661 except ValueError: 

662 diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60) 

663 if diff_min is not None and diff_min != 0: 

664 if is_prev: 

665 d += relativedelta(minutes=diff_min, second=59) 

666 else: 

667 d += relativedelta(minutes=diff_min, second=0) 

668 return True, d 

669 return False, d 

670 

671 def proc_second(d): 

672 if len(expanded) > UNIX_CRON_LEN: 

673 try: 

674 expanded[SECOND_FIELD].index("*") 

675 except ValueError: 

676 diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60) 

677 if diff_sec is not None and diff_sec != 0: 

678 d += relativedelta(seconds=diff_sec) 

679 return True, d 

680 else: 

681 d += relativedelta(second=0) 

682 return False, d 

683 

684 procs = [ 

685 proc_year, 

686 proc_month, 

687 proc_day_of_month, 

688 (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week), 

689 proc_hour, 

690 proc_minute, 

691 proc_second, 

692 ] 

693 

694 while abs(year - current_year) <= self._max_years_between_matches: 

695 next = False 

696 stop = False 

697 for proc in procs: 

698 (changed, unaware_time) = proc(unaware_time) 

699 # `None` can be set mostly for year processing 

700 # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff 

701 if changed is None: 

702 stop = True 

703 break 

704 if changed: 

705 month, year = unaware_time.month, unaware_time.year 

706 next = True 

707 break 

708 if stop: 

709 break 

710 if next: 

711 continue 

712 

713 unaware_time = unaware_time.replace(microsecond=0) 

714 if now.tzinfo is None: 

715 return unaware_time 

716 

717 # Add timezone information back and handle DST changes 

718 aware_time, exists = _add_tzinfo(unaware_time, now, is_prev) 

719 

720 if not exists and ( 

721 not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD] 

722 ): 

723 # The calculated local date does not exist and moving the time forward 

724 # to the next valid time isn't the correct solution. Search for the 

725 # next matching cron time that exists. 

726 while not exists: 

727 unaware_time = self._calc( 

728 unaware_time, expanded, nth_weekday_of_month, is_prev 

729 ) 

730 aware_time, exists = _add_tzinfo(unaware_time, now, is_prev) 

731 

732 offset_delta = _timezone_delta(now, aware_time) 

733 if not offset_delta: 

734 # There was no DST change. 

735 return aware_time 

736 

737 # There was a DST change. So check if there is a alternative cron time 

738 # for the other UTC offset. 

739 alternative_unaware_time = now.replace(tzinfo=None) + offset_delta 

740 alternative_unaware_time = self._calc( 

741 alternative_unaware_time, expanded, nth_weekday_of_month, is_prev 

742 ) 

743 alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev) 

744 

745 if not _is_successor(alternative_aware_time, now, is_prev): 

746 # The alternative time is an ancestor of now. Thus it is not an alternative. 

747 return aware_time 

748 

749 if _is_successor(aware_time, alternative_aware_time, is_prev): 

750 return alternative_aware_time 

751 

752 return aware_time 

753 

754 if is_prev: 

755 raise CroniterBadDateError("failed to find prev date") 

756 raise CroniterBadDateError("failed to find next date") 

757 

758 @staticmethod 

759 def _get_next_nearest_diff(x, to_check, range_val): 

760 """ 

761 `range_val` is the range of a field. 

762 If no available time, we can move to next loop(like next month). 

763 `range_val` can also be set to `None` to indicate that there is no loop. 

764 ( Currently, should only used for `year` field ) 

765 """ 

766 for i, d in enumerate(to_check): 

767 if range_val is not None: 

768 if d == "l": 

769 # if 'l' then it is the last day of month 

770 # => its value of range_val 

771 d = range_val 

772 elif d > range_val: 

773 continue 

774 if d >= x: 

775 return d - x 

776 # When range_val is None and x not exists in to_check, 

777 # `None` will be returned to suggest no more available time 

778 if range_val is None: 

779 return None 

780 return to_check[0] - x + range_val 

781 

782 @staticmethod 

783 def _get_prev_nearest_diff(x, to_check, range_val): 

784 """ 

785 `range_val` is the range of a field. 

786 If no available time, we can move to previous loop(like previous month). 

787 Range_val can also be set to `None` to indicate that there is no loop. 

788 ( Currently should only used for `year` field ) 

789 """ 

790 candidates = to_check[:] 

791 candidates.reverse() 

792 for d in candidates: 

793 if d != "l" and d <= x: 

794 return d - x 

795 if "l" in candidates: 

796 return -x 

797 # When range_val is None and x not exists in to_check, 

798 # `None` will be returned to suggest no more available time 

799 if range_val is None: 

800 return None 

801 candidate = candidates[0] 

802 for c in candidates: 

803 # fixed: c < range_val 

804 # this code will reject all 31 day of month, 12 month, 59 second, 

805 # 23 hour and so on. 

806 # if candidates has just a element, this will not harmful. 

807 # but candidates have multiple elements, then values equal to 

808 # range_val will rejected. 

809 if c <= range_val: 

810 candidate = c 

811 break 

812 # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error 

813 # return 2021-03-02 06:00:00 

814 if candidate > range_val: 

815 return -range_val 

816 return candidate - x - range_val 

817 

818 @staticmethod 

819 def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]: 

820 """For a given year/month return a list of days in nth-day-of-month order. 

821 The last weekday of the month is always [-1]. 

822 """ 

823 w = (day_of_week + 6) % 7 

824 c = calendar.Calendar(w).monthdayscalendar(year, month) 

825 if c[0][0] == 0: 

826 c.pop(0) 

827 return tuple(i[0] for i in c) 

828 

829 @classmethod 

830 def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN): 

831 if isinstance(len_expressions, (list, dict, tuple, set)): 

832 len_expressions = len(len_expressions) 

833 if val in cls.LOWMAP[field_index] and not ( 

834 # do not support 0 as a month either for classical 5 fields cron, 

835 # 6fields second repeat form or 7 fields year form 

836 # but still let conversion happen if day field is shifted 

837 (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN) 

838 or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN) 

839 or ( 

840 field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD] 

841 and len_expressions == YEAR_CRON_LEN 

842 ) 

843 ): 

844 val = cls.LOWMAP[field_index][val] 

845 return val 

846 

847 @classmethod 

848 def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None): 

849 # Split the expression in components, and normalize L -> l, MON -> mon, 

850 # etc. Keep expr_format untouched so we can use it in the exception 

851 # messages. 

852 expr_aliases = { 

853 "@midnight": ("0 0 * * *", "h h(0-2) * * * h"), 

854 "@hourly": ("0 * * * *", "h * * * * h"), 

855 "@daily": ("0 0 * * *", "h h * * * h"), 

856 "@weekly": ("0 0 * * 0", "h h * * h h"), 

857 "@monthly": ("0 0 1 * *", "h h h * * h"), 

858 "@yearly": ("0 0 1 1 *", "h h h h * h"), 

859 "@annually": ("0 0 1 1 *", "h h h h * h"), 

860 } 

861 

862 efl = expr_format.lower() 

863 hash_id_expr = 1 if hash_id is not None else 0 

864 try: 

865 efl = expr_aliases[efl][hash_id_expr] 

866 except KeyError: 

867 pass 

868 

869 expressions = efl.split() 

870 

871 if len(expressions) not in VALID_LEN_EXPRESSION: 

872 raise CroniterBadCronError( 

873 "Exactly 5, 6 or 7 columns has to be specified for iterator expression." 

874 ) 

875 

876 if len(expressions) > UNIX_CRON_LEN and second_at_beginning: 

877 # move second to it's own(6th) field to process by same logical 

878 expressions.insert(SECOND_FIELD, expressions.pop(0)) 

879 

880 expanded = [] 

881 nth_weekday_of_month = {} 

882 

883 for field_index, expr in enumerate(expressions): 

884 for expanderid, expander in EXPANDERS.items(): 

885 expr = expander(cls).expand( 

886 efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp 

887 ) 

888 

889 if "?" in expr: 

890 if expr != "?": 

891 raise CroniterBadCronError( 

892 f"[{expr_format}] is not acceptable." 

893 f" Question mark can not used with other characters" 

894 ) 

895 if field_index not in [DAY_FIELD, DOW_FIELD]: 

896 raise CroniterBadCronError( 

897 f"[{expr_format}] is not acceptable. " 

898 f"Question mark can only used in day_of_month or day_of_week" 

899 ) 

900 # currently just trade `?` as `*` 

901 expr = "*" 

902 

903 e_list = expr.split(",") 

904 res = [] 

905 

906 while len(e_list) > 0: 

907 e = e_list.pop() 

908 nth = None 

909 

910 if field_index == DOW_FIELD: 

911 # Handle special case in the dow expression: 2#3, l3 

912 special_dow_rem = special_dow_re.match(str(e)) 

913 if special_dow_rem: 

914 g = special_dow_rem.groupdict() 

915 he, last = g.get("he", ""), g.get("last", "") 

916 if he: 

917 e = he 

918 try: 

919 nth = int(last) 

920 assert 5 >= nth >= 1 

921 except (KeyError, ValueError, AssertionError): 

922 raise CroniterBadCronError( 

923 f"[{expr_format}] is not acceptable." 

924 f" Invalid day_of_week value: '{nth}'" 

925 ) 

926 elif last: 

927 e = last 

928 nth = g["pre"] # 'l' 

929 

930 # Before matching step_search_re, normalize "*" to "{min}-{max}". 

931 # Example: in the minute field, "*/5" normalizes to "0-59/5" 

932 t = re.sub( 

933 r"^\*(\/.+)$", 

934 r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]), 

935 str(e), 

936 ) 

937 m = step_search_re.search(t) 

938 

939 if not m: 

940 # Before matching step_search_re, 

941 # normalize "{start}/{step}" to "{start}-{max}/{step}". 

942 # Example: in the minute field, "10/5" normalizes to "10-59/5" 

943 t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e)) 

944 m = step_search_re.search(t) 

945 

946 if m: 

947 # early abort if low/high are out of bounds 

948 (low, high, step) = m.group(1), m.group(2), m.group(4) or 1 

949 if field_index == DAY_FIELD and high == "l": 

950 high = "31" 

951 

952 if not only_int_re.search(low): 

953 low = str(cls._alphaconv(field_index, low, expressions)) 

954 

955 if not only_int_re.search(high): 

956 high = str(cls._alphaconv(field_index, high, expressions)) 

957 

958 # normally, it's already guarded by the RE that should not accept 

959 # not-int values. 

960 if not only_int_re.search(str(step)): 

961 raise CroniterBadCronError( 

962 f"[{expr_format}] step '{step}'" 

963 f" in field {field_index} is not acceptable" 

964 ) 

965 step = int(step) 

966 

967 for band in low, high: 

968 if not only_int_re.search(str(band)): 

969 raise CroniterBadCronError( 

970 f"[{expr_format}] bands '{low}-{high}'" 

971 f" in field {field_index} are not acceptable" 

972 ) 

973 

974 low, high = ( 

975 cls.value_alias(int(_val), field_index, expressions) 

976 for _val in (low, high) 

977 ) 

978 

979 if max(low, high) > max( 

980 cls.RANGES[field_index][0], cls.RANGES[field_index][1] 

981 ): 

982 raise CroniterBadCronError(f"{expr_format} is out of bands") 

983 

984 if from_timestamp: 

985 low = cls._get_low_from_current_date_number( 

986 field_index, int(step), int(from_timestamp) 

987 ) 

988 

989 # Handle when the second bound of the range is in backtracking order: 

990 # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH 

991 if low > high: 

992 whole_field_range = list( 

993 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1) 

994 ) 

995 # Add FirstBound -> ENDRANGE, respecting step 

996 rng = list(range(low, cls.RANGES[field_index][1] + 1, step)) 

997 # Then 0 -> SecondBound, but skipping n first occurences according to step 

998 # EG to respect such expressions : Apr-Jan/3 

999 to_skip = 0 

1000 if rng: 

1001 already_skipped = list(reversed(whole_field_range)).index(rng[-1]) 

1002 curpos = whole_field_range.index(rng[-1]) 

1003 if ((curpos + step) > len(whole_field_range)) and ( 

1004 already_skipped < step 

1005 ): 

1006 to_skip = step - already_skipped 

1007 rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step)) 

1008 # if we include a range type: Jan-Jan, or Sun-Sun, 

1009 # it means the whole cycle (all days of week, # all monthes of year, etc) 

1010 elif low == high: 

1011 rng = list( 

1012 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step) 

1013 ) 

1014 else: 

1015 try: 

1016 rng = list(range(low, high + 1, step)) 

1017 except ValueError as exc: 

1018 raise CroniterBadCronError(f"invalid range: {exc}") 

1019 

1020 if field_index == DOW_FIELD and nth and nth != "l": 

1021 rng = [f"{item}#{nth}" for item in rng] 

1022 e_list += [a for a in rng if a not in e_list] 

1023 else: 

1024 if t.startswith("-"): 

1025 raise CroniterBadCronError( 

1026 f"[{expr_format}] is not acceptable, negative numbers not allowed" 

1027 ) 

1028 if not star_or_int_re.search(t): 

1029 t = cls._alphaconv(field_index, t, expressions) 

1030 

1031 try: 

1032 t = int(t) 

1033 except ValueError: 

1034 pass 

1035 

1036 t = cls.value_alias(t, field_index, expressions) 

1037 

1038 if t not in ["*", "l"] and ( 

1039 int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1] 

1040 ): 

1041 raise CroniterBadCronError( 

1042 f"[{expr_format}] is not acceptable, out of range" 

1043 ) 

1044 

1045 res.append(t) 

1046 

1047 if field_index == DOW_FIELD and nth: 

1048 if t not in nth_weekday_of_month: 

1049 nth_weekday_of_month[t] = set() 

1050 nth_weekday_of_month[t].add(nth) 

1051 

1052 res = set(res) 

1053 res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i) 

1054 if len(res) == cls.LEN_MEANS_ALL[field_index]: 

1055 # Make sure the wildcard is used in the correct way (avoid over-optimization) 

1056 if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or ( 

1057 field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD] 

1058 ): 

1059 pass 

1060 else: 

1061 res = ["*"] 

1062 

1063 expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res) 

1064 

1065 # Check to make sure the dow combo in use is supported 

1066 if nth_weekday_of_month: 

1067 dow_expanded_set = set(expanded[DOW_FIELD]) 

1068 dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys()) 

1069 dow_expanded_set.discard("*") 

1070 # Skip: if it's all weeks instead of wildcard 

1071 if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]: 

1072 raise CroniterUnsupportedSyntaxError( 

1073 f"day-of-week field does not support mixing literal values and nth" 

1074 f" day of week syntax. Cron: '{expr_format}'" 

1075 f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}" 

1076 ) 

1077 

1078 EXPRESSIONS[(expr_format, hash_id, second_at_beginning)] = expressions 

1079 return expanded, nth_weekday_of_month 

1080 

1081 @classmethod 

1082 def expand( 

1083 cls, 

1084 expr_format: str, 

1085 hash_id: Optional[Union[bytes, str]] = None, 

1086 second_at_beginning: bool = False, 

1087 from_timestamp: Optional[float] = None, 

1088 ) -> tuple[list[ExpandedExpression], dict[int, set[int]]]: 

1089 """ 

1090 Expand a cron expression format into a noramlized format of 

1091 list[list[int | 'l' | '*']]. The first list representing each element 

1092 of the epxression, and each sub-list representing the allowed values 

1093 for that expression component. 

1094 

1095 A tuple is returned, the first value being the expanded epxression 

1096 list, and the second being a `nth_weekday_of_month` mapping. 

1097 

1098 Examples: 

1099 

1100 # Every minute 

1101 >>> croniter.expand('* * * * *') 

1102 ([['*'], ['*'], ['*'], ['*'], ['*']], {}) 

1103 

1104 # On the hour 

1105 >>> croniter.expand('0 0 * * *') 

1106 ([[0], [0], ['*'], ['*'], ['*']], {}) 

1107 

1108 # Hours 0-5 and 10 monday through friday 

1109 >>> croniter.expand('0-5,10 * * * mon-fri') 

1110 ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {}) 

1111 

1112 Note that some special values such as nth day of week are expanded to a 

1113 special mapping format for later processing: 

1114 

1115 # Every minute on the 3rd tuesday of the month 

1116 >>> croniter.expand('* * * * 2#3') 

1117 ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}}) 

1118 

1119 # Every hour on the last day of the month 

1120 >>> croniter.expand('0 * l * *') 

1121 ([[0], ['*'], ['l'], ['*'], ['*']], {}) 

1122 

1123 # On the hour every 15 seconds 

1124 >>> croniter.expand('0 0 * * * */15') 

1125 ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {}) 

1126 """ 

1127 try: 

1128 return cls._expand( 

1129 expr_format, 

1130 hash_id=hash_id, 

1131 second_at_beginning=second_at_beginning, 

1132 from_timestamp=from_timestamp, 

1133 ) 

1134 except (ValueError,) as exc: 

1135 if isinstance(exc, CroniterError): 

1136 raise 

1137 trace = _traceback.format_exc() 

1138 raise CroniterBadCronError(trace) 

1139 

1140 @classmethod 

1141 def _get_low_from_current_date_number(cls, field_index, step, from_timestamp): 

1142 dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT) 

1143 if field_index == MINUTE_FIELD: 

1144 return dt.minute % step 

1145 if field_index == HOUR_FIELD: 

1146 return dt.hour % step 

1147 if field_index == DAY_FIELD: 

1148 return ((dt.day - 1) % step) + 1 

1149 if field_index == MONTH_FIELD: 

1150 return dt.month % step 

1151 if field_index == DOW_FIELD: 

1152 return (dt.weekday() + 1) % step 

1153 

1154 raise ValueError("Can't get current date number for index larger than 4") 

1155 

1156 @classmethod 

1157 def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False): 

1158 if hash_id: 

1159 if not isinstance(hash_id, (bytes, str)): 

1160 raise TypeError("hash_id must be bytes or UTF-8 string") 

1161 if not isinstance(hash_id, bytes): 

1162 hash_id = hash_id.encode(encoding) 

1163 try: 

1164 cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning) 

1165 except CroniterError: 

1166 return False 

1167 return True 

1168 

1169 @classmethod 

1170 def match(cls, cron_expression, testdate, day_or=True, second_at_beginning=False): 

1171 return cls.match_range(cron_expression, testdate, testdate, day_or, second_at_beginning) 

1172 

1173 @classmethod 

1174 def match_range( 

1175 cls, cron_expression, from_datetime, to_datetime, day_or=True, second_at_beginning=False 

1176 ): 

1177 cron = cls( 

1178 cron_expression, 

1179 to_datetime, 

1180 ret_type=datetime.datetime, 

1181 day_or=day_or, 

1182 second_at_beginning=second_at_beginning, 

1183 ) 

1184 tdp = cron.get_current(datetime.datetime) 

1185 if not tdp.microsecond: 

1186 tdp += relativedelta(microseconds=1) 

1187 cron.set_current(tdp, force=True) 

1188 try: 

1189 tdt = cron.get_prev() 

1190 except CroniterBadDateError: 

1191 return False 

1192 precision_in_seconds = 1 if len(cron.expanded) > UNIX_CRON_LEN else 60 

1193 duration_in_second = (to_datetime - from_datetime).total_seconds() + precision_in_seconds 

1194 return (max(tdp, tdt) - min(tdp, tdt)).total_seconds() < duration_in_second 

1195 

1196 

1197def croniter_range( 

1198 start, 

1199 stop, 

1200 expr_format, 

1201 ret_type=None, 

1202 day_or=True, 

1203 exclude_ends=False, 

1204 _croniter=None, 

1205 second_at_beginning=False, 

1206 expand_from_start_time=False, 

1207): 

1208 """ 

1209 Generator that provides all times from start to stop matching the given cron expression. 

1210 If the cron expression matches either 'start' and/or 'stop', those times will be returned as 

1211 well unless 'exclude_ends=True' is passed. 

1212 

1213 You can think of this function as sibling to the builtin range function for datetime objects. 

1214 Like range(start,stop,step), except that here 'step' is a cron expression. 

1215 """ 

1216 _croniter = _croniter or croniter 

1217 auto_rt = datetime.datetime 

1218 # type is used in first if branch for perfs reasons 

1219 if type(start) is not type(stop) and not ( 

1220 isinstance(start, type(stop)) or isinstance(stop, type(start)) 

1221 ): 

1222 raise CroniterBadTypeRangeError( 

1223 f"The start and stop must be same type. {type(start)} != {type(stop)}" 

1224 ) 

1225 if isinstance(start, (float, int)): 

1226 start, stop = ( 

1227 datetime.datetime.fromtimestamp(t, tzutc()).replace(tzinfo=None) for t in (start, stop) 

1228 ) 

1229 auto_rt = float 

1230 if ret_type is None: 

1231 ret_type = auto_rt 

1232 if not exclude_ends: 

1233 ms1 = relativedelta(microseconds=1) 

1234 if start < stop: # Forward (normal) time order 

1235 start -= ms1 

1236 stop += ms1 

1237 else: # Reverse time order 

1238 start += ms1 

1239 stop -= ms1 

1240 year_span = math.floor(abs(stop.year - start.year)) + 1 

1241 ic = _croniter( 

1242 expr_format, 

1243 start, 

1244 ret_type=datetime.datetime, 

1245 day_or=day_or, 

1246 max_years_between_matches=year_span, 

1247 second_at_beginning=second_at_beginning, 

1248 expand_from_start_time=expand_from_start_time, 

1249 ) 

1250 # define a continue (cont) condition function and step function for the main while loop 

1251 if start < stop: # Forward 

1252 

1253 def cont(v): 

1254 return v < stop 

1255 

1256 step = ic.get_next 

1257 else: # Reverse 

1258 

1259 def cont(v): 

1260 return v > stop 

1261 

1262 step = ic.get_prev 

1263 try: 

1264 dt = step() 

1265 while cont(dt): 

1266 if ret_type is float: 

1267 yield ic.get_current(float) 

1268 else: 

1269 yield dt 

1270 dt = step() 

1271 except CroniterBadDateError: 

1272 # Stop iteration when this exception is raised; no match found within the given year range 

1273 return 

1274 

1275 

1276class HashExpander: 

1277 def __init__(self, cronit): 

1278 self.cron = cronit 

1279 

1280 def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None): 

1281 """Return a hashed/random integer given range/hash information""" 

1282 if range_end is None: 

1283 range_end = self.cron.RANGES[idx][1] 

1284 if range_begin is None: 

1285 range_begin = self.cron.RANGES[idx][0] 

1286 if hash_type == "r": 

1287 crc = random.randint(0, 0xFFFFFFFF) 

1288 else: 

1289 crc = binascii.crc32(hash_id) & 0xFFFFFFFF 

1290 return ((crc >> idx) % (range_end - range_begin + 1)) + range_begin 

1291 

1292 def match(self, efl, idx, expr, hash_id=None, **kw): 

1293 return hash_expression_re.match(expr) 

1294 

1295 def expand(self, efl, idx, expr, hash_id=None, match="", **kw): 

1296 """Expand a hashed/random expression to its normal representation""" 

1297 if match == "": 

1298 match = self.match(efl, idx, expr, hash_id, **kw) 

1299 if not match: 

1300 return expr 

1301 m = match.groupdict() 

1302 

1303 if m["hash_type"] == "h" and hash_id is None: 

1304 raise CroniterBadCronError("Hashed definitions must include hash_id") 

1305 

1306 if m["range_begin"] and m["range_end"]: 

1307 if int(m["range_begin"]) >= int(m["range_end"]): 

1308 raise CroniterBadCronError("Range end must be greater than range begin") 

1309 

1310 if m["range_begin"] and m["range_end"] and m["divisor"]: 

1311 # Example: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54) 

1312 if int(m["divisor"]) == 0: 

1313 raise CroniterBadCronError(f"Bad expression: {expr}") 

1314 

1315 x = self.do( 

1316 idx, 

1317 hash_type=m["hash_type"], 

1318 hash_id=hash_id, 

1319 range_begin=int(m["range_begin"]), 

1320 range_end=int(m["divisor"]) - 1 + int(m["range_begin"]), 

1321 ) 

1322 return f"{x}-{int(m['range_end'])}/{int(m['divisor'])}" 

1323 if m["range_begin"] and m["range_end"]: 

1324 # Example: H(0-29) -> 12 

1325 return str( 

1326 self.do( 

1327 idx, 

1328 hash_type=m["hash_type"], 

1329 hash_id=hash_id, 

1330 range_end=int(m["range_end"]), 

1331 range_begin=int(m["range_begin"]), 

1332 ) 

1333 ) 

1334 if m["divisor"]: 

1335 # Example: H/15 -> 7-59/15 (i.e. 7,22,37,52) 

1336 if int(m["divisor"]) == 0: 

1337 raise CroniterBadCronError(f"Bad expression: {expr}") 

1338 

1339 x = self.do( 

1340 idx, 

1341 hash_type=m["hash_type"], 

1342 hash_id=hash_id, 

1343 range_begin=self.cron.RANGES[idx][0], 

1344 range_end=int(m["divisor"]) - 1 + self.cron.RANGES[idx][0], 

1345 ) 

1346 return f"{x}-{self.cron.RANGES[idx][1]}/{int(m['divisor'])}" 

1347 

1348 # Example: H -> 32 

1349 return str(self.do(idx, hash_type=m["hash_type"], hash_id=hash_id)) 

1350 

1351 

1352EXPANDERS = {"hash": HashExpander}