Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/croniter/croniter.py: 79%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

805 statements  

1#!/usr/bin/env python 

2import binascii 

3import calendar 

4import copy 

5import datetime 

6import math 

7import platform 

8import random 

9import re 

10import struct 

11import sys 

12import traceback as _traceback 

13from time import time 

14from typing import Any, Literal, Optional, Union 

15 

16from dateutil.relativedelta import relativedelta 

17from dateutil.tz import datetime_exists, tzutc 

18 

19ExpandedExpression = list[Union[int, Literal["*", "l"]]] 

20 

21 

22def is_32bit() -> bool: 

23 """ 

24 Detect if Python is running in 32-bit mode. 

25 Returns True if running on 32-bit Python, False for 64-bit. 

26 """ 

27 # Method 1: Check pointer size 

28 bits = struct.calcsize("P") * 8 

29 

30 # Method 2: Check platform architecture string 

31 try: 

32 architecture = platform.architecture()[0] 

33 except RuntimeError: 

34 architecture = None 

35 

36 # Method 3: Check maxsize 

37 is_small_maxsize = sys.maxsize <= 2**32 

38 

39 # Evaluate all available methods 

40 is_32 = False 

41 

42 if bits == 32: 

43 is_32 = True 

44 elif architecture and "32" in architecture: 

45 is_32 = True 

46 elif is_small_maxsize: 

47 is_32 = True 

48 

49 return is_32 

50 

51 

52try: 

53 # https://github.com/python/cpython/issues/101069 detection 

54 if is_32bit(): 

55 datetime.datetime.fromtimestamp(3999999999) 

56 OVERFLOW32B_MODE = False 

57except OverflowError: 

58 OVERFLOW32B_MODE = True 

59 

60 

61UTC_DT = datetime.timezone.utc 

62EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT) 

63 

64M_ALPHAS: dict[str, Union[int, str]] = { 

65 "jan": 1, 

66 "feb": 2, 

67 "mar": 3, 

68 "apr": 4, 

69 "may": 5, 

70 "jun": 6, 

71 "jul": 7, 

72 "aug": 8, 

73 "sep": 9, 

74 "oct": 10, 

75 "nov": 11, 

76 "dec": 12, 

77} 

78DOW_ALPHAS: dict[str, Union[int, str]] = { 

79 "sun": 0, 

80 "mon": 1, 

81 "tue": 2, 

82 "wed": 3, 

83 "thu": 4, 

84 "fri": 5, 

85 "sat": 6, 

86} 

87 

88MINUTE_FIELD = 0 

89HOUR_FIELD = 1 

90DAY_FIELD = 2 

91MONTH_FIELD = 3 

92DOW_FIELD = 4 

93SECOND_FIELD = 5 

94YEAR_FIELD = 6 

95 

96UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD) 

97SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD) 

98YEAR_FIELDS = ( 

99 MINUTE_FIELD, 

100 HOUR_FIELD, 

101 DAY_FIELD, 

102 MONTH_FIELD, 

103 DOW_FIELD, 

104 SECOND_FIELD, 

105 YEAR_FIELD, 

106) 

107 

108step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$") 

109only_int_re = re.compile(r"^\d+$") 

110 

111DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31) 

112WEEKDAYS = "|".join(DOW_ALPHAS.keys()) 

113MONTHS = "|".join(M_ALPHAS.keys()) 

114star_or_int_re = re.compile(r"^(\d+|\*)$") 

115special_dow_re = re.compile( 

116 rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)" 

117 rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$" 

118) 

119nearest_weekday_re = re.compile(r"^(?:(\d+)w|w(\d+))$") 

120re_star = re.compile("[*]") 

121hash_expression_re = re.compile( 

122 r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$" 

123) 

124 

125CRON_FIELDS = { 

126 "unix": UNIX_FIELDS, 

127 "second": SECOND_FIELDS, 

128 "year": YEAR_FIELDS, 

129 len(UNIX_FIELDS): UNIX_FIELDS, 

130 len(SECOND_FIELDS): SECOND_FIELDS, 

131 len(YEAR_FIELDS): YEAR_FIELDS, 

132} 

133UNIX_CRON_LEN = len(UNIX_FIELDS) 

134SECOND_CRON_LEN = len(SECOND_FIELDS) 

135YEAR_CRON_LEN = len(YEAR_FIELDS) 

136# retrocompat 

137VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)} 

138 

139MARKER = object() 

140 

141 

142def datetime_to_timestamp(d): 

143 if d.tzinfo is not None: 

144 d = d.replace(tzinfo=None) - d.utcoffset() 

145 

146 return (d - datetime.datetime(1970, 1, 1)).total_seconds() 

147 

148 

149def _is_leap(year: int) -> bool: 

150 return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0) 

151 

152 

153def _last_day_of_month(year: int, month: int) -> int: 

154 """Calculate the last day of the given month (honor leap years).""" 

155 last_day = DAYS[month - 1] 

156 if month == 2 and _is_leap(year): 

157 last_day += 1 

158 return last_day 

159 

160 

161def _is_successor( 

162 date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool 

163) -> bool: 

164 """Check if the given date is a successor (after/before) of the previous date.""" 

165 if is_prev: 

166 return date.astimezone(UTC_DT) < previous_date.astimezone(UTC_DT) 

167 return date.astimezone(UTC_DT) > previous_date.astimezone(UTC_DT) 

168 

169 

170def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta: 

171 """Calculate the timezone difference of the given dates.""" 

172 offset1 = date1.utcoffset() 

173 offset2 = date2.utcoffset() 

174 assert offset1 is not None 

175 assert offset2 is not None 

176 return offset2 - offset1 

177 

178 

179def _add_tzinfo( 

180 date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool 

181) -> tuple[datetime.datetime, bool]: 

182 """Add the tzinfo from the previous date to the given date. 

183 

184 In case the new date is ambiguous, determine the correct date 

185 based on it being closer to the previous date but still a successor 

186 (after/before based on `is_prev`). 

187 

188 In case the date does not exist, jump forward to the next existing date. 

189 """ 

190 localize = getattr(previous_date.tzinfo, "localize", None) 

191 if localize is not None: 

192 # pylint: disable-next=import-outside-toplevel 

193 import pytz 

194 

195 try: 

196 result = localize(date, is_dst=None) 

197 except pytz.NonExistentTimeError: 

198 while True: 

199 date += datetime.timedelta(minutes=1) 

200 try: 

201 result = localize(date, is_dst=None) 

202 except pytz.NonExistentTimeError: 

203 continue 

204 break 

205 return result, False 

206 except pytz.AmbiguousTimeError: 

207 closer = localize(date, is_dst=not is_prev) 

208 farther = localize(date, is_dst=is_prev) 

209 # TODO: Check negative DST 

210 assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev 

211 if _is_successor(closer, previous_date, is_prev): 

