Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/croniter/croniter.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

803 statements  

1#!/usr/bin/env python 

2import binascii 

3import calendar 

4import copy 

5import datetime 

6import math 

7import platform 

8import random 

9import re 

10import struct 

11import sys 

12import traceback as _traceback 

13from time import time 

14from typing import Any, Literal, Optional, Union 

15 

16from dateutil.relativedelta import relativedelta 

17from dateutil.tz import datetime_exists, tzutc 

18 

19ExpandedExpression = list[Union[int, Literal["*", "l"]]] 

20 

21 

def is_32bit() -> bool:
    """
    Report whether the running interpreter is a 32-bit build.

    Three independent signals are gathered and a single positive one is
    enough: the size of a C pointer, the architecture string reported by
    ``platform.architecture()``, and the magnitude of ``sys.maxsize``.
    """
    pointer_bits = 8 * struct.calcsize("P")

    # If the platform probe raises RuntimeError, treat the answer as unknown.
    try:
        arch_label = platform.architecture()[0]
    except RuntimeError:
        arch_label = None

    maxsize_fits_32 = sys.maxsize <= 2**32

    if pointer_bits == 32:
        return True
    if arch_label and "32" in arch_label:
        return True
    return maxsize_fits_32

50 

51 

# Decide once, at import time, whether timestamp conversion must use the
# degraded arithmetic path in croniter.timestamp_to_datetime().
# The probe only runs on interpreters that is_32bit() reports as 32-bit; if
# converting a far-future timestamp overflows there, the flag is set to True.
try:
    # https://github.com/python/cpython/issues/101069 detection
    if is_32bit():
        datetime.datetime.fromtimestamp(3999999999)
    OVERFLOW32B_MODE = False
except OverflowError:
    OVERFLOW32B_MODE = True

59 

60 

# UTC tzinfo used for all aware comparisons, and the Unix epoch as an aware
# UTC datetime (timestamp_to_datetime() strips its tzinfo in 32-bit mode).
UTC_DT = datetime.timezone.utc
EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)

# Lower-case month abbreviations -> month number (1-12).
M_ALPHAS: dict[str, Union[int, str]] = {
    "jan": 1,
    "feb": 2,
    "mar": 3,
    "apr": 4,
    "may": 5,
    "jun": 6,
    "jul": 7,
    "aug": 8,
    "sep": 9,
    "oct": 10,
    "nov": 11,
    "dec": 12,
}
# Lower-case weekday abbreviations -> cron weekday number (0 = Sunday).
DOW_ALPHAS: dict[str, Union[int, str]] = {
    "sun": 0,
    "mon": 1,
    "tue": 2,
    "wed": 3,
    "thu": 4,
    "fri": 5,
    "sat": 6,
}

# Index of each field inside an expanded expression. Seconds and year come
# after the five classic fields regardless of where they were written.
MINUTE_FIELD = 0
HOUR_FIELD = 1
DAY_FIELD = 2
MONTH_FIELD = 3
DOW_FIELD = 4
SECOND_FIELD = 5
YEAR_FIELD = 6

# Field layouts for the 5-, 6- and 7-column expression forms.
UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD)
SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD)
YEAR_FIELDS = (
    MINUTE_FIELD,
    HOUR_FIELD,
    DAY_FIELD,
    MONTH_FIELD,
    DOW_FIELD,
    SECOND_FIELD,
    YEAR_FIELD,
)

# "a-b" with an optional "/step" suffix.
step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
# Digits only.
only_int_re = re.compile(r"^\d+$")

# Days per month for a non-leap year, indexed by month - 1.
DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
# Regex alternations built from the abbreviation tables above.
WEEKDAYS = "|".join(DOW_ALPHAS.keys())
MONTHS = "|".join(M_ALPHAS.keys())
# Digits or a lone "*".
star_or_int_re = re.compile(r"^(\d+|\*)$")
# Special day-of-week forms: "<something>#<n>" or "l<n>" (e.g. 2#3, l3).
special_dow_re = re.compile(
    rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)"
    rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$"
)
# Nearest-weekday forms: "<n>w" or "w<n>".
nearest_weekday_re = re.compile(r"^(?:(\d+)w|w(\d+))$")
# A literal "*"; used with .match(), so it tests for a leading asterisk.
re_star = re.compile("[*]")
# Hashed/random forms: "h" or "r", optional "(begin-end)", optional "/divisor".
hash_expression_re = re.compile(
    r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
)

# Field layouts addressable either by name or by number of columns.
CRON_FIELDS = {
    "unix": UNIX_FIELDS,
    "second": SECOND_FIELDS,
    "year": YEAR_FIELDS,
    len(UNIX_FIELDS): UNIX_FIELDS,
    len(SECOND_FIELDS): SECOND_FIELDS,
    len(YEAR_FIELDS): YEAR_FIELDS,
}
UNIX_CRON_LEN = len(UNIX_FIELDS)
SECOND_CRON_LEN = len(SECOND_FIELDS)
YEAR_CRON_LEN = len(YEAR_FIELDS)
# retrocompat: the set of accepted column counts (the integer keys above).
VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)}

# Sentinel meaning "argument not supplied", so that None stays a usable value
# (see the tzinfo parameter of croniter.timestamp_to_datetime).
MARKER = object()

140 

141 

def datetime_to_timestamp(d):
    """
    Return the seconds elapsed between the Unix epoch and ``d`` as a float.

    An aware datetime is first shifted to UTC by subtracting its UTC offset.
    A naive datetime is used unchanged and measured against a naive
    1970-01-01 00:00:00.
    """
    naive_epoch = datetime.datetime(1970, 1, 1)
    if d.tzinfo is None:
        return (d - naive_epoch).total_seconds()
    utc_naive = d.replace(tzinfo=None) - d.utcoffset()
    return (utc_naive - naive_epoch).total_seconds()

147 

148 

149def _is_leap(year: int) -> bool: 

150 return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0) 

151 

152 

def _last_day_of_month(year: int, month: int) -> int:
    """Return the number of days in ``month`` of ``year`` (29 for a leap-year February)."""
    leap_bonus = 1 if month == 2 and _is_leap(year) else 0
    return DAYS[month - 1] + leap_bonus

159 

160 

def _is_successor(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> bool:
    """Tell whether ``date`` lies strictly beyond ``previous_date`` in the search direction.

    Both values are converted to UTC before comparing. With ``is_prev`` true
    the search runs backwards, so "beyond" means strictly earlier; otherwise
    it means strictly later.
    """
    date_utc = date.astimezone(UTC_DT)
    previous_utc = previous_date.astimezone(UTC_DT)
    return date_utc < previous_utc if is_prev else date_utc > previous_utc

168 

169 

170def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta: 

171 """Calculate the timezone difference of the given dates.""" 

172 offset1 = date1.utcoffset() 

173 offset2 = date2.utcoffset() 

174 assert offset1 is not None 

175 assert offset2 is not None 

176 return offset2 - offset1 

177 

178 

def _add_tzinfo(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> tuple[datetime.datetime, bool]:
    """Add the tzinfo from the previous date to the given date.

    In case the new date is ambiguous, determine the correct date
    based on it being closer to the previous date but still a successor
    (after/before based on `is_prev`).

    In case the date does not exist, jump forward to the next existing date.

    Returns a ``(aware_datetime, exists)`` pair. ``exists`` is False only when
    the requested local time did not exist and the result was moved forward
    minute by minute until it did.
    """
    # tzinfo objects exposing ``localize`` are handled through the pytz API;
    # everything else goes through the ``fold`` attribute further below.
    localize = getattr(previous_date.tzinfo, "localize", None)
    if localize is not None:
        # pylint: disable-next=import-outside-toplevel
        import pytz

        try:
            # is_dst=None makes localize raise on non-existent/ambiguous times.
            result = localize(date, is_dst=None)
        except pytz.NonExistentTimeError:
            # Step forward one minute at a time until the local time exists.
            while True:
                date += datetime.timedelta(minutes=1)
                try:
                    result = localize(date, is_dst=None)
                except pytz.NonExistentTimeError:
                    continue
                break
            return result, False
        except pytz.AmbiguousTimeError:
            # Two instants share this wall-clock time; build both readings.
            closer = localize(date, is_dst=not is_prev)
            farther = localize(date, is_dst=is_prev)
            # TODO: Check negative DST
            assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
            # Prefer the reading nearest to previous_date that is still
            # beyond it in the search direction.
            if _is_successor(closer, previous_date, is_prev):
                result = closer
            else:
                assert _is_successor(farther, previous_date, is_prev)
                result = farther
        return result, True

    result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo)
    if not datetime_exists(result):
        # Non-existent local time: advance minute by minute until it exists.
        while not datetime_exists(result):
            result += datetime.timedelta(minutes=1)
        return result, False

    # result is closer to the previous date
    farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo)
    # Comparing the UTC offsets in the check for the date being ambiguous.
    if result.utcoffset() != farther.utcoffset():
        # TODO: Check negative DST
        assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
        if not _is_successor(result, previous_date, is_prev):
            assert _is_successor(farther, previous_date, is_prev)
            result = farther
    return result, True

234 

235 

class CroniterError(ValueError):
    """General top-level Croniter base exception.

    Derives from ``ValueError``, so ``except ValueError`` also catches it.
    """

238 

239 

class CroniterBadTypeRangeError(TypeError):
    """Type error related to a range (meaning inferred from the name; no raise site is visible here).

    Derives from ``TypeError`` rather than ``CroniterError``, so it is not
    caught by ``except CroniterError``.
    """

242 

243 

class CroniterBadCronError(CroniterError):
    """Syntax, unknown value, or range error within a cron expression.

    Base class of the more specific expression errors
    ``CroniterUnsupportedSyntaxError`` and ``CroniterNotAlphaError``.
    """

246 

247 

class CroniterUnsupportedSyntaxError(CroniterBadCronError):
    """Valid cron syntax, but likely to produce inaccurate results"""

    # Extending CroniterBadCronError, which may be contradictory, but this allows
    # catching both errors with a single exception. From a user perspective
    # these will likely be handled the same way.

254 

255 

class CroniterBadDateError(CroniterError):
    """Unable to find next/prev timestamp match.

    Raised by ``croniter._calc`` when its search loop ends without a match.
    """

258 

259 

class CroniterNotAlphaError(CroniterBadCronError):
    """Cron syntax contains an invalid day or month abbreviation.

    Raised by ``croniter._alphaconv`` when a key is missing from the
    per-field alias table.
    """

262 

263 

264class croniter: 

    # Wrap-around length used when searching the month field.
    MONTHS_IN_YEAR = 12

    # This helps with expanding `*` fields into `lower-upper` ranges. Each item
    # in this tuple maps to the corresponding field index
    # (minute, hour, day of month, month, day of week, second, year).
    RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099))

    # Per-field alias tables consulted by _alphaconv(); empty for purely
    # numeric fields.
    ALPHACONV: tuple[dict[str, Union[int, str]], ...] = (
        {},  # 0: min
        {},  # 1: hour
        {"l": "l"},  # 2: dom
        # 3: mon
        copy.deepcopy(M_ALPHAS),
        # 4: dow
        copy.deepcopy(DOW_ALPHAS),
        # 5: second
        {},
        # 6: year
        {},
    )

    # Per-field numeric aliases applied by value_alias():
    # day-of-month 0 -> 1, month 0 -> 1, day-of-week 7 -> 0.
    LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {})

    # Number of distinct values in each field's RANGES entry (130 = 1970..2099).
    # NOTE(review): presumably used to detect a list that covers the whole
    # field; the consumer is not visible in this part of the file.
    LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130)

288 

289 def __init__( 

290 self, 

291 expr_format: str, 

292 start_time: Optional[Union[datetime.datetime, float]] = None, 

293 ret_type: type = float, 

294 day_or: bool = True, 

295 max_years_between_matches: Optional[int] = None, 

296 is_prev: bool = False, 

297 hash_id: Optional[Union[bytes, str]] = None, 

298 implement_cron_bug: bool = False, 

299 second_at_beginning: bool = False, 

300 expand_from_start_time: bool = False, 

301 ) -> None: 

302 self._ret_type = ret_type 

303 self._day_or = day_or 

304 self._implement_cron_bug = implement_cron_bug 

305 self.second_at_beginning = bool(second_at_beginning) 

306 self._expand_from_start_time = expand_from_start_time 

307 

308 if hash_id is not None: 

309 if not isinstance(hash_id, (bytes, str)): 

310 raise TypeError("hash_id must be bytes or UTF-8 string") 

311 if not isinstance(hash_id, bytes): 

312 hash_id = hash_id.encode("UTF-8") 

313 

314 self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None 

315 if max_years_between_matches is None: 

316 max_years_between_matches = 50 

317 self._max_years_between_matches = max(int(max_years_between_matches), 1) 

318 

319 if start_time is None: 

320 start_time = time() 

321 

322 self.tzinfo: Optional[datetime.tzinfo] = None 

323 

324 self.start_time = 0.0 

325 self.dst_start_time = 0.0 

326 self.cur = 0.0 

327 self.set_current(start_time, force=True) 

328 

329 self.expanded, self.nth_weekday_of_month, self.expressions, self.nearest_weekday = self._expand( 

330 expr_format, 

331 hash_id=hash_id, 

332 from_timestamp=self.dst_start_time if self._expand_from_start_time else None, 

333 second_at_beginning=second_at_beginning, 

334 ) 

335 self.fields = CRON_FIELDS[len(self.expanded)] 

336 self._is_prev = is_prev 

337 

338 @classmethod 

339 def _alphaconv(cls, index, key, expressions): 

340 try: 

341 return cls.ALPHACONV[index][key] 

342 except KeyError: 

343 raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable") 

344 

345 def get_next(self, ret_type=None, start_time=None, update_current=True): 

346 if start_time and self._expand_from_start_time: 

347 raise ValueError( 

348 "start_time is not supported when using expand_from_start_time = True." 

349 ) 

350 return self._get_next( 

351 ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current 

352 ) 

353 

    def get_prev(self, ret_type=None, start_time=None, update_current=True):
        """Return the closest matching time before the current position.

        Thin wrapper around ``_get_next`` with ``is_prev=True``; the arguments
        have the same meaning as in ``get_next``. Unlike ``get_next``, no
        check against ``expand_from_start_time`` is made here.
        """
        return self._get_next(
            ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current
        )

358 

359 def get_current(self, ret_type=None): 