212 result = closer 

213 else: 

214 assert _is_successor(farther, previous_date, is_prev) 

215 result = farther 

216 return result, True 

217 

218 result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo) 

219 if not datetime_exists(result): 

220 while not datetime_exists(result): 

221 result += datetime.timedelta(minutes=1) 

222 return result, False 

223 

224 # result is closer to the previous date 

225 farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo) 

226 # Comparing the UTC offsets in the check for the date being ambiguous. 

227 if result.utcoffset() != farther.utcoffset(): 

228 # TODO: Check negative DST 

229 assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev 

230 if not _is_successor(result, previous_date, is_prev): 

231 assert _is_successor(farther, previous_date, is_prev) 

232 result = farther 

233 return result, True 

234 

235 

236class CroniterError(ValueError): 

237 """General top-level Croniter base exception""" 

238 

239 

240class CroniterBadTypeRangeError(TypeError): 

241 """.""" 

242 

243 

244class CroniterBadCronError(CroniterError): 

245 """Syntax, unknown value, or range error within a cron expression""" 

246 

247 

248class CroniterUnsupportedSyntaxError(CroniterBadCronError): 

249 """Valid cron syntax, but likely to produce inaccurate results""" 

250 

251 # Extending CroniterBadCronError, which may be contridatory, but this allows 

252 # catching both errors with a single exception. From a user perspective 

253 # these will likely be handled the same way. 

254 

255 

256class CroniterBadDateError(CroniterError): 

257 """Unable to find next/prev timestamp match""" 

258 

259 

260class CroniterNotAlphaError(CroniterBadCronError): 

261 """Cron syntax contains an invalid day or month abbreviation""" 

262 

263 

264class croniter: 

265 MONTHS_IN_YEAR = 12 

266 

267 # This helps with expanding `*` fields into `lower-upper` ranges. Each item 

268 # in this tuple maps to the corresponding field index 

269 RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099)) 

270 

271 ALPHACONV: tuple[dict[str, Union[int, str]], ...] = ( 

272 {}, # 0: min 

273 {}, # 1: hour 

274 {"l": "l"}, # 2: dom 

275 # 3: mon 

276 copy.deepcopy(M_ALPHAS), 

277 # 4: dow 

278 copy.deepcopy(DOW_ALPHAS), 

279 # 5: second 

280 {}, 

281 # 6: year 

282 {}, 

283 ) 

284 

285 LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {}) 

286 

287 LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130) 

288 

289 def __init__( 

290 self, 

291 expr_format: str, 

292 start_time: Optional[Union[datetime.datetime, float]] = None, 

293 ret_type: type = float, 

294 day_or: bool = True, 

295 max_years_between_matches: Optional[int] = None, 

296 is_prev: bool = False, 

297 hash_id: Optional[Union[bytes, str]] = None, 

298 implement_cron_bug: bool = False, 

299 second_at_beginning: bool = False, 

300 expand_from_start_time: bool = False, 

301 ) -> None: 

302 self._ret_type = ret_type 

303 self._day_or = day_or 

304 self._implement_cron_bug = implement_cron_bug 

305 self.second_at_beginning = bool(second_at_beginning) 

306 self._expand_from_start_time = expand_from_start_time 

307 

308 if hash_id is not None: 

309 if not isinstance(hash_id, (bytes, str)): 

310 raise TypeError("hash_id must be bytes or UTF-8 string") 

311 if not isinstance(hash_id, bytes): 

312 hash_id = hash_id.encode("UTF-8") 

313 

314 self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None 

315 if max_years_between_matches is None: 

316 max_years_between_matches = 50 

317 self._max_years_between_matches = max(int(max_years_between_matches), 1) 

318 

319 if start_time is None: 

320 start_time = time() 

321 

322 self.tzinfo: Optional[datetime.tzinfo] = None 

323 

324 self.start_time = 0.0 

325 self.dst_start_time = 0.0 

326 self.cur = 0.0 

327 self.set_current(start_time, force=True) 

328 

329 self.expanded, self.nth_weekday_of_month, self.expressions, self.nearest_weekday = self._expand( 

330 expr_format, 

331 hash_id=hash_id, 

332 from_timestamp=self.dst_start_time if self._expand_from_start_time else None, 

333 second_at_beginning=second_at_beginning, 

334 ) 

335 self.fields = CRON_FIELDS[len(self.expanded)] 

336 self._is_prev = is_prev 

337 

338 @classmethod 

339 def _alphaconv(cls, index, key, expressions): 

340 try: 

341 return cls.ALPHACONV[index][key] 

342 except KeyError: 

343 raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable") 

344 

345 def get_next(self, ret_type=None, start_time=None, update_current=True): 

346 if start_time and self._expand_from_start_time: 

347 raise ValueError( 

348 "start_time is not supported when using expand_from_start_time = True." 

349 ) 

350 return self._get_next( 

351 ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current 

352 ) 

353 

354 def get_prev(self, ret_type=None, start_time=None, update_current=True): 

355 return self._get_next( 

356 ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current 

357 ) 

358 

359 def get_current(self, ret_type=None): 

360 ret_type = ret_type or self._ret_type 

361 if issubclass(ret_type, datetime.datetime): 

362 return self.timestamp_to_datetime(self.cur) 

363 return self.cur 

364 

365 def set_current( 

366 self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True 

367 ) -> float: 

368 if (force or (self.cur is None)) and start_time is not None: 

369 if isinstance(start_time, datetime.datetime): 

370 self.tzinfo = start_time.tzinfo 

371 start_time = self.datetime_to_timestamp(start_time) 

372 

373 self.start_time = start_time 

374 self.dst_start_time = start_time 

375 self.cur = start_time 

376 return self.cur 

377 

378 @staticmethod 

379 def datetime_to_timestamp(d: datetime.datetime) -> float: 

380 """ 

381 Converts a `datetime` object `d` into a UNIX timestamp. 

382 """ 

383 return datetime_to_timestamp(d) 

384 

385 _datetime_to_timestamp = datetime_to_timestamp # retrocompat 

386 

387 def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime: 

388 """ 

389 Converts a UNIX `timestamp` into a `datetime` object. 

390 """ 

391 if tzinfo is MARKER: # allow to give tzinfo=None even if self.tzinfo is set 

392 tzinfo = self.tzinfo 

393 if OVERFLOW32B_MODE: 

394 # degraded mode to workaround Y2038 

395 # see https://github.com/python/cpython/issues/101069 

396 result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp) 

397 else: 

398 result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None) 

399 if tzinfo: 

400 result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo) 

401 return result 

402 

403 _timestamp_to_datetime = timestamp_to_datetime # retrocompat 

404 

405 def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None): 

406 if update_current is None: 

407 update_current = True 

408 self.set_current(start_time, force=True) 

409 if is_prev is None: 

410 is_prev = self._is_prev 