360 ret_type = ret_type or self._ret_type 

361 if issubclass(ret_type, datetime.datetime): 

362 return self.timestamp_to_datetime(self.cur) 

363 return self.cur 

364 

365 def set_current( 

366 self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True 

367 ) -> float: 

368 if (force or (self.cur is None)) and start_time is not None: 

369 if isinstance(start_time, datetime.datetime): 

370 self.tzinfo = start_time.tzinfo 

371 start_time = self.datetime_to_timestamp(start_time) 

372 

373 self.start_time = start_time 

374 self.dst_start_time = start_time 

375 self.cur = start_time 

376 return self.cur 

377 

    @staticmethod
    def datetime_to_timestamp(d: datetime.datetime) -> float:
        """
        Converts a `datetime` object `d` into a UNIX timestamp.

        Delegates to the module-level ``datetime_to_timestamp`` function; the
        name inside this body resolves to the module global, not to this
        method.
        """
        return datetime_to_timestamp(d)

    # Older private name kept as an alias of the static method above.
    _datetime_to_timestamp = datetime_to_timestamp  # retrocompat

386 

387 def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime: 

388 """ 

389 Converts a UNIX `timestamp` into a `datetime` object. 

390 """ 

391 if tzinfo is MARKER: # allow to give tzinfo=None even if self.tzinfo is set 

392 tzinfo = self.tzinfo 

393 if OVERFLOW32B_MODE: 

394 # degraded mode to workaround Y2038 

395 # see https://github.com/python/cpython/issues/101069 

396 result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp) 

397 else: 

398 result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None) 

399 if tzinfo: 

400 result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo) 

401 return result 

402 

403 _timestamp_to_datetime = timestamp_to_datetime # retrocompat 

404 

405 def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None): 

406 if update_current is None: 

407 update_current = True 

408 self.set_current(start_time, force=True) 

409 if is_prev is None: 

410 is_prev = self._is_prev 

411 self._is_prev = is_prev 

412 

413 ret_type = ret_type or self._ret_type 

414 

415 if not issubclass(ret_type, (float, datetime.datetime)): 

416 raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.") 

417 

418 result = self._calc_next(is_prev) 

419 timestamp = self.datetime_to_timestamp(result) 

420 if update_current: 

421 self.cur = timestamp 

422 if issubclass(ret_type, datetime.datetime): 

423 return result 

424 return timestamp 

425 

426 # iterator protocol, to enable direct use of croniter 

    # objects in a loop, like "for dt in croniter('5 0 * * *'): ..."

428 # or for combining multiple croniters into single 

429 # dates feed using 'itertools' module 

    def all_next(self, ret_type=None, start_time=None, update_current=None):
        """
        Returns a generator yielding consecutive dates.

        May be used instead of an implicit call to __iter__ whenever a
        non-default `ret_type` needs to be specified.

        `start_time` is applied to the first step only. When the search fails
        with CroniterBadDateError, the generator ends quietly if
        `max_years_between_matches` was passed explicitly to the constructor;
        otherwise the error propagates.
        """
        # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could
        # be used instead
        try:
            while True:
                # Force the forward direction on every step.
                self._is_prev = False
                yield self._get_next(
                    ret_type=ret_type, start_time=start_time, update_current=update_current
                )
                # Later steps continue from the stored position.
                start_time = None
        except CroniterBadDateError:
            if self._max_years_btw_matches_explicitly_set:
                return
            raise

450 

    def all_prev(self, ret_type=None, start_time=None, update_current=None):
        """
        Returns a generator yielding previous dates.

        `start_time` is applied to the first step only. When the search fails
        with CroniterBadDateError, the generator ends quietly if
        `max_years_between_matches` was passed explicitly to the constructor;
        otherwise the error propagates.
        """
        try:
            while True:
                # Force the backward direction on every step.
                self._is_prev = True
                yield self._get_next(
                    ret_type=ret_type, start_time=start_time, update_current=update_current
                )
                # Later steps continue from the stored position.
                start_time = None
        except CroniterBadDateError:
            if self._max_years_btw_matches_explicitly_set:
                return
            raise

466 

467 def iter(self, *args, **kwargs): 

468 return self.all_prev if self._is_prev else self.all_next 

469 

    def __iter__(self):
        """Return the instance itself; it acts as its own iterator."""
        return self

    # Each next() step is a _get_next() call with default arguments, so it
    # follows the stored direction and advances the current position.
    __next__ = next = _get_next

474 

475 def _calc_next(self, is_prev: bool) -> datetime.datetime: 

476 current = self.timestamp_to_datetime(self.cur) 

477 expanded = self.expanded[:] 

478 nth_weekday_of_month = self.nth_weekday_of_month.copy() 

479 

480 # exception to support day of month and day of week as defined in cron 

481 if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or: 

482 # If requested, handle a bug in vixie cron/ISC cron where day_of_month and 

483 # day_of_week form an intersection (AND) instead of a union (OR) if either 

484 # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html) 

485 if self._implement_cron_bug and ( 

486 re_star.match(self.expressions[DAY_FIELD]) 

487 or re_star.match(self.expressions[DOW_FIELD]) 

488 ): 

489 # To produce a schedule identical to the cron bug, we'll bypass the code 

490 # that makes a union of DOM and DOW, and instead skip to the code that 

491 # does an intersect instead 

492 pass 

493 else: 

494 bak = expanded[DOW_FIELD] 

495 expanded[DOW_FIELD] = ["*"] 

496 t1 = self._calc(current, expanded, nth_weekday_of_month, is_prev) 

497 expanded[DOW_FIELD] = bak 

498 expanded[DAY_FIELD] = ["*"] 

499 

500 t2 = self._calc(current, expanded, nth_weekday_of_month, is_prev) 

501 if is_prev: 

502 return t1 if t1 > t2 else t2 

503 return t1 if t1 < t2 else t2 

504 

505 return self._calc(current, expanded, nth_weekday_of_month, is_prev) 

506 

    def _calc(
        self,
        now: datetime.datetime,
        expanded: list[ExpandedExpression],
        nth_weekday_of_month: dict[int, set[int]],
        is_prev: bool,
    ) -> datetime.datetime:
        """Search for the match closest to ``now`` in the given direction.

        The search runs on naive local time. ``now`` is first nudged by one
        step (one minute for 5-field expressions, one second for longer ones,
        minus one microsecond when searching backwards) and truncated. The
        field processors are then applied from the coarsest field to the
        finest. Each returns ``(changed, d)``: if a processor moved the
        candidate, the whole pass restarts; when a full pass changes nothing,
        the candidate is the answer.

        If ``now`` is aware, the result gets its tzinfo back through
        ``_add_tzinfo`` and DST transitions are reconciled before returning.

        Args:
            now: Reference point; aware or naive.
            expanded: Expanded field values, indexed by the ``*_FIELD``
                constants.
            nth_weekday_of_month: Mapping of weekday to requested occurrences.
                It may be mutated: a ``"*"`` key is folded into the seven
                weekday keys.
            is_prev: True to search backwards in time.

        Returns:
            The matching datetime, naive if ``now`` was naive.

        Raises:
            CroniterBadDateError: If the search leaves the
                ``_max_years_between_matches`` window, or the year field has
                no further candidate.
        """
        if is_prev:
            nearest_diff_method = self._get_prev_nearest_diff
            offset = relativedelta(microseconds=-1)
        else:
            nearest_diff_method = self._get_next_nearest_diff
            if len(expanded) > UNIX_CRON_LEN:
                offset = relativedelta(seconds=1)
            else:
                offset = relativedelta(minutes=1)
        # Calculate the next cron time in local time a.k.a. timezone unaware time.
        unaware_time = now.replace(tzinfo=None) + offset
        if len(expanded) > UNIX_CRON_LEN:
            unaware_time = unaware_time.replace(microsecond=0)
        else:
            unaware_time = unaware_time.replace(second=0, microsecond=0)

        # `month` and `year` are read by the closures below and rebound by the
        # main loop after every change; `current_year` stays fixed and
        # anchors the search window.
        month = unaware_time.month
        year = current_year = unaware_time.year

        def proc_year(d):
            # Only 7-field expressions carry a year field.
            if len(expanded) == YEAR_CRON_LEN:
                try:
                    expanded[YEAR_FIELD].index("*")
                except ValueError:
                    # use None as range_val to indicate no loop
                    diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
                    if diff_year is None:
                        # No candidate year left: tell the main loop to stop.
                        return None, d
                    if diff_year != 0:
                        # Jump to the edge of the target year that faces the
                        # search direction.
                        if is_prev:
                            d += relativedelta(
                                years=diff_year, month=12, day=31, hour=23, minute=59, second=59
                            )
                        else:
                            d += relativedelta(
                                years=diff_year, month=1, day=1, hour=0, minute=0, second=0
                            )
                        return True, d
            return False, d

        def proc_month(d):
            try:
                expanded[MONTH_FIELD].index("*")
            except ValueError:
                diff_month = nearest_diff_method(
                    d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR
                )
                reset_day = 1

                if diff_month is not None and diff_month != 0:
                    if is_prev:
                        # Move first, then snap to the last second of the
                        # month that was reached.
                        d += relativedelta(months=diff_month)
                        reset_day = _last_day_of_month(d.year, d.month)
                        d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(
                            months=diff_month, day=reset_day, hour=0, minute=0, second=0
                        )
                    return True, d
            return False, d

        def proc_day_of_month(d):
            try:
                expanded[DAY_FIELD].index("*")
            except ValueError:
                days = _last_day_of_month(year, month)
                # "l" matches when the candidate is already on the month's last day.
                if "l" in expanded[DAY_FIELD] and days == d.day:
                    return False, d

                if is_prev:
                    # Going backwards wraps into the previous month, so its
                    # length is the range to wrap by.
                    prev_month = (month - 2) % self.MONTHS_IN_YEAR + 1
                    prev_year = year - 1 if month == 1 else year
                    days_in_prev_month = _last_day_of_month(prev_year, prev_month)
                    diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
                else:
                    diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)

                if diff_day is not None and diff_day != 0:
                    if is_prev:
                        d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                    return True, d
            return False, d

        def proc_day_of_week(d):
            try:
                expanded[DOW_FIELD].index("*")
            except ValueError:
                # isoweekday() % 7 maps Sunday (7) to 0 and keeps Mon..Sat as 1..6.
                diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
                if diff_day_of_week is not None and diff_day_of_week != 0:
                    if is_prev:
                        d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
                    return True, d
            return False, d

        def proc_day_of_week_nth(d):
            # Fold a "*" entry into every weekday 0..6, then drop it.
            if "*" in nth_weekday_of_month:
                s = nth_weekday_of_month["*"]
                for i in range(0, 7):
                    if i in nth_weekday_of_month:
                        nth_weekday_of_month[i].update(s)
                    else:
                        nth_weekday_of_month[i] = s
                del nth_weekday_of_month["*"]

            # Collect the day numbers in d's month that satisfy any
            # weekday/occurrence pair and lie on the search side of d.day.
            candidates = []
            for wday, nth in nth_weekday_of_month.items():
                c = self._get_nth_weekday_of_month(d.year, d.month, wday)
                for n in nth:
                    if n == "l":
                        candidate = c[-1]
                    elif len(c) < n:
                        # This month has fewer than n occurrences of the weekday.
                        continue
                    else:
                        candidate = c[n - 1]
                    if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                        candidates.append(candidate)

            if not candidates:
                # Nothing usable in this month: move to the adjacent month's
                # edge (end of the previous month / start of the next one).
                if is_prev:
                    d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
                else:
                    days = _last_day_of_month(year, month)
                    d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
                return True, d

            candidates.sort()
            diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
            if diff_day != 0:
                if is_prev:
                    d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                return True, d
            return False, d

        def proc_nearest_weekday(d):
            """Process W (nearest weekday) day-of-month entries."""
            candidates = []
            for w_day in self.nearest_weekday:
                candidate = self._get_nearest_weekday(d.year, d.month, w_day)
                if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                    candidates.append(candidate)

            if not candidates:
                # Nothing usable in this month: move to the adjacent month's edge.
                if is_prev:
                    d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
                else:
                    days = _last_day_of_month(year, month)
                    d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
                return True, d

            candidates.sort()
            diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
            if diff_day != 0:
                if is_prev:
                    d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                return True, d
            return False, d

        def proc_hour(d):
            try:
                expanded[HOUR_FIELD].index("*")
            except ValueError:
                diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
                if diff_hour is not None and diff_hour != 0:
                    if is_prev:
                        d += relativedelta(hours=diff_hour, minute=59, second=59)
                    else:
                        d += relativedelta(hours=diff_hour, minute=0, second=0)
                    return True, d
            return False, d

        def proc_minute(d):
            try:
                expanded[MINUTE_FIELD].index("*")
            except ValueError:
                diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
                if diff_min is not None and diff_min != 0:
                    if is_prev:
                        d += relativedelta(minutes=diff_min, second=59)
                    else:
                        d += relativedelta(minutes=diff_min, second=0)
                    return True, d
            return False, d

        def proc_second(d):
            if len(expanded) > UNIX_CRON_LEN:
                try:
                    expanded[SECOND_FIELD].index("*")
                except ValueError:
                    diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
                    if diff_sec is not None and diff_sec != 0:
                        d += relativedelta(seconds=diff_sec)
                        return True, d
            else:
                # 5-field expressions have no seconds: pin them to zero.
                d += relativedelta(second=0)
            return False, d

        # Coarsest field first. The day-of-month and day-of-week slots switch
        # to their special variants when W or nth-weekday entries are present.
        procs = [
            proc_year,
            proc_month,
            (proc_nearest_weekday if self.nearest_weekday else proc_day_of_month),
            (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
            proc_hour,
            proc_minute,
            proc_second,
        ]

        while abs(year - current_year) <= self._max_years_between_matches:
            next = False
            stop = False
            for proc in procs:
                (changed, unaware_time) = proc(unaware_time)
                # `None` can be set mostly for year processing
                # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
                if changed is None:
                    stop = True
                    break
                if changed:
                    # A field moved the candidate: refresh the closure state
                    # and restart from the coarsest field.
                    month, year = unaware_time.month, unaware_time.year
                    next = True
                    break
            if stop:
                break
            if next:
                continue

            # Every processor left the candidate untouched: it matches.
            unaware_time = unaware_time.replace(microsecond=0)
            if now.tzinfo is None:
                return unaware_time

            # Add timezone information back and handle DST changes
            aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

            if not exists and (
                not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD]
            ):
                # The calculated local date does not exist and moving the time forward
                # to the next valid time isn't the correct solution. Search for the
                # next matching cron time that exists.
                while not exists:
                    # Recursing with a naive datetime yields a naive result.
                    unaware_time = self._calc(
                        unaware_time, expanded, nth_weekday_of_month, is_prev
                    )
                    aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

            offset_delta = _timezone_delta(now, aware_time)
            if not offset_delta:
                # There was no DST change.
                return aware_time

            # There was a DST change. So check if there is an alternative cron time
            # for the other UTC offset.
            alternative_unaware_time = now.replace(tzinfo=None) + offset_delta
            alternative_unaware_time = self._calc(
                alternative_unaware_time, expanded, nth_weekday_of_month, is_prev
            )
            alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev)

            if not _is_successor(alternative_aware_time, now, is_prev):
                # The alternative time is an ancestor of now. Thus it is not an alternative.
                return aware_time

            # Return whichever of the two candidates is nearer to `now`.
            if _is_successor(aware_time, alternative_aware_time, is_prev):
                return alternative_aware_time

            return aware_time

        if is_prev:
            raise CroniterBadDateError("failed to find prev date")
        raise CroniterBadDateError("failed to find next date")