411 self._is_prev = is_prev 

412 

413 ret_type = ret_type or self._ret_type 

414 

415 if not issubclass(ret_type, (float, datetime.datetime)): 

416 raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.") 

417 

418 result = self._calc_next(is_prev) 

419 timestamp = self.datetime_to_timestamp(result) 

420 if update_current: 

421 self.cur = timestamp 

422 if issubclass(ret_type, datetime.datetime): 

423 return result 

424 return timestamp 

425 

426 # iterator protocol, to enable direct use of croniter 

427 # objects in a loop, like "for dt in croniter("5 0 * * *'): ..." 

428 # or for combining multiple croniters into single 

429 # dates feed using 'itertools' module 

430 def all_next(self, ret_type=None, start_time=None, update_current=None): 

431 """ 

432 Returns a generator yielding consecutive dates. 

433 

434 May be used instead of an implicit call to __iter__ whenever a 

435 non-default `ret_type` needs to be specified. 

436 """ 

437 # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could 

438 # be used instead 

439 try: 

440 while True: 

441 self._is_prev = False 

442 yield self._get_next( 

443 ret_type=ret_type, start_time=start_time, update_current=update_current 

444 ) 

445 start_time = None 

446 except CroniterBadDateError: 

447 if self._max_years_btw_matches_explicitly_set: 

448 return 

449 raise 

450 

451 def all_prev(self, ret_type=None, start_time=None, update_current=None): 

452 """ 

453 Returns a generator yielding previous dates. 

454 """ 

455 try: 

456 while True: 

457 self._is_prev = True 

458 yield self._get_next( 

459 ret_type=ret_type, start_time=start_time, update_current=update_current 

460 ) 

461 start_time = None 

462 except CroniterBadDateError: 

463 if self._max_years_btw_matches_explicitly_set: 

464 return 

465 raise 

466 

467 def iter(self, *args, **kwargs): 

468 return self.all_prev if self._is_prev else self.all_next 

469 

470 def __iter__(self): 

471 return self 

472 

473 __next__ = next = _get_next 

474 

475 def _calc_next(self, is_prev: bool) -> datetime.datetime: 

476 current = self.timestamp_to_datetime(self.cur) 

477 expanded = self.expanded[:] 

478 nth_weekday_of_month = self.nth_weekday_of_month.copy() 

479 

480 # exception to support day of month and day of week as defined in cron 

481 if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or: 

482 # If requested, handle a bug in vixie cron/ISC cron where day_of_month and 

483 # day_of_week form an intersection (AND) instead of a union (OR) if either 

484 # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html) 

485 if self._implement_cron_bug and ( 

486 re_star.match(self.expressions[DAY_FIELD]) 

487 or re_star.match(self.expressions[DOW_FIELD]) 

488 ): 

489 # To produce a schedule identical to the cron bug, we'll bypass the code 

490 # that makes a union of DOM and DOW, and instead skip to the code that 

491 # does an intersect instead 

492 pass 

493 else: 

494 bak = expanded[DOW_FIELD] 

495 expanded[DOW_FIELD] = ["*"] 

496 t1 = self._calc(current, expanded, nth_weekday_of_month, is_prev) 

497 expanded[DOW_FIELD] = bak 

498 expanded[DAY_FIELD] = ["*"] 

499 

500 t2 = self._calc(current, expanded, nth_weekday_of_month, is_prev) 

501 if is_prev: 

502 return t1 if t1 > t2 else t2 

503 return t1 if t1 < t2 else t2 

504 

505 return self._calc(current, expanded, nth_weekday_of_month, is_prev) 

506 

507 def _calc( 

508 self, 

509 now: datetime.datetime, 

510 expanded: list[ExpandedExpression], 

511 nth_weekday_of_month: dict[int, set[int]], 

512 is_prev: bool, 

513 ) -> datetime.datetime: 

514 if is_prev: 

515 nearest_diff_method = self._get_prev_nearest_diff 

516 offset = relativedelta(microseconds=-1) 

517 else: 

518 nearest_diff_method = self._get_next_nearest_diff 

519 if len(expanded) > UNIX_CRON_LEN: 

520 offset = relativedelta(seconds=1) 

521 else: 

522 offset = relativedelta(minutes=1) 

523 # Calculate the next cron time in local time a.k.a. timezone unaware time. 

524 unaware_time = now.replace(tzinfo=None) + offset 

525 if len(expanded) > UNIX_CRON_LEN: 

526 unaware_time = unaware_time.replace(microsecond=0) 

527 else: 

528 unaware_time = unaware_time.replace(second=0, microsecond=0) 

529 

530 month = unaware_time.month 

531 year = current_year = unaware_time.year 

532 

533 def proc_year(d): 

534 if len(expanded) == YEAR_CRON_LEN: 

535 try: 

536 expanded[YEAR_FIELD].index("*") 

537 except ValueError: 

538 # use None as range_val to indicate no loop 

539 diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None) 

540 if diff_year is None: 

541 return None, d 

542 if diff_year != 0: 

543 if is_prev: 

544 d += relativedelta( 

545 years=diff_year, month=12, day=31, hour=23, minute=59, second=59 

546 ) 

547 else: 

548 d += relativedelta( 

549 years=diff_year, month=1, day=1, hour=0, minute=0, second=0 

550 ) 

551 return True, d 

552 return False, d 

553 

554 def proc_month(d): 

555 try: 

556 expanded[MONTH_FIELD].index("*") 

557 except ValueError: 

558 diff_month = nearest_diff_method( 

559 d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR 

560 ) 

561 reset_day = 1 

562 

563 if diff_month is not None and diff_month != 0: 

564 if is_prev: 

565 d += relativedelta(months=diff_month) 

566 reset_day = _last_day_of_month(d.year, d.month) 

567 d += relativedelta(day=reset_day, hour=23, minute=59, second=59) 

568 else: 

569 d += relativedelta( 

570 months=diff_month, day=reset_day, hour=0, minute=0, second=0 

571 ) 

572 return True, d 

573 return False, d 

574 

575 def proc_day_of_month(d): 

576 try: 

577 expanded[DAY_FIELD].index("*") 

578 except ValueError: 

579 days = _last_day_of_month(year, month) 

580 if "l" in expanded[DAY_FIELD] and days == d.day: 

581 return False, d 

582 

583 if is_prev: 

584 prev_month = (month - 2) % self.MONTHS_IN_YEAR + 1 

585 prev_year = year - 1 if month == 1 else year 

586 days_in_prev_month = _last_day_of_month(prev_year, prev_month) 

587 diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month) 

588 else: 

589 diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days) 

590 

591 if diff_day is not None and diff_day != 0: 

592 if is_prev: 

593 d += relativedelta(days=diff_day, hour=23, minute=59, second=59) 

594 else: 