791 

792 @staticmethod 

793 def _get_next_nearest_diff(x, to_check, range_val): 

794 """ 

795 `range_val` is the range of a field. 

796 If no available time, we can move to next loop(like next month). 

797 `range_val` can also be set to `None` to indicate that there is no loop. 

798 ( Currently, should only used for `year` field ) 

799 """ 

800 for i, d in enumerate(to_check): 

801 if range_val is not None: 

802 if d == "l": 

803 # if 'l' then it is the last day of month 

804 # => its value of range_val 

805 d = range_val 

806 elif d > range_val: 

807 continue 

808 if d >= x: 

809 return d - x 

810 # When range_val is None and x not exists in to_check, 

811 # `None` will be returned to suggest no more available time 

812 if range_val is None: 

813 return None 

814 return to_check[0] - x + range_val 

815 

816 @staticmethod 

817 def _get_prev_nearest_diff(x, to_check, range_val): 

818 """ 

819 `range_val` is the range of a field. 

820 If no available time, we can move to previous loop(like previous month). 

821 Range_val can also be set to `None` to indicate that there is no loop. 

822 ( Currently should only used for `year` field ) 

823 """ 

824 candidates = to_check[:] 

825 candidates.reverse() 

826 for d in candidates: 

827 if d != "l" and d <= x: 

828 return d - x 

829 if "l" in candidates: 

830 return -x 

831 # When range_val is None and x not exists in to_check, 

832 # `None` will be returned to suggest no more available time 

833 if range_val is None: 

834 return None 

835 candidate = candidates[0] 

836 for c in candidates: 

837 # fixed: c < range_val 

838 # this code will reject all 31 day of month, 12 month, 59 second, 

839 # 23 hour and so on. 

840 # if candidates has just a element, this will not harmful. 

841 # but candidates have multiple elements, then values equal to 

842 # range_val will rejected. 

843 if c <= range_val: 

844 candidate = c 

845 break 

846 # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error 

847 # return 2021-03-02 06:00:00 

848 if candidate > range_val: 

849 return -range_val 

850 return candidate - x - range_val 

851 

852 @staticmethod 

853 def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]: 

854 """For a given year/month return a list of days in nth-day-of-month order. 

855 The last weekday of the month is always [-1]. 

856 """ 

857 w = (day_of_week + 6) % 7 

858 c = calendar.Calendar(w).monthdayscalendar(year, month) 

859 if c[0][0] == 0: 

860 c.pop(0) 

861 return tuple(i[0] for i in c) 

862 

863 @staticmethod 

864 def _get_nearest_weekday(year, month, day): 

865 """Get the nearest weekday (Mon-Fri) to the given day in the given month. 

866 

867 Rules: 

868 - If the day is a weekday, return it. 

869 - If Saturday, return Friday (day-1), unless that crosses into previous month, 

870 then return Monday (day+2). 

871 - If Sunday, return Monday (day+1), unless that crosses into next month, 

872 then return Friday (day-2). 

873 """ 

874 last_day = _last_day_of_month(year, month) 

875 day = min(day, last_day) 

876 weekday = calendar.weekday(year, month, day) # 0=Mon, 6=Sun 

877 if weekday < 5: # Mon-Fri 

878 return day 

879 if weekday == 5: # Saturday 

880 if day > 1: 

881 return day - 1 # Friday 

882 else: 

883 return day + 2 # Monday (1st is Sat, so 3rd is Mon) 

884 # Sunday 

885 if day < last_day: 

886 return day + 1 # Monday 

887 else: 

888 return day - 2 # Friday (last day is Sun, go back to Fri) 

889 

890 @classmethod 

891 def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN): 

892 if isinstance(len_expressions, (list, dict, tuple, set)): 

893 len_expressions = len(len_expressions) 

894 if val in cls.LOWMAP[field_index] and not ( 

895 # do not support 0 as a month either for classical 5 fields cron, 

896 # 6fields second repeat form or 7 fields year form 

897 # but still let conversion happen if day field is shifted 

898 (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN) 

899 or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN) 

900 or ( 

901 field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD] 

902 and len_expressions == YEAR_CRON_LEN 

903 ) 

904 ): 

905 val = cls.LOWMAP[field_index][val] 

906 return val 

907 

    # Maximum days in each month (non-leap year for Feb), keyed by month number.
    # Holds the same values as the module-level DAYS tuple, which is indexed
    # by month - 1 instead.
    DAYS_IN_MONTH = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}

910 

@classmethod
def _expand(
    cls,
    expr_format,
    hash_id=None,
    second_at_beginning=False,
    from_timestamp=None,
    strict=False,
    strict_year=None,
):
    """Parse a cron expression into per-field lists of allowed values.

    Parameters:
        expr_format: the cron expression (or an ``@alias`` such as
            ``@daily``).  It is lower-cased for parsing but used verbatim
            in error messages.
        hash_id: forwarded to every registered expander; when it is not
            ``None`` the hashed variant of an ``@alias`` is selected.
        second_at_beginning: when the expression has more than
            ``UNIX_CRON_LEN`` fields, treat the first field as the seconds
            field and move it to index ``SECOND_FIELD``.
        from_timestamp: when truthy, the lower bound of every range/step
            item is recomputed from this timestamp by
            ``_get_low_from_current_date_number``.
        strict: when true, reject day-of-month/month combinations that can
            never occur (for example the 31st of February).
        strict_year: an int or an iterable of ints used by the strict check
            to decide whether February may have 29 days.

    Returns:
        A 4-tuple ``(expanded, nth_weekday_of_month, expressions,
        nearest_weekday)``: the expanded value lists (one per field), the
        mapping of day-of-week value to its set of ``#n`` / ``'l'``
        qualifiers, the list of raw field strings, and the set of
        day-of-month values that carried the ``W`` modifier.

    Raises:
        CroniterBadCronError: malformed or out-of-range expression.
        CroniterUnsupportedSyntaxError: day-of-week field mixing plain
            values with nth-weekday syntax.
    """
    # Split the expression in components, and normalize L -> l, MON -> mon,
    # etc. Keep expr_format untouched so we can use it in the exception
    # messages.
    expr_aliases = {
        "@midnight": ("0 0 * * *", "h h(0-2) * * * h"),
        "@hourly": ("0 * * * *", "h * * * * h"),
        "@daily": ("0 0 * * *", "h h * * * h"),
        "@weekly": ("0 0 * * 0", "h h * * h h"),
        "@monthly": ("0 0 1 * *", "h h h * * h"),
        "@yearly": ("0 0 1 1 *", "h h h h * h"),
        "@annually": ("0 0 1 1 *", "h h h h * h"),
    }

    efl = expr_format.lower()
    # Index 0 is the plain alias expansion, index 1 the hashed one.
    hash_id_expr = 1 if hash_id is not None else 0
    try:
        efl = expr_aliases[efl][hash_id_expr]
    except KeyError:
        pass

    expressions = efl.split()

    if len(expressions) not in VALID_LEN_EXPRESSION:
        raise CroniterBadCronError(
            "Exactly 5, 6 or 7 columns has to be specified for iterator expression."
        )

    if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
        # move second to it's own(6th) field to process by same logical
        expressions.insert(SECOND_FIELD, expressions.pop(0))

    expanded = []
    nth_weekday_of_month = {}
    nearest_weekday = set()

    for field_index, expr in enumerate(expressions):
        # Let every registered expander rewrite the raw field first.
        for expander in EXPANDERS.values():
            expr = expander(cls).expand(
                efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp
            )

        if "?" in expr:
            if expr != "?":
                raise CroniterBadCronError(
                    f"[{expr_format}] is not acceptable."
                    f" Question mark can not used with other characters"
                )
            if field_index not in [DAY_FIELD, DOW_FIELD]:
                raise CroniterBadCronError(
                    f"[{expr_format}] is not acceptable. "
                    f"Question mark can only used in day_of_month or day_of_week"
                )
            # currently just trade `?` as `*`
            expr = "*"

        # ``e_list`` is a work stack: range items are expanded and pushed
        # back onto it, plain values end up in ``res``.
        e_list = expr.split(",")
        res = []

        while len(e_list) > 0:
            e = e_list.pop()
            nth = None

            if field_index == DOW_FIELD:
                # Handle special case in the dow expression: 2#3, l3
                special_dow_rem = special_dow_re.match(str(e))
                if special_dow_rem:
                    g = special_dow_rem.groupdict()
                    he, last = g.get("he", ""), g.get("last", "")
                    if he:
                        e = he
                        try:
                            nth = int(last)
                            # Explicit check rather than ``assert``:
                            # assertions are removed under ``python -O``,
                            # which would let values such as ``2#9`` through.
                            if not 1 <= nth <= 5:
                                raise ValueError(nth)
                        except (KeyError, ValueError):
                            raise CroniterBadCronError(
                                f"[{expr_format}] is not acceptable."
                                f" Invalid day_of_week value: '{nth}'"
                            )
                    elif last:
                        e = last
                        nth = g["pre"]  # 'l'

            if field_index == DAY_FIELD:
                # Handle W (nearest weekday) in day-of-month: 15w, w15
                w_match = nearest_weekday_re.match(str(e))
                if w_match:
                    w_day = int(w_match.group(1) or w_match.group(2))
                    if w_day < 1 or w_day > 31:
                        raise CroniterBadCronError(
                            f"[{expr_format}] is not acceptable,"
                            f" nearest weekday day value '{w_day}' out of range"
                        )
                    if len(e_list) > 0 or len(res) > 0:
                        raise CroniterBadCronError(
                            f"[{expr_format}] is not acceptable."
                            f" 'W' can only be used with a single day value,"
                            f" not in a list or range"
                        )
                    nearest_weekday.add(w_day)
                    res.append(w_day)
                    continue

            # Before matching step_search_re, normalize "*" to "{min}-{max}".
            # Example: in the minute field, "*/5" normalizes to "0-59/5"
            t = re.sub(
                r"^\*(\/.+)$",
                r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]),
                str(e),
            )
            m = step_search_re.search(t)

            if not m:
                # Before matching step_search_re,
                # normalize "{start}/{step}" to "{start}-{max}/{step}".
                # Example: in the minute field, "10/5" normalizes to "10-59/5"
                t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e))
                m = step_search_re.search(t)

            if m:
                # early abort if low/high are out of bounds
                (low, high, step) = m.group(1), m.group(2), m.group(4) or 1
                if field_index == DAY_FIELD and high == "l":
                    high = "31"

                if not only_int_re.search(low):
                    low = str(cls._alphaconv(field_index, low, expressions))

                if not only_int_re.search(high):
                    high = str(cls._alphaconv(field_index, high, expressions))

                # normally, it's already guarded by the RE that should not accept
                # not-int values.
                if not only_int_re.search(str(step)):
                    raise CroniterBadCronError(
                        f"[{expr_format}] step '{step}'"
                        f" in field {field_index} is not acceptable"
                    )
                step = int(step)

                for band in low, high:
                    if not only_int_re.search(str(band)):
                        raise CroniterBadCronError(
                            f"[{expr_format}] bands '{low}-{high}'"
                            f" in field {field_index} are not acceptable"
                        )

                low, high = (
                    cls.value_alias(int(_val), field_index, expressions)
                    for _val in (low, high)
                )

                if max(low, high) > max(
                    cls.RANGES[field_index][0], cls.RANGES[field_index][1]
                ):
                    raise CroniterBadCronError(f"{expr_format} is out of bands")

                # NOTE(review): truthiness test means a timestamp of 0 is
                # treated like "not provided" — confirm that is intended.
                if from_timestamp:
                    low = cls._get_low_from_current_date_number(
                        field_index, int(step), int(from_timestamp)
                    )

                # Handle when the second bound of the range is in backtracking order:
                # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH
                if low > high:
                    whole_field_range = list(
                        range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1)
                    )
                    # Add FirstBound -> ENDRANGE, respecting step
                    rng = list(range(low, cls.RANGES[field_index][1] + 1, step))
                    # Then 0 -> SecondBound, but skipping n first occurences according to step
                    # EG to respect such expressions : Apr-Jan/3
                    to_skip = 0
                    if rng:
                        already_skipped = list(reversed(whole_field_range)).index(rng[-1])
                        curpos = whole_field_range.index(rng[-1])
                        if ((curpos + step) > len(whole_field_range)) and (
                            already_skipped < step
                        ):
                            to_skip = step - already_skipped
                    rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step))
                # if we include a range type: Jan-Jan, or Sun-Sun,
                # it means the whole cycle (all days of week, # all monthes of year, etc)
                elif low == high:
                    rng = list(
                        range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step)
                    )
                else:
                    try:
                        rng = list(range(low, high + 1, step))
                    except ValueError as exc:
                        raise CroniterBadCronError(f"invalid range: {exc}") from exc

                # Re-attach the nth qualifier so each expanded weekday is
                # parsed again as "<dow>#<nth>" on a later iteration.
                if field_index == DOW_FIELD and nth and nth != "l":
                    rng = [f"{item}#{nth}" for item in rng]
                e_list += [a for a in rng if a not in e_list]
            else:
                if t.startswith("-"):
                    raise CroniterBadCronError(
                        f"[{expr_format}] is not acceptable, negative numbers not allowed"
                    )
                if not star_or_int_re.search(t):
                    t = cls._alphaconv(field_index, t, expressions)

                try:
                    t = int(t)
                except ValueError:
                    pass

                t = cls.value_alias(t, field_index, expressions)

                if t not in ["*", "l"] and (
                    int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1]
                ):
                    raise CroniterBadCronError(
                        f"[{expr_format}] is not acceptable, out of range"
                    )

                res.append(t)

                if field_index == DOW_FIELD and nth:
                    if t not in nth_weekday_of_month:
                        nth_weekday_of_month[t] = set()
                    nth_weekday_of_month[t].add(nth)

        # De-duplicate, then sort; ints are keyed as zero-padded strings so
        # they can be ordered together with the "*" / "l" markers.
        res = set(res)
        res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i)
        if len(res) == cls.LEN_MEANS_ALL[field_index]:
            # Make sure the wildcard is used in the correct way (avoid over-optimization)
            if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or (
                field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]
            ):
                pass
            else:
                res = ["*"]

        expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res)

    # Check to make sure the dow combo in use is supported
    if nth_weekday_of_month:
        dow_expanded_set = set(expanded[DOW_FIELD])
        dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys())
        dow_expanded_set.discard("*")
        # Skip: if it's all weeks instead of wildcard
        if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]:
            raise CroniterUnsupportedSyntaxError(
                f"day-of-week field does not support mixing literal values and nth"
                f" day of week syntax. Cron: '{expr_format}'"
                f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}"
            )

    if strict:
        # Cross-validate day-of-month against month (and optionally year)
        # to reject impossible combinations like "0 0 31 2 *" (Feb 31st).
        days = expanded[DAY_FIELD]
        months = expanded[MONTH_FIELD]
        if days != ["*"] and days != ["l"] and months != ["*"]:
            int_days = [d for d in days if isinstance(d, int)]
            int_months = [m for m in months if isinstance(m, int)]
            if int_days and int_months:
                # Determine max days per month, accounting for leap years
                days_in_month = dict(cls.DAYS_IN_MONTH)
                if 2 in int_months:
                    has_leap_year = True  # assume possible by default
                    if strict_year is not None:
                        # Year explicitly provided as parameter
                        if isinstance(strict_year, int):
                            has_leap_year = calendar.isleap(strict_year)
                        else:
                            has_leap_year = any(calendar.isleap(y) for y in strict_year)
                    elif len(expanded) > YEAR_FIELD:
                        years = expanded[YEAR_FIELD]
                        if years != ["*"]:
                            int_years = [y for y in years if isinstance(y, int)]
                            if int_years:
                                has_leap_year = any(calendar.isleap(y) for y in int_years)
                    if has_leap_year:
                        days_in_month[2] = 29
                # Only fail when even the smallest requested day exceeds the
                # longest of the requested months.
                min_day = min(int_days)
                max_possible = max(days_in_month[m] for m in int_months)
                if min_day > max_possible:
                    raise CroniterBadCronError(
                        f"[{expr_format}] is not acceptable. Day(s) {int_days}"
                        f" can never occur in month(s) {int_months}"
                    )

    return expanded, nth_weekday_of_month, expressions, nearest_weekday