595 d += relativedelta(days=diff_day, hour=0, minute=0, second=0) 

596 return True, d 

597 return False, d 

598 

599 def proc_day_of_week(d): 

600 try: 

601 expanded[DOW_FIELD].index("*") 

602 except ValueError: 

603 diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7) 

604 if diff_day_of_week is not None and diff_day_of_week != 0: 

605 if is_prev: 

606 d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59) 

607 else: 

608 d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0) 

609 return True, d 

610 return False, d 

611 

612 def proc_day_of_week_nth(d): 

613 if "*" in nth_weekday_of_month: 

614 s = nth_weekday_of_month["*"] 

615 for i in range(0, 7): 

616 if i in nth_weekday_of_month: 

617 nth_weekday_of_month[i].update(s) 

618 else: 

619 nth_weekday_of_month[i] = s 

620 del nth_weekday_of_month["*"] 

621 

622 candidates = [] 

623 for wday, nth in nth_weekday_of_month.items(): 

624 c = self._get_nth_weekday_of_month(d.year, d.month, wday) 

625 for n in nth: 

626 if n == "l": 

627 candidate = c[-1] 

628 elif len(c) < n: 

629 continue 

630 else: 

631 candidate = c[n - 1] 

632 if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate): 

633 candidates.append(candidate) 

634 

635 if not candidates: 

636 if is_prev: 

637 d += relativedelta(days=-d.day, hour=23, minute=59, second=59) 

638 else: 

639 days = _last_day_of_month(year, month) 

640 d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0) 

641 return True, d 

642 

643 candidates.sort() 

644 diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day 

645 if diff_day != 0: 

646 if is_prev: 

647 d += relativedelta(days=diff_day, hour=23, minute=59, second=59) 

648 else: 

649 d += relativedelta(days=diff_day, hour=0, minute=0, second=0) 

650 return True, d 

651 return False, d 

652 

653 def proc_nearest_weekday(d): 

654 """Process W (nearest weekday) day-of-month entries.""" 

655 candidates = [] 

656 for w_day in self.nearest_weekday: 

657 candidate = self._get_nearest_weekday(d.year, d.month, w_day) 

658 if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate): 

659 candidates.append(candidate) 

660 

661 if not candidates: 

662 if is_prev: 

663 d += relativedelta(days=-d.day, hour=23, minute=59, second=59) 

664 else: 

665 days = _last_day_of_month(year, month) 

666 d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0) 

667 return True, d 

668 

669 candidates.sort() 

670 diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day 

671 if diff_day != 0: 

672 if is_prev: 

673 d += relativedelta(days=diff_day, hour=23, minute=59, second=59) 

674 else: 

675 d += relativedelta(days=diff_day, hour=0, minute=0, second=0) 

676 return True, d 

677 return False, d 

678 

679 def proc_hour(d): 

680 try: 

681 expanded[HOUR_FIELD].index("*") 

682 except ValueError: 

683 diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24) 

684 if diff_hour is not None and diff_hour != 0: 

685 if is_prev: 

686 d += relativedelta(hours=diff_hour, minute=59, second=59) 

687 else: 

688 d += relativedelta(hours=diff_hour, minute=0, second=0) 

689 return True, d 

690 return False, d 

691 

692 def proc_minute(d): 

693 try: 

694 expanded[MINUTE_FIELD].index("*") 

695 except ValueError: 

696 diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60) 

697 if diff_min is not None and diff_min != 0: 

698 if is_prev: 

699 d += relativedelta(minutes=diff_min, second=59) 

700 else: 

701 d += relativedelta(minutes=diff_min, second=0) 

702 return True, d 

703 return False, d 

704 

705 def proc_second(d): 

706 if len(expanded) > UNIX_CRON_LEN: 

707 try: 

708 expanded[SECOND_FIELD].index("*") 

709 except ValueError: 

710 diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60) 

711 if diff_sec is not None and diff_sec != 0: 

712 d += relativedelta(seconds=diff_sec) 

713 return True, d 

714 else: 

715 d += relativedelta(second=0) 

716 return False, d 

717 

718 procs = [ 

719 proc_year, 

720 proc_month, 

721 (proc_nearest_weekday if self.nearest_weekday else proc_day_of_month), 

722 (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week), 

723 proc_hour, 

724 proc_minute, 

725 proc_second, 

726 ] 

727 

728 while abs(year - current_year) <= self._max_years_between_matches: 

729 next = False 

730 stop = False 

731 for proc in procs: 

732 (changed, unaware_time) = proc(unaware_time) 

733 # `None` can be set mostly for year processing 

734 # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff 

735 if changed is None: 

736 stop = True 

737 break 

738 if changed: 

739 month, year = unaware_time.month, unaware_time.year 

740 next = True 

741 break 

742 if stop: 

743 break 

744 if next: 

745 continue 

746 

747 unaware_time = unaware_time.replace(microsecond=0) 

748 if now.tzinfo is None: 

749 return unaware_time 

750 

751 # Add timezone information back and handle DST changes 

752 aware_time, exists = _add_tzinfo(unaware_time, now, is_prev) 

753 

754 if not exists and ( 

755 not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD] 

756 ): 

757 # The calculated local date does not exist and moving the time forward 

758 # to the next valid time isn't the correct solution. Search for the 

759 # next matching cron time that exists. 

760 while not exists: 

761 unaware_time = self._calc( 

762 unaware_time, expanded, nth_weekday_of_month, is_prev 

763 ) 

764 aware_time, exists = _add_tzinfo(unaware_time, now, is_prev) 

765 

766 offset_delta = _timezone_delta(now, aware_time) 

767 if not offset_delta: 

768 # There was no DST change. 

769 return aware_time 

770 

771 # There was a DST change. So check if there is a alternative cron time 

772 # for the other UTC offset. 

773 alternative_unaware_time = now.replace(tzinfo=None) + offset_delta 

774 alternative_unaware_time = self._calc( 

775 alternative_unaware_time, expanded, nth_weekday_of_month, is_prev 

776 ) 

777 alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev) 

778 

779 if not _is_successor(alternative_aware_time, now, is_prev): 

780 # The alternative time is an ancestor of now. Thus it is not an alternative. 

781 return aware_time 

782 

783 if _is_successor(aware_time, alternative_aware_time, is_prev): 

784 return alternative_aware_time 

785 

786 return aware_time 

787 

788 if is_prev: 

789 raise CroniterBadDateError("failed to find prev date") 

790 raise CroniterBadDateError("failed to find next date") 

791 

792 @staticmethod 

793 def _get_next_nearest_diff(x, to_check, range_val): 

794 """ 

795 `range_val` is the range of a field. 

796 If no available time, we can move to next loop(like next month). 

797 `range_val` can also be set to `None` to indicate that there is no loop. 

798 ( Currently, should only used for `year` field ) 

799 """ 

800 for i, d in enumerate(to_check): 