1199 

@classmethod
def expand(
    cls,
    expr_format: str,
    hash_id: Optional[Union[bytes, str]] = None,
    second_at_beginning: bool = False,
    from_timestamp: Optional[float] = None,
    strict: bool = False,
    strict_year: Optional[Union[int, list[int]]] = None,
) -> tuple[list[ExpandedExpression], dict[int, set[int]]]:
    """
    Expand a cron expression format into a normalized format of
    list[list[int | 'l' | '*']]. The outer list has one entry per field
    of the expression, and each sub-list holds the values allowed for
    that field.

    A tuple is returned, the first value being the expanded expression
    list, and the second being a `nth_weekday_of_month` mapping.

    Any ``ValueError`` raised while parsing that is not already a
    ``CroniterError`` is re-raised as ``CroniterBadCronError`` carrying
    the formatted traceback as its message.

    Examples:

    # Every minute
    >>> croniter.expand('* * * * *')
    ([['*'], ['*'], ['*'], ['*'], ['*']], {})

    # At midnight
    >>> croniter.expand('0 0 * * *')
    ([[0], [0], ['*'], ['*'], ['*']], {})

    # Minutes 0-5 and 10, monday through friday
    >>> croniter.expand('0-5,10 * * * mon-fri')
    ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})

    Note that some special values such as nth day of week are expanded to a
    special mapping format for later processing:

    # Every minute on the 3rd tuesday of the month
    >>> croniter.expand('* * * * 2#3')
    ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})

    # Every hour on the last day of the month
    >>> croniter.expand('0 * l * *')
    ([[0], ['*'], ['l'], ['*'], ['*']], {})

    # Every 15 seconds during minute 0 of hour 0
    >>> croniter.expand('0 0 * * * */15')
    ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
    """
    try:
        # ``_expand`` also returns the raw fields and the nearest-weekday
        # set; the public API only exposes the first two items.
        fields, nth_map, _raw_fields, _nearest = cls._expand(
            expr_format,
            hash_id=hash_id,
            second_at_beginning=second_at_beginning,
            from_timestamp=from_timestamp,
            strict=strict,
            strict_year=strict_year,
        )
        return fields, nth_map
    except CroniterError:
        # Library errors already carry a meaningful message.
        raise
    except ValueError:
        raise CroniterBadCronError(_traceback.format_exc())

1263 

1264 @classmethod 

1265 def _get_low_from_current_date_number(cls, field_index, step, from_timestamp): 

1266 dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT) 

1267 if field_index == MINUTE_FIELD: 

1268 return dt.minute % step 

1269 if field_index == HOUR_FIELD: 

1270 return dt.hour % step 

1271 if field_index == DAY_FIELD: 

1272 return ((dt.day - 1) % step) + 1 

1273 if field_index == MONTH_FIELD: 

1274 return dt.month % step 

1275 if field_index == DOW_FIELD: 

1276 return (dt.weekday() + 1) % step 

1277 

1278 raise ValueError("Can't get current date number for index larger than 4") 

1279 

@classmethod
def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False, strict=False, strict_year=None):
    """Return True when *expression* expands without a croniter error.

    A truthy ``hash_id`` must be ``bytes`` or ``str``; a ``str`` is encoded
    with *encoding* before being handed to ``expand``.  Any other type
    raises ``TypeError``.  The remaining arguments are forwarded to
    ``expand`` unchanged.
    """
    if hash_id:
        if isinstance(hash_id, str):
            hash_id = hash_id.encode(encoding)
        elif not isinstance(hash_id, bytes):
            raise TypeError("hash_id must be bytes or UTF-8 string")

    try:
        cls.expand(
            expression,
            hash_id=hash_id,
            second_at_beginning=second_at_beginning,
            strict=strict,
            strict_year=strict_year,
        )
    except CroniterError:
        return False
    else:
        return True

1292 

@classmethod
def match(
    cls,
    cron_expression,
    testdate,
    day_or=True,
    second_at_beginning=False,
    precision_in_seconds=None,
):
    """Test *testdate* against *cron_expression*.

    This is ``match_range`` applied to a range that starts and ends at
    *testdate*; every other argument is forwarded as-is.
    """
    range_start = range_end = testdate
    return cls.match_range(
        cron_expression,
        range_start,
        range_end,
        day_or,
        second_at_beginning,
        precision_in_seconds,
    )

1305 

@classmethod
def match_range(
    cls,
    cron_expression,
    from_datetime,
    to_datetime,
    day_or=True,
    second_at_beginning=False,
    precision_in_seconds=None,
):
    """Tell whether *cron_expression* has an occurrence within a range.

    An iterator is created at *to_datetime* and its previous occurrence is
    looked up.  The result is True when the distance between that
    occurrence and the (possibly nudged) end of the range is smaller than
    the length of the range plus *precision_in_seconds*.  When the
    precision is not given it is 1 second for expressions with more than
    ``UNIX_CRON_LEN`` fields and 60 seconds otherwise.  False is returned
    if looking up the previous occurrence raises ``CroniterBadDateError``.
    """
    iterator = cls(
        cron_expression,
        to_datetime,
        ret_type=datetime.datetime,
        day_or=day_or,
        second_at_beginning=second_at_beginning,
    )

    reference = iterator.get_current(datetime.datetime)
    if not reference.microsecond:
        # presumably so that get_prev() can land on the reference instant
        # itself when it matches — TODO confirm
        reference += relativedelta(microseconds=1)
        iterator.set_current(reference, force=True)

    try:
        previous = iterator.get_prev()
    except CroniterBadDateError:
        return False

    if precision_in_seconds is None:
        if len(iterator.expanded) > UNIX_CRON_LEN:
            precision_in_seconds = 1
        else:
            precision_in_seconds = 60

    allowed_gap = (to_datetime - from_datetime).total_seconds() + precision_in_seconds
    later, earlier = max(reference, previous), min(reference, previous)
    return (later - earlier).total_seconds() < allowed_gap

1335 

1336 

def croniter_range(
    start,
    stop,
    expr_format,
    ret_type=None,
    day_or=True,
    exclude_ends=False,
    _croniter=None,
    second_at_beginning=False,
    expand_from_start_time=False,
):
    """
    Generator yielding every time between start and stop that matches the given cron expression.
    Times equal to 'start' and/or 'stop' are yielded too when they match, unless
    'exclude_ends=True' is passed. When start is later than stop the times are produced in
    reverse order.

    Numeric (timestamp) bounds are accepted as well; in that case floats are yielded by
    default instead of datetimes.

    Think of it as the datetime counterpart of the builtin range(start, stop, step), where
    'step' is a cron expression.
    """
    iterator_factory = _croniter or croniter
    default_ret_type = datetime.datetime

    # the cheap identity test on types goes first for perfs reasons
    compatible = (
        type(start) is type(stop)
        or isinstance(start, type(stop))
        or isinstance(stop, type(start))
    )
    if not compatible:
        raise CroniterBadTypeRangeError(
            f"The start and stop must be same type. {type(start)} != {type(stop)}"
        )

    if isinstance(start, (float, int)):
        # Timestamps are turned into naive datetimes expressed in UTC.
        start = datetime.datetime.fromtimestamp(start, tzutc()).replace(tzinfo=None)
        stop = datetime.datetime.fromtimestamp(stop, tzutc()).replace(tzinfo=None)
        default_ret_type = float

    if ret_type is None:
        ret_type = default_ret_type

    if not exclude_ends:
        # Widen the interval by one microsecond on each side so that the
        # strict comparisons below let the bounds themselves through.
        nudge = relativedelta(microseconds=1)
        if start < stop:  # Forward (normal) time order
            start, stop = start - nudge, stop + nudge
        else:  # Reverse time order
            start, stop = start + nudge, stop - nudge

    year_span = math.floor(abs(stop.year - start.year)) + 1
    ic = iterator_factory(
        expr_format,
        start,
        ret_type=datetime.datetime,
        day_or=day_or,
        max_years_between_matches=year_span,
        second_at_beginning=second_at_beginning,
        expand_from_start_time=expand_from_start_time,
    )

    # pick the stepping method and the "still inside the range" test
    if start < stop:  # Forward
        advance = ic.get_next

        def in_range(moment):
            return moment < stop

    else:  # Reverse
        advance = ic.get_prev

        def in_range(moment):
            return moment > stop

    try:
        while True:
            dt = advance()
            if not in_range(dt):
                break
            if ret_type is float:
                yield ic.get_current(float)
            else:
                yield dt
    except CroniterBadDateError:
        # Stop iteration when this exception is raised; no match found within the given year range
        return

1414 

1415 

class HashExpander:
    """Rewrite hashed (``h``) and random (``r``) field items as plain cron syntax.

    The expander is built around an object exposing ``RANGES``, a sequence of
    ``(min, max)`` pairs indexed by field position.
    """

    def __init__(self, cronit):
        self.cron = cronit

    def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
        """Return a hashed/random integer given range/hash information"""
        # Missing bounds default to the bounds of field ``idx``.
        upper = self.cron.RANGES[idx][1] if range_end is None else range_end
        lower = self.cron.RANGES[idx][0] if range_begin is None else range_begin

        if hash_type == "r":
            seed = random.randint(0, 0xFFFFFFFF)
        else:
            seed = binascii.crc32(hash_id) & 0xFFFFFFFF

        # Shifting by the field index makes the same hash_id give different
        # values in different fields.
        width = upper - lower + 1
        return lower + (seed >> idx) % width

    def match(self, efl, idx, expr, hash_id=None, **kw):
        """Return the ``hash_expression_re`` match for *expr* (``None`` if no match)."""
        return hash_expression_re.match(expr)

    def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
        """Expand a hashed/random expression to its normal representation"""
        # The empty string means "no match supplied": compute it here.
        if match == "":
            match = self.match(efl, idx, expr, hash_id, **kw)
        if not match:
            return expr

        groups = match.groupdict()
        kind = groups["hash_type"]
        if kind == "h" and hash_id is None:
            raise CroniterBadCronError("Hashed definitions must include hash_id")

        begin, end = groups["range_begin"], groups["range_end"]
        has_range = bool(begin and end)
        if has_range and int(begin) >= int(end):
            raise CroniterBadCronError("Range end must be greater than range begin")

        divisor = groups["divisor"]
        if divisor:
            # Examples: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
            #           H/15 -> 7-59/15 (i.e. 7,22,37,52)
            if int(divisor) == 0:
                raise CroniterBadCronError(f"Bad expression: {expr}")

            if has_range:
                first, last = int(begin), int(end)
            else:
                first, last = self.cron.RANGES[idx][0], self.cron.RANGES[idx][1]

            # The starting offset is drawn from the first "divisor" values
            # of the range.
            offset = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_begin=first,
                range_end=int(divisor) - 1 + first,
            )
            return f"{offset}-{last}/{int(divisor)}"

        if has_range:
            # Example: H(0-29) -> 12
            picked = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_end=int(end),
                range_begin=int(begin),
            )
            return str(picked)

        # Example: H -> 32
        return str(self.do(idx, hash_type=kind, hash_id=hash_id))

1490 

1491 

# Registry of expression expander classes, keyed by name. ``_expand`` passes
# every field of a cron expression through each expander registered here
# before parsing it.
EXPANDERS = {"hash": HashExpander}