801 if range_val is not None: 

802 if d == "l": 

803 # if 'l' then it is the last day of month 

804 # => its value of range_val 

805 d = range_val 

806 elif d > range_val: 

807 continue 

808 if d >= x: 

809 return d - x 

810 # When range_val is None and x not exists in to_check, 

811 # `None` will be returned to suggest no more available time 

812 if range_val is None: 

813 return None 

814 return to_check[0] - x + range_val 

815 

816 @staticmethod 

817 def _get_prev_nearest_diff(x, to_check, range_val): 

818 """ 

819 `range_val` is the range of a field. 

820 If no available time, we can move to previous loop(like previous month). 

821 Range_val can also be set to `None` to indicate that there is no loop. 

822 ( Currently should only used for `year` field ) 

823 """ 

824 candidates = to_check[:] 

825 candidates.reverse() 

826 for d in candidates: 

827 if d != "l" and d <= x: 

828 return d - x 

829 if "l" in candidates: 

830 return -x 

831 # When range_val is None and x not exists in to_check, 

832 # `None` will be returned to suggest no more available time 

833 if range_val is None: 

834 return None 

835 candidate = candidates[0] 

836 for c in candidates: 

837 # fixed: c < range_val 

838 # this code will reject all 31 day of month, 12 month, 59 second, 

839 # 23 hour and so on. 

840 # if candidates has just a element, this will not harmful. 

841 # but candidates have multiple elements, then values equal to 

842 # range_val will rejected. 

843 if c <= range_val: 

844 candidate = c 

845 break 

846 # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error 

847 # return 2021-03-02 06:00:00 

848 if candidate > range_val: 

849 return -range_val 

850 return candidate - x - range_val 

851 

852 @staticmethod 

853 def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]: 

854 """For a given year/month return a list of days in nth-day-of-month order. 

855 The last weekday of the month is always [-1]. 

856 """ 

857 w = (day_of_week + 6) % 7 

858 c = calendar.Calendar(w).monthdayscalendar(year, month) 

859 if c[0][0] == 0: 

860 c.pop(0) 

861 return tuple(i[0] for i in c) 

862 

863 @staticmethod 

864 def _get_nearest_weekday(year, month, day): 

865 """Get the nearest weekday (Mon-Fri) to the given day in the given month. 

866 

867 Rules: 

868 - If the day is a weekday, return it. 

869 - If Saturday, return Friday (day-1), unless that crosses into previous month, 

870 then return Monday (day+2). 

871 - If Sunday, return Monday (day+1), unless that crosses into next month, 

872 then return Friday (day-2). 

873 """ 

874 last_day = _last_day_of_month(year, month) 

875 day = min(day, last_day) 

876 weekday = calendar.weekday(year, month, day) # 0=Mon, 6=Sun 

877 if weekday < 5: # Mon-Fri 

878 return day 

879 if weekday == 5: # Saturday 

880 if day > 1: 

881 return day - 1 # Friday 

882 else: 

883 return day + 2 # Monday (1st is Sat, so 3rd is Mon) 

884 # Sunday 

885 if day < last_day: 

886 return day + 1 # Monday 

887 else: 

888 return day - 2 # Friday (last day is Sun, go back to Fri) 

889 

890 @classmethod 

891 def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN): 

892 if isinstance(len_expressions, (list, dict, tuple, set)): 

893 len_expressions = len(len_expressions) 

894 if val in cls.LOWMAP[field_index] and not ( 

895 # do not support 0 as a month either for classical 5 fields cron, 

896 # 6fields second repeat form or 7 fields year form 

897 # but still let conversion happen if day field is shifted 

898 (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN) 

899 or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN) 

900 or ( 

901 field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD] 

902 and len_expressions == YEAR_CRON_LEN 

903 ) 

904 ): 

905 val = cls.LOWMAP[field_index][val] 

906 return val 

907 

908 # Maximum days in each month (non-leap year for Feb) 

909 DAYS_IN_MONTH = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31} 

910 

911 @classmethod 

912 def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None, strict=False, strict_year=None): 

913 # Split the expression in components, and normalize L -> l, MON -> mon, 

914 # etc. Keep expr_format untouched so we can use it in the exception 

915 # messages. 

916 expr_aliases = { 

917 "@midnight": ("0 0 * * *", "h h(0-2) * * * h"), 

918 "@hourly": ("0 * * * *", "h * * * * h"), 

919 "@daily": ("0 0 * * *", "h h * * * h"), 

920 "@weekly": ("0 0 * * 0", "h h * * h h"), 

921 "@monthly": ("0 0 1 * *", "h h h * * h"), 

922 "@yearly": ("0 0 1 1 *", "h h h h * h"), 

923 "@annually": ("0 0 1 1 *", "h h h h * h"), 

924 } 

925 

926 efl = expr_format.lower() 

927 hash_id_expr = 1 if hash_id is not None else 0 

928 try: 

929 efl = expr_aliases[efl][hash_id_expr] 

930 except KeyError: 

931 pass 

932 

933 expressions = efl.split() 

934 

935 if len(expressions) not in VALID_LEN_EXPRESSION: 

936 raise CroniterBadCronError( 

937 "Exactly 5, 6 or 7 columns has to be specified for iterator expression." 

938 ) 

939 

940 if len(expressions) > UNIX_CRON_LEN and second_at_beginning: 

941 # move second to it's own(6th) field to process by same logical 

942 expressions.insert(SECOND_FIELD, expressions.pop(0)) 

943 

944 expanded = [] 

945 nth_weekday_of_month = {} 

946 nearest_weekday = set() 

947 

948 for field_index, expr in enumerate(expressions): 

949 for expanderid, expander in EXPANDERS.items(): 

950 expr = expander(cls).expand( 

951 efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp 

952 ) 

953 

954 if "?" in expr: 

955 if expr != "?": 

956 raise CroniterBadCronError( 

957 f"[{expr_format}] is not acceptable." 

958 f" Question mark can not used with other characters" 

959 ) 

960 if field_index not in [DAY_FIELD, DOW_FIELD]: 

961 raise CroniterBadCronError( 

962 f"[{expr_format}] is not acceptable. " 

963 f"Question mark can only used in day_of_month or day_of_week" 

964 ) 

965 # currently just trade `?` as `*` 

966 expr = "*" 

967 

968 e_list = expr.split(",") 

969 res = [] 

970 seen = set() 

971 

972 while len(e_list) > 0: 

973 e = e_list.pop() 

974 nth = None 

975 

976 if field_index == DOW_FIELD: 

977 # Handle special case in the dow expression: 2#3, l3 

978 special_dow_rem = special_dow_re.match(str(e)) 

979 if special_dow_rem: 

980 g = special_dow_rem.groupdict() 

981 he, last = g.get("he", ""), g.get("last", "") 

982 if he: 

983 e = he 

984 try: 

985 nth = int(last) 

986 assert 5 >= nth >= 1 

987 except (KeyError, ValueError, AssertionError): 

988 raise CroniterBadCronError( 

989 f"[{expr_format}] is not acceptable." 

990 f" Invalid day_of_week value: '{nth}'" 

991 ) 

992 elif last: 

993 e = last 

994 nth = g["pre"] # 'l' 

995 

996 if field_index == DAY_FIELD: 

997 # Handle W (nearest weekday) in day-of-month: 15w, w15 

998 w_match = nearest_weekday_re.match(str(e)) 

999 if w_match: 

1000 w_day = int(w_match.group(1) or w_match.group(2)) 

1001 if w_day < 1 or w_day > 31: 

1002 raise CroniterBadCronError( 

1003 f"[{expr_format}] is not acceptable," 

1004 f" nearest weekday day value '{w_day}' out of range" 

1005 ) 

1006 if len(e_list) > 0 or len(res) > 0: 

1007 raise CroniterBadCronError( 

1008 f"[{expr_format}] is not acceptable." 

1009 f" 'W' can only be used with a single day value," 

1010 f" not in a list or range" 

1011 ) 

1012 nearest_weekday.add(w_day) 

1013 res.append(w_day) 

1014 continue 

1015 

1016 # Before matching step_search_re, normalize "*" to "{min}-{max}". 

1017 # Example: in the minute field, "*/5" normalizes to "0-59/5" 

1018 t = re.sub( 

1019 r"^\*(\/.+)$", 

1020 r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]), 

1021 str(e), 

1022 ) 

1023 m = step_search_re.search(t) 

1024 

1025 if not m: 

1026 # Before matching step_search_re, 

1027 # normalize "{start}/{step}" to "{start}-{max}/{step}". 

1028 # Example: in the minute field, "10/5" normalizes to "10-59/5" 

1029 t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e)) 

1030 m = step_search_re.search(t) 

1031 

1032 if m: 

1033 # early abort if low/high are out of bounds 

1034 (low, high, step) = m.group(1), m.group(2), m.group(4) or 1 

1035 if field_index == DAY_FIELD and high == "l": 

1036 high = "31" 

1037 

1038 if not only_int_re.search(low): 

1039 low = str(cls._alphaconv(field_index, low, expressions)) 

1040 

1041 if not only_int_re.search(high): 

1042 high = str(cls._alphaconv(field_index, high, expressions)) 

1043 

1044 # normally, it's already guarded by the RE that should not accept 

1045 # not-int values. 

1046 if not only_int_re.search(str(step)): 

1047 raise CroniterBadCronError( 

1048 f"[{expr_format}] step '{step}'" 

1049 f" in field {field_index} is not acceptable" 

1050 ) 

1051 step = int(step) 

1052 

1053 for band in low, high: 

1054 if not only_int_re.search(str(band)): 

1055 raise CroniterBadCronError( 

1056 f"[{expr_format}] bands '{low}-{high}'" 

1057 f" in field {field_index} are not acceptable" 

1058 ) 

1059 

1060 low, high = ( 

1061 cls.value_alias(int(_val), field_index, expressions) 

1062 for _val in (low, high) 

1063 ) 

1064 

1065 if max(low, high) > max( 

1066 cls.RANGES[field_index][0], cls.RANGES[field_index][1] 

1067 ): 

1068 raise CroniterBadCronError(f"{expr_format} is out of bands") 

1069 

1070 if from_timestamp: 

1071 low = cls._get_low_from_current_date_number( 

1072 field_index, int(step), int(from_timestamp) 

1073 ) 

1074 

1075 # Handle when the second bound of the range is in backtracking order: 

1076 # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH 

1077 if low > high: 

1078 whole_field_range = list( 

1079 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1) 

1080 ) 

1081 # Add FirstBound -> ENDRANGE, respecting step 

1082 rng = list(range(low, cls.RANGES[field_index][1] + 1, step)) 

1083 # Then 0 -> SecondBound, but skipping n first occurences according to step 

1084 # EG to respect such expressions : Apr-Jan/3 

1085 to_skip = 0 

1086 if rng: 

1087 already_skipped = list(reversed(whole_field_range)).index(rng[-1]) 

1088 curpos = whole_field_range.index(rng[-1]) 

1089 if ((curpos + step) > len(whole_field_range)) and ( 

1090 already_skipped < step 

1091 ): 

1092 to_skip = step - already_skipped 

1093 rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step)) 

1094 # if we include a range type: Jan-Jan, or Sun-Sun, 

1095 # it means the whole cycle (all days of week, # all monthes of year, etc) 

1096 elif low == high: 

1097 rng = list( 

1098 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step) 

1099 ) 

1100 else: 

1101 try: 

1102 rng = list(range(low, high + 1, step)) 

1103 except ValueError as exc: 

1104 raise CroniterBadCronError(f"invalid range: {exc}") 

1105 

1106 if field_index == DOW_FIELD and nth and nth != "l": 

1107 rng = [f"{item}#{nth}" for item in rng] 

1108 e_list += [a for a in rng if a not in seen] 

1109 seen.update(rng) 

1110 else: 

1111 if t.startswith("-"): 

1112 raise CroniterBadCronError( 

1113 f"[{expr_format}] is not acceptable, negative numbers not allowed" 

1114 ) 

1115 if not star_or_int_re.search(t): 

1116 t = cls._alphaconv(field_index, t, expressions) 

1117 

1118 try: 

1119 t = int(t) 

1120 except ValueError: 

1121 pass 

1122 

1123 t = cls.value_alias(t, field_index, expressions) 

1124 

1125 if t not in ["*", "l"] and ( 

1126 int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1] 

1127 ): 

1128 raise CroniterBadCronError( 

1129 f"[{expr_format}] is not acceptable, out of range" 

1130 ) 

1131 

1132 res.append(t) 

1133 

1134 if field_index == DOW_FIELD and nth: 

1135 if t not in nth_weekday_of_month: 

1136 nth_weekday_of_month[t] = set() 

1137 nth_weekday_of_month[t].add(nth) 

1138 

1139 res = set(res) 

1140 res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i) 

1141 if len(res) == cls.LEN_MEANS_ALL[field_index]: 

1142 # Make sure the wildcard is used in the correct way (avoid over-optimization) 

1143 if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or ( 

1144 field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD] 

1145 ): 

1146 pass 

1147 else: 

1148 res = ["*"] 

1149 

1150 expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res) 

1151 

1152 # Check to make sure the dow combo in use is supported 

1153 if nth_weekday_of_month: 

1154 dow_expanded_set = set(expanded[DOW_FIELD]) 

1155 dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys()) 

1156 dow_expanded_set.discard("*") 

1157 # Skip: if it's all weeks instead of wildcard 

1158 if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]: 

1159 raise CroniterUnsupportedSyntaxError( 

1160 f"day-of-week field does not support mixing literal values and nth" 

1161 f" day of week syntax. Cron: '{expr_format}'" 

1162 f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}" 

1163 ) 

1164 

1165 if strict: 

1166 # Cross-validate day-of-month against month (and optionally year) 

1167 # to reject impossible combinations like "0 0 31 2 *" (Feb 31st). 

1168 days = expanded[DAY_FIELD] 

1169 months = expanded[MONTH_FIELD] 

1170 if days != ["*"] and days != ["l"] and months != ["*"]: 

1171 int_days = [d for d in days if isinstance(d, int)] 

1172 int_months = [m for m in months if isinstance(m, int)] 

1173 if int_days and int_months: 

1174 # Determine max days per month, accounting for leap years 

1175 days_in_month = dict(cls.DAYS_IN_MONTH) 

1176 if 2 in int_months: 

1177 has_leap_year = True # assume possible by default 

1178 if strict_year is not None: 

1179 # Year explicitly provided as parameter 

1180 if isinstance(strict_year, int): 

1181 has_leap_year = calendar.isleap(strict_year) 

1182 else: 

1183 has_leap_year = any(calendar.isleap(y) for y in strict_year) 

1184 elif len(expanded) > YEAR_FIELD: 

1185 years = expanded[YEAR_FIELD] 

1186 if years != ["*"]: 

1187 int_years = [y for y in years if isinstance(y, int)] 

1188 if int_years: 

1189 has_leap_year = any(calendar.isleap(y) for y in int_years) 

1190 if has_leap_year: 

1191 days_in_month[2] = 29 

1192 min_day = min(int_days) 

1193 max_possible = max(days_in_month[m] for m in int_months) 

1194 if min_day > max_possible: 

1195 raise CroniterBadCronError( 

1196 f"[{expr_format}] is not acceptable. Day(s) {int_days}" 

1197 f" can never occur in month(s) {int_months}" 

1198 ) 

1199 

1200 return expanded, nth_weekday_of_month, expressions, nearest_weekday 

1201 

1202 @classmethod 

1203 def expand( 

1204 cls, 

1205 expr_format: str, 

1206 hash_id: Optional[Union[bytes, str]] = None, 

1207 second_at_beginning: bool = False, 

1208 from_timestamp: Optional[float] = None, 

1209 strict: bool = False, 

1210 strict_year: Optional[Union[int, list[int]]] = None, 

1211 ) -> tuple[list[ExpandedExpression], dict[int, set[int]]]: 

1212 """ 

1213 Expand a cron expression format into a noramlized format of 

1214 list[list[int | 'l' | '*']]. The first list representing each element 

1215 of the epxression, and each sub-list representing the allowed values 

1216 for that expression component. 

1217 

1218 A tuple is returned, the first value being the expanded epxression 

1219 list, and the second being a `nth_weekday_of_month` mapping. 

1220 

1221 Examples: 

1222 

1223 # Every minute 

1224 >>> croniter.expand('* * * * *') 

1225 ([['*'], ['*'], ['*'], ['*'], ['*']], {}) 

1226 

1227 # On the hour 

1228 >>> croniter.expand('0 0 * * *') 

1229 ([[0], [0], ['*'], ['*'], ['*']], {}) 

1230 

1231 # Hours 0-5 and 10 monday through friday 

1232 >>> croniter.expand('0-5,10 * * * mon-fri') 

1233 ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {}) 

1234 

1235 Note that some special values such as nth day of week are expanded to a 

1236 special mapping format for later processing: 

1237 

1238 # Every minute on the 3rd tuesday of the month 

1239 >>> croniter.expand('* * * * 2#3') 

1240 ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}}) 

1241 

1242 # Every hour on the last day of the month 

1243 >>> croniter.expand('0 * l * *') 

1244 ([[0], ['*'], ['l'], ['*'], ['*']], {}) 

1245 

1246 # On the hour every 15 seconds 

1247 >>> croniter.expand('0 0 * * * */15') 

1248 ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {}) 

1249 """ 

1250 try: 

1251 expanded, nth_weekday_of_month, _expressions, _nearest_weekday = cls._expand( 

1252 expr_format, 

1253 hash_id=hash_id, 

1254 second_at_beginning=second_at_beginning, 

1255 from_timestamp=from_timestamp, 

1256 strict=strict, 

1257 strict_year=strict_year, 

1258 ) 

1259 return expanded, nth_weekday_of_month 

1260 except (ValueError,) as exc: 

1261 if isinstance(exc, CroniterError): 

1262 raise 

1263 trace = _traceback.format_exc() 

1264 raise CroniterBadCronError(trace) 

1265 

1266 @classmethod 

1267 def _get_low_from_current_date_number(cls, field_index, step, from_timestamp): 

1268 dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT) 

1269 if field_index == MINUTE_FIELD: 

1270 return dt.minute % step 

1271 if field_index == HOUR_FIELD: 

1272 return dt.hour % step 

1273 if field_index == DAY_FIELD: 

1274 return ((dt.day - 1) % step) + 1 

1275 if field_index == MONTH_FIELD: 

1276 return dt.month % step 

1277 if field_index == DOW_FIELD: 

1278 return (dt.weekday() + 1) % step 

1279 

1280 raise ValueError("Can't get current date number for index larger than 4") 

1281 

1282 @classmethod 

1283 def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False, strict=False, strict_year=None): 

1284 if hash_id: 

1285 if not isinstance(hash_id, (bytes, str)): 

1286 raise TypeError("hash_id must be bytes or UTF-8 string") 

1287 if not isinstance(hash_id, bytes): 

1288 hash_id = hash_id.encode(encoding) 

1289 try: 

1290 cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning, strict=strict, strict_year=strict_year) 

1291 except CroniterError: 

1292 return False 

1293 return True 

1294 

1295 @classmethod 

1296 def match( 

1297 cls, 

1298 cron_expression, 

1299 testdate, 

1300 day_or=True, 

1301 second_at_beginning=False, 

1302 precision_in_seconds=None, 

1303 ): 

1304 return cls.match_range( 

1305 cron_expression, testdate, testdate, day_or, second_at_beginning, precision_in_seconds 

1306 ) 

1307 

1308 @classmethod 

1309 def match_range( 

1310 cls, 

1311 cron_expression, 

1312 from_datetime, 

1313 to_datetime, 

1314 day_or=True, 

1315 second_at_beginning=False, 

1316 precision_in_seconds=None, 

1317 ): 

1318 cron = cls( 

1319 cron_expression, 

1320 to_datetime, 

1321 ret_type=datetime.datetime, 

1322 day_or=day_or, 

1323 second_at_beginning=second_at_beginning, 

1324 ) 

1325 tdp = cron.get_current(datetime.datetime) 

1326 if not tdp.microsecond: 

1327 tdp += relativedelta(microseconds=1) 

1328 cron.set_current(tdp, force=True) 

1329 try: 

1330 tdt = cron.get_prev() 

1331 except CroniterBadDateError: 

1332 return False 

1333 if precision_in_seconds is None: 

1334 precision_in_seconds = 1 if len(cron.expanded) > UNIX_CRON_LEN else 60 

1335 duration_in_second = (to_datetime - from_datetime).total_seconds() + precision_in_seconds 

1336 return (max(tdp, tdt) - min(tdp, tdt)).total_seconds() < duration_in_second 

1337 

1338 

1339def croniter_range( 

1340 start, 

1341 stop, 

1342 expr_format, 

1343 ret_type=None, 

1344 day_or=True, 

1345 exclude_ends=False, 

1346 _croniter=None, 

1347 second_at_beginning=False, 

1348 expand_from_start_time=False, 

1349): 

1350 """ 

1351 Generator that provides all times from start to stop matching the given cron expression. 

1352 If the cron expression matches either 'start' and/or 'stop', those times will be returned as 

1353 well unless 'exclude_ends=True' is passed. 

1354 

1355 You can think of this function as sibling to the builtin range function for datetime objects. 

1356 Like range(start,stop,step), except that here 'step' is a cron expression. 

1357 """ 

1358 _croniter = _croniter or croniter 

1359 auto_rt = datetime.datetime 

1360 # type is used in first if branch for perfs reasons 

1361 if type(start) is not type(stop) and not ( 

1362 isinstance(start, type(stop)) or isinstance(stop, type(start)) 

1363 ): 

1364 raise CroniterBadTypeRangeError( 

1365 f"The start and stop must be same type. {type(start)} != {type(stop)}" 

1366 ) 

1367 if isinstance(start, (float, int)): 

1368 start, stop = ( 

1369 datetime.datetime.fromtimestamp(t, tzutc()).replace(tzinfo=None) for t in (start, stop) 

1370 ) 

1371 auto_rt = float 

1372 if ret_type is None: 

1373 ret_type = auto_rt 

1374 if not exclude_ends: 

1375 ms1 = relativedelta(microseconds=1) 

1376 if start < stop: # Forward (normal) time order 

1377 start -= ms1 

1378 stop += ms1 

1379 else: # Reverse time order 

1380 start += ms1 

1381 stop -= ms1 

1382 year_span = math.floor(abs(stop.year - start.year)) + 1 

1383 ic = _croniter( 

1384 expr_format, 

1385 start, 

1386 ret_type=datetime.datetime, 

1387 day_or=day_or, 

1388 max_years_between_matches=year_span, 

1389 second_at_beginning=second_at_beginning, 

1390 expand_from_start_time=expand_from_start_time, 

1391 ) 

1392 # define a continue (cont) condition function and step function for the main while loop 

1393 if start < stop: # Forward 

1394 

1395 def cont(v): 

1396 return v < stop 

1397 

1398 step = ic.get_next 

1399 else: # Reverse 

1400 

1401 def cont(v): 

1402 return v > stop 

1403 

1404 step = ic.get_prev 

1405 try: 

1406 dt = step() 

1407 while cont(dt): 

1408 if ret_type is float: 

1409 yield ic.get_current(float) 

1410 else: 

1411 yield dt 

1412 dt = step() 

1413 except CroniterBadDateError: 

1414 # Stop iteration when this exception is raised; no match found within the given year range 

1415 return 

1416 

1417 

1418class HashExpander: 

1419 def __init__(self, cronit): 

1420 self.cron = cronit 

1421 

1422 def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None): 

1423 """Return a hashed/random integer given range/hash information""" 

1424 if range_end is None: 

1425 range_end = self.cron.RANGES[idx][1] 

1426 if range_begin is None: 

1427 range_begin = self.cron.RANGES[idx][0] 

1428 if hash_type == "r": 

1429 crc = random.randint(0, 0xFFFFFFFF) 

1430 else: 

1431 crc = binascii.crc32(hash_id) & 0xFFFFFFFF 

1432 return ((crc >> idx) % (range_end - range_begin + 1)) + range_begin 

1433 

1434 def match(self, efl, idx, expr, hash_id=None, **kw): 

1435 return hash_expression_re.match(expr) 

1436 

1437 def expand(self, efl, idx, expr, hash_id=None, match="", **kw): 

1438 """Expand a hashed/random expression to its normal representation""" 

1439 if match == "": 

1440 match = self.match(efl, idx, expr, hash_id, **kw) 

1441 if not match: 

1442 return expr 

1443 m = match.groupdict() 

1444 

1445 if m["hash_type"] == "h" and hash_id is None: 

1446 raise CroniterBadCronError("Hashed definitions must include hash_id") 

1447 

1448 if m["range_begin"] and m["range_end"]: 

1449 if int(m["range_begin"]) >= int(m["range_end"]): 

1450 raise CroniterBadCronError("Range end must be greater than range begin") 

1451 

1452 if m["range_begin"] and m["range_end"] and m["divisor"]: 

1453 # Example: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54) 

1454 if int(m["divisor"]) == 0: 

1455 raise CroniterBadCronError(f"Bad expression: {expr}") 

1456 

1457 x = self.do( 

1458 idx, 

1459 hash_type=m["hash_type"], 

1460 hash_id=hash_id, 

1461 range_begin=int(m["range_begin"]), 

1462 range_end=int(m["divisor"]) - 1 + int(m["range_begin"]), 

1463 ) 

1464 return f"{x}-{int(m['range_end'])}/{int(m['divisor'])}" 

1465 if m["range_begin"] and m["range_end"]: 

1466 # Example: H(0-29) -> 12 

1467 return str( 

1468 self.do( 

1469 idx, 

1470 hash_type=m["hash_type"], 

1471 hash_id=hash_id, 

1472 range_end=int(m["range_end"]), 

1473 range_begin=int(m["range_begin"]), 

1474 ) 

1475 ) 

1476 if m["divisor"]: 

1477 # Example: H/15 -> 7-59/15 (i.e. 7,22,37,52) 

1478 if int(m["divisor"]) == 0: 

1479 raise CroniterBadCronError(f"Bad expression: {expr}") 

1480 

1481 x = self.do( 

1482 idx, 

1483 hash_type=m["hash_type"], 

1484 hash_id=hash_id, 

1485 range_begin=self.cron.RANGES[idx][0], 

1486 range_end=int(m["divisor"]) - 1 + self.cron.RANGES[idx][0], 

1487 ) 

1488 return f"{x}-{self.cron.RANGES[idx][1]}/{int(m['divisor'])}" 

1489 

1490 # Example: H -> 32 

1491 return str(self.do(idx, hash_type=m["hash_type"], hash_id=hash_id)) 

1492 

1493 

1494EXPANDERS = {"hash": HashExpander}