1#!/usr/bin/env python
2import binascii
3import calendar
4import copy
5import datetime
6import math
7import platform
8import random
9import re
10import struct
11import sys
12import traceback as _traceback
13from time import time
14from typing import Any, Literal, Optional, Union
15
16from dateutil.relativedelta import relativedelta
17from dateutil.tz import datetime_exists, tzutc
18
19ExpandedExpression = list[Union[int, Literal["*", "l"]]]
20
21
def is_32bit() -> bool:
    """
    Report whether the running interpreter is a 32-bit build.

    Three independent indicators are consulted and any single positive one
    is enough: the size of a C pointer, the architecture string reported by
    ``platform.architecture()``, and the magnitude of ``sys.maxsize``.
    """
    pointer_bits = struct.calcsize("P") * 8

    # platform.architecture() is treated as best effort: a RuntimeError
    # simply disables this indicator.
    try:
        arch_name = platform.architecture()[0]
    except RuntimeError:
        arch_name = None

    return bool(
        pointer_bits == 32
        or (arch_name and "32" in arch_name)
        or sys.maxsize <= 2**32
    )
50
51
# Import-time probe: on a 32-bit interpreter, try to convert a large timestamp.
# If that overflows, OVERFLOW32B_MODE is switched on so that
# croniter.timestamp_to_datetime() uses plain timedelta arithmetic instead of
# datetime.fromtimestamp(). On 64-bit interpreters the probe is skipped and
# the flag is always False.
try:
    # https://github.com/python/cpython/issues/101069 detection
    if is_32bit():
        datetime.datetime.fromtimestamp(3999999999)
    OVERFLOW32B_MODE = False
except OverflowError:
    OVERFLOW32B_MODE = True
59
60
# UTC tzinfo used whenever two aware datetimes are compared or converted.
UTC_DT = datetime.timezone.utc
# Timezone-aware datetime corresponding to UNIX timestamp 0.
EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)

# Lower-case month and weekday abbreviations mapped to their numeric values
# (months are 1-based, weekdays are 0-based starting on Sunday).
# fmt: off
M_ALPHAS: dict[str, Union[int, str]] = {
    "jan": 1, "feb": 2, "mar": 3, "apr": 4,  # noqa: E241
    "may": 5, "jun": 6, "jul": 7, "aug": 8,  # noqa: E241
    "sep": 9, "oct": 10, "nov": 11, "dec": 12,
}
DOW_ALPHAS: dict[str, Union[int, str]] = {
    "sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6
}
# fmt: on

# Index of each field inside an expanded expression. The first five follow
# the classic cron column order; seconds and year are appended after them.
MINUTE_FIELD = 0
HOUR_FIELD = 1
DAY_FIELD = 2
MONTH_FIELD = 3
DOW_FIELD = 4
SECOND_FIELD = 5
YEAR_FIELD = 6

# Field index tuples for the three supported layouts (5, 6 and 7 columns).
UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD)
SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD)
YEAR_FIELDS = (
    MINUTE_FIELD,
    HOUR_FIELD,
    DAY_FIELD,
    MONTH_FIELD,
    DOW_FIELD,
    SECOND_FIELD,
    YEAR_FIELD,
)
94
# Matches "<low>-<high>" with an optional "/<step>" suffix (step in group 4).
step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
# Matches a string made only of decimal digits.
only_int_re = re.compile(r"^\d+$")

# Days per month for a non-leap year, indexed by month - 1.
DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
# Regex alternations built from the known weekday / month abbreviations.
WEEKDAYS = "|".join(DOW_ALPHAS.keys())
MONTHS = "|".join(M_ALPHAS.keys())
# Matches either a bare "*" or a non-negative integer.
star_or_int_re = re.compile(r"^(\d+|\*)$")
# Matches the special day-of-week forms "<name-or-range>#<n>" and "l<n>";
# named groups: pre (prefix including "#", or "l"), he (part before "#"),
# last (trailing digits).
special_dow_re = re.compile(
    rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)"
    rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$"
)
# Matches a literal "*" at the start of a string (used with .match()).
re_star = re.compile("[*]")
# Matches "h" or "r", optionally followed by "(<begin>-<end>)" and "/<divisor>".
hash_expression_re = re.compile(
    r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
)

# Field layouts, addressable both by name and by number of columns.
CRON_FIELDS = {
    "unix": UNIX_FIELDS,
    "second": SECOND_FIELDS,
    "year": YEAR_FIELDS,
    len(UNIX_FIELDS): UNIX_FIELDS,
    len(SECOND_FIELDS): SECOND_FIELDS,
    len(YEAR_FIELDS): YEAR_FIELDS,
}
UNIX_CRON_LEN = len(UNIX_FIELDS)
SECOND_CRON_LEN = len(SECOND_FIELDS)
YEAR_CRON_LEN = len(YEAR_FIELDS)
# retrocompat
# Set of accepted column counts (the integer keys of CRON_FIELDS).
VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)}
# Process-wide memo for croniter.timestamp_to_datetime(), keyed by
# (timestamp, repr(tzinfo)). Nothing in the visible code evicts entries.
TIMESTAMP_TO_DT_CACHE: dict[tuple[float, str], datetime.datetime] = {}
# Looked up in croniter.__init__ with (expr_format, hash_id, second_at_beginning);
# it is filled somewhere outside the part of the file shown here.
EXPRESSIONS: dict[tuple[str, Optional[bytes], bool], list[str]] = {}
# Sentinel meaning "argument not supplied", so that None stays a usable value.
MARKER = object()
127
128
def datetime_to_timestamp(d):
    """
    Convert the datetime `d` into a UNIX timestamp (seconds, as a float).

    Aware datetimes are first shifted by their UTC offset and stripped of
    their tzinfo; naive datetimes are taken as they are.
    """
    if d.tzinfo is None:
        naive_utc = d
    else:
        naive_utc = d.replace(tzinfo=None) - d.utcoffset()

    elapsed = naive_utc - datetime.datetime(1970, 1, 1)
    return elapsed.total_seconds()
134
135
def _is_leap(year: int) -> bool:
    """Return True when `year` is a leap year under the Gregorian rules."""
    if year % 4 != 0:
        return False
    # Divisible by 4: leap unless it is a century not divisible by 400.
    return year % 100 != 0 or year % 400 == 0
138
139
def _last_day_of_month(year: int, month: int) -> int:
    """Return the number of the last day of `month` (1-based) in `year`.

    February gains one day when `year` is a leap year; every other month
    comes straight from the DAYS table.
    """
    if month == 2 and _is_leap(year):
        return DAYS[month - 1] + 1
    return DAYS[month - 1]
146
147
def _is_successor(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> bool:
    """Tell whether `date` follows `previous_date` in the search direction.

    Both values are converted to UTC before being compared. When `is_prev`
    is set the search runs backwards, so "follows" means strictly earlier;
    otherwise it means strictly later.
    """
    candidate_utc = date.astimezone(UTC_DT)
    reference_utc = previous_date.astimezone(UTC_DT)
    return candidate_utc < reference_utc if is_prev else candidate_utc > reference_utc
155
156
def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta:
    """Return the UTC offset of `date2` minus the UTC offset of `date1`.

    Both datetimes are expected to be timezone-aware; the assertions guard
    against a missing offset.
    """
    first_offset, second_offset = date1.utcoffset(), date2.utcoffset()
    assert first_offset is not None
    assert second_offset is not None
    return second_offset - first_offset
164
165
def _add_tzinfo(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> tuple[datetime.datetime, bool]:
    """Add the tzinfo from the previous date to the given date.

    In case the new date is ambiguous, determine the correct date
    based on it being closer to the previous date but still a successor
    (after/before based on `is_prev`).

    In case the date does not exist, jump forward to the next existing date.

    Returns a ``(aware_datetime, exists)`` tuple. ``exists`` is False when
    the requested local time did not exist and the returned value was moved
    forward (in one-minute steps) to the next existing local time.
    """
    # tzinfo objects exposing `localize` are handled through the pytz API
    # (pytz is imported lazily so it stays an optional dependency).
    localize = getattr(previous_date.tzinfo, "localize", None)
    if localize is not None:
        # pylint: disable-next=import-outside-toplevel
        import pytz

        try:
            # is_dst=None: raise instead of guessing, see the handlers below.
            result = localize(date, is_dst=None)
        except pytz.NonExistentTimeError:
            # Step forward minute by minute until the local time exists.
            while True:
                date += datetime.timedelta(minutes=1)
                try:
                    result = localize(date, is_dst=None)
                except pytz.NonExistentTimeError:
                    continue
                break
            return result, False
        except pytz.AmbiguousTimeError:
            # Build both interpretations of the ambiguous local time.
            closer = localize(date, is_dst=not is_prev)
            farther = localize(date, is_dst=is_prev)
            # TODO: Check negative DST
            assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
            # Prefer the one nearest to previous_date, provided it is still
            # on the correct side of it.
            if _is_successor(closer, previous_date, is_prev):
                result = closer
            else:
                assert _is_successor(farther, previous_date, is_prev)
                result = farther
        return result, True

    # Other tzinfo implementations: use the `fold` attribute to choose
    # between the two readings of an ambiguous local time.
    result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo)
    if not datetime_exists(result):
        # Step forward minute by minute until the local time exists.
        while not datetime_exists(result):
            result += datetime.timedelta(minutes=1)
        return result, False

    # result is closer to the previous date
    farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo)
    # Comparing the UTC offsets in the check for the date being ambiguous.
    if result.utcoffset() != farther.utcoffset():
        # TODO: Check negative DST
        assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
        if not _is_successor(result, previous_date, is_prev):
            assert _is_successor(farther, previous_date, is_prev)
            result = farther
    return result, True
221
222
class CroniterError(ValueError):
    """General top-level Croniter base exception (a ``ValueError`` subclass)."""
225
226
class CroniterBadTypeRangeError(TypeError):
    """``TypeError`` subclass; unlike the other exceptions here it does not
    derive from ``CroniterError``.

    NOTE(review): judging by the name it reports a range argument of the
    wrong type; the raising code is not in this part of the file -- confirm.
    """
229
230
class CroniterBadCronError(CroniterError):
    """Syntax, unknown value, or range error within a cron expression."""
233
234
class CroniterUnsupportedSyntaxError(CroniterBadCronError):
    """Valid cron syntax, but likely to produce inaccurate results"""

    # Extending CroniterBadCronError may seem contradictory, but it allows
    # catching both errors with a single exception. From a user perspective
    # these will likely be handled the same way.
241
242
class CroniterBadDateError(CroniterError):
    """Unable to find a next/prev timestamp matching the expression."""
245
246
class CroniterNotAlphaError(CroniterBadCronError):
    """Cron syntax contains an invalid day or month abbreviation."""
249
250
class croniter:
    """Iterator over the points in time matched by a cron expression."""

    # Wrap-around value used when searching the month field.
    MONTHS_IN_YEAR = 12

    # This helps with expanding `*` fields into `lower-upper` ranges. Each item
    # in this tuple maps to the corresponding field index
    RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099))

    # Per-field table translating alphabetic tokens into their values; fields
    # with an empty dict accept no alphabetic token at all.
    ALPHACONV: tuple[dict[str, Union[int, str]], ...] = (
        {},  # 0: min
        {},  # 1: hour
        {"l": "l"},  # 2: dom
        # 3: mon
        copy.deepcopy(M_ALPHAS),
        # 4: dow
        copy.deepcopy(DOW_ALPHAS),
        # 5: second
        {},
        # 6: year
        {},
    )

    # Per-field numeric aliases applied by value_alias(), e.g. 7 -> 0 for the
    # day of week and 0 -> 1 for day of month / month.
    LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {})

    # Number of distinct values in each field; every entry equals the size of
    # the matching RANGES interval.
    LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130)
275
276 def __init__(
277 self,
278 expr_format,
279 start_time=None,
280 ret_type=float,
281 day_or=True,
282 max_years_between_matches=None,
283 is_prev=False,
284 hash_id=None,
285 implement_cron_bug=False,
286 second_at_beginning=None,
287 expand_from_start_time=False,
288 ):
289 self._ret_type = ret_type
290 self._day_or = day_or
291 self._implement_cron_bug = implement_cron_bug
292 self.second_at_beginning = bool(second_at_beginning)
293 self._expand_from_start_time = expand_from_start_time
294
295 if hash_id:
296 if not isinstance(hash_id, (bytes, str)):
297 raise TypeError("hash_id must be bytes or UTF-8 string")
298 if not isinstance(hash_id, bytes):
299 hash_id = hash_id.encode("UTF-8")
300
301 self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None
302 if not self._max_years_btw_matches_explicitly_set:
303 max_years_between_matches = 50
304 self._max_years_between_matches = max(int(max_years_between_matches), 1)
305
306 if start_time is None:
307 start_time = time()
308
309 self.tzinfo = None
310
311 self.start_time = None
312 self.dst_start_time = None
313 self.cur = None
314 self.set_current(start_time, force=False)
315
316 self.expanded, self.nth_weekday_of_month = self.expand(
317 expr_format,
318 hash_id=hash_id,
319 from_timestamp=self.dst_start_time if self._expand_from_start_time else None,
320 second_at_beginning=second_at_beginning,
321 )
322 self.fields = CRON_FIELDS[len(self.expanded)]
323 self.expressions = EXPRESSIONS[(expr_format, hash_id, second_at_beginning)]
324 self._is_prev = is_prev
325
326 @classmethod
327 def _alphaconv(cls, index, key, expressions):
328 try:
329 return cls.ALPHACONV[index][key]
330 except KeyError:
331 raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable")
332
333 def get_next(self, ret_type=None, start_time=None, update_current=True):
334 if start_time and self._expand_from_start_time:
335 raise ValueError(
336 "start_time is not supported when using expand_from_start_time = True."
337 )
338 return self._get_next(
339 ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current
340 )
341
342 def get_prev(self, ret_type=None, start_time=None, update_current=True):
343 return self._get_next(
344 ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current
345 )
346
347 def get_current(self, ret_type=None):
348 ret_type = ret_type or self._ret_type
349 if issubclass(ret_type, datetime.datetime):
350 return self.timestamp_to_datetime(self.cur)
351 return self.cur
352
353 def set_current(
354 self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True
355 ) -> Optional[float]:
356 if (force or (self.cur is None)) and start_time is not None:
357 if isinstance(start_time, datetime.datetime):
358 self.tzinfo = start_time.tzinfo
359 start_time = self.datetime_to_timestamp(start_time)
360
361 self.start_time = start_time
362 self.dst_start_time = start_time
363 self.cur = start_time
364 return self.cur
365
    @staticmethod
    def datetime_to_timestamp(d: datetime.datetime) -> float:
        """
        Converts a `datetime` object `d` into a UNIX timestamp.

        Thin wrapper: inside the method body the name resolves to the
        module-level `datetime_to_timestamp` function, which does the work.
        """
        return datetime_to_timestamp(d)

    # Legacy private name kept as an alias of the static method above.
    _datetime_to_timestamp = datetime_to_timestamp  # retrocompat
374
375 def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime:
376 """
377 Converts a UNIX `timestamp` into a `datetime` object.
378 """
379 if tzinfo is MARKER: # allow to give tzinfo=None even if self.tzinfo is set
380 tzinfo = self.tzinfo
381 key = (timestamp, repr(tzinfo))
382 try:
383 return TIMESTAMP_TO_DT_CACHE[key]
384 except KeyError:
385 pass
386 if OVERFLOW32B_MODE:
387 # degraded mode to workaround Y2038
388 # see https://github.com/python/cpython/issues/101069
389 result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp)
390 else:
391 result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None)
392 if tzinfo:
393 result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo)
394 TIMESTAMP_TO_DT_CACHE[key] = result
395 return result
396
397 _timestamp_to_datetime = timestamp_to_datetime # retrocompat
398
399 def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None):
400 if update_current is None:
401 update_current = True
402 self.set_current(start_time, force=True)
403 if is_prev is None:
404 is_prev = self._is_prev
405 self._is_prev = is_prev
406
407 ret_type = ret_type or self._ret_type
408
409 if not issubclass(ret_type, (float, datetime.datetime)):
410 raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.")
411
412 result = self._calc_next(is_prev)
413 timestamp = self.datetime_to_timestamp(result)
414 if update_current:
415 self.cur = timestamp
416 if issubclass(ret_type, datetime.datetime):
417 return result
418 return timestamp
419
    # iterator protocol, to enable direct use of croniter
    # objects in a loop, like "for dt in croniter('5 0 * * *'): ..."
    # or for combining multiple croniters into a single
    # date feed using the 'itertools' module
    def all_next(self, ret_type=None, start_time=None, update_current=None):
        """
        Returns a generator yielding consecutive dates.

        May be used instead of an implicit call to __iter__ whenever a
        non-default `ret_type` needs to be specified.

        `start_time` only applies to the first value; later values continue
        from the iterator's current position. When the search horizon was
        set explicitly by the caller, running out of matches ends the
        generator quietly; otherwise CroniterBadDateError propagates.
        """
        # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could
        # be used instead
        try:
            while True:
                # Force the forward direction; _get_next() reads it from the
                # instance because no is_prev argument is passed.
                self._is_prev = False
                yield self._get_next(
                    ret_type=ret_type, start_time=start_time, update_current=update_current
                )
                # Reposition only once; afterwards keep walking from self.cur.
                start_time = None
        except CroniterBadDateError:
            if self._max_years_btw_matches_explicitly_set:
                return
            raise
444
    def all_prev(self, ret_type=None, start_time=None, update_current=None):
        """
        Returns a generator yielding previous dates.

        `start_time` only applies to the first value; later values continue
        from the iterator's current position. When the search horizon was
        set explicitly by the caller, running out of matches ends the
        generator quietly; otherwise CroniterBadDateError propagates.
        """
        try:
            while True:
                # Force the backward direction; _get_next() reads it from the
                # instance because no is_prev argument is passed.
                self._is_prev = True
                yield self._get_next(
                    ret_type=ret_type, start_time=start_time, update_current=update_current
                )
                # Reposition only once; afterwards keep walking from self.cur.
                start_time = None
        except CroniterBadDateError:
            if self._max_years_btw_matches_explicitly_set:
                return
            raise
460
461 def iter(self, *args, **kwargs):
462 return self.all_prev if self._is_prev else self.all_next
463
    def __iter__(self):
        """Return the iterator itself; values are produced by ``__next__``."""
        return self

    # Both `next(obj)` and `obj.next()` are `_get_next` called with its
    # defaults, so the direction stored on the instance is used.
    __next__ = next = _get_next
468
469 def _calc_next(self, is_prev: bool) -> datetime.datetime:
470 current = self.timestamp_to_datetime(self.cur)
471 expanded = self.expanded[:]
472 nth_weekday_of_month = self.nth_weekday_of_month.copy()
473
474 # exception to support day of month and day of week as defined in cron
475 if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or:
476 # If requested, handle a bug in vixie cron/ISC cron where day_of_month and
477 # day_of_week form an intersection (AND) instead of a union (OR) if either
478 # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html)
479 if self._implement_cron_bug and (
480 re_star.match(self.expressions[DAY_FIELD])
481 or re_star.match(self.expressions[DOW_FIELD])
482 ):
483 # To produce a schedule identical to the cron bug, we'll bypass the code
484 # that makes a union of DOM and DOW, and instead skip to the code that
485 # does an intersect instead
486 pass
487 else:
488 bak = expanded[DOW_FIELD]
489 expanded[DOW_FIELD] = ["*"]
490 t1 = self._calc(current, expanded, nth_weekday_of_month, is_prev)
491 expanded[DOW_FIELD] = bak
492 expanded[DAY_FIELD] = ["*"]
493
494 t2 = self._calc(current, expanded, nth_weekday_of_month, is_prev)
495 if is_prev:
496 return t1 if t1 > t2 else t2
497 return t1 if t1 < t2 else t2
498
499 return self._calc(current, expanded, nth_weekday_of_month, is_prev)
500
    def _calc(
        self,
        now: datetime.datetime,
        expanded: list[ExpandedExpression],
        nth_weekday_of_month: dict[int, set[int]],
        is_prev: bool,
    ) -> datetime.datetime:
        """Search for the nearest time matching `expanded`, starting from `now`.

        The search starts from `now` shifted forward by one minute (5-field
        expressions) or one second (longer expressions), or backward by one
        microsecond when `is_prev` is set, and truncated to the resolution of
        the expression. Each field, from the coarsest to the finest, then
        moves the candidate until no field needs a change any more.

        The work is done on a naive local time; when `now` is aware, the
        tzinfo is added back at the end and DST transitions are handled.

        `nth_weekday_of_month` may be modified in place (a "*" entry is
        spread over all seven weekdays).

        Raises CroniterBadDateError when no match is found within
        `max_years_between_matches` years, or when the year field has no
        further candidate.
        """
        if is_prev:
            nearest_diff_method = self._get_prev_nearest_diff
            offset = relativedelta(microseconds=-1)
        else:
            nearest_diff_method = self._get_next_nearest_diff
            if len(expanded) > UNIX_CRON_LEN:
                offset = relativedelta(seconds=1)
            else:
                offset = relativedelta(minutes=1)
        # Calculate the next cron time in local time a.k.a. timezone unaware time.
        unaware_time = now.replace(tzinfo=None) + offset
        if len(expanded) > UNIX_CRON_LEN:
            unaware_time = unaware_time.replace(microsecond=0)
        else:
            unaware_time = unaware_time.replace(second=0, microsecond=0)

        # `month` and `year` track the candidate and are read by the closures
        # below; `current_year` stays fixed to bound the search.
        month = unaware_time.month
        year = current_year = unaware_time.year

        # Every proc_* helper returns (changed, d):
        #   True  -> `d` was moved, restart the checks from the coarsest field
        #   False -> this field already matches
        #   None  -> no candidate is left at all (year field only)
        # When moving, the finer fields are reset to the start (forward
        # search) or the end (backward search) of the new period.

        def proc_year(d):
            if len(expanded) == YEAR_CRON_LEN:
                try:
                    expanded[YEAR_FIELD].index("*")
                except ValueError:
                    # use None as range_val to indicate no loop
                    diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
                    if diff_year is None:
                        return None, d
                    if diff_year != 0:
                        if is_prev:
                            d += relativedelta(
                                years=diff_year, month=12, day=31, hour=23, minute=59, second=59
                            )
                        else:
                            d += relativedelta(
                                years=diff_year, month=1, day=1, hour=0, minute=0, second=0
                            )
                        return True, d
            return False, d

        def proc_month(d):
            try:
                expanded[MONTH_FIELD].index("*")
            except ValueError:
                diff_month = nearest_diff_method(
                    d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR
                )
                reset_day = 1

                if diff_month is not None and diff_month != 0:
                    if is_prev:
                        # Move to the target month first so that its real
                        # last day can be computed.
                        d += relativedelta(months=diff_month)
                        reset_day = _last_day_of_month(d.year, d.month)
                        d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(
                            months=diff_month, day=reset_day, hour=0, minute=0, second=0
                        )
                    return True, d
            return False, d

        def proc_day_of_month(d):
            try:
                expanded[DAY_FIELD].index("*")
            except ValueError:
                days = _last_day_of_month(year, month)
                # "l" is satisfied when the candidate is the month's last day.
                if "l" in expanded[DAY_FIELD] and days == d.day:
                    return False, d

                if is_prev:
                    # NOTE(review): the previous month's length is read from
                    # the non-leap DAYS table, so a leap-year February counts
                    # as 28 days here -- confirm this is intended.
                    days_in_prev_month = DAYS[(month - 2) % self.MONTHS_IN_YEAR]
                    diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
                else:
                    diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)

                if diff_day is not None and diff_day != 0:
                    if is_prev:
                        d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                    return True, d
            return False, d

        def proc_day_of_week(d):
            try:
                expanded[DOW_FIELD].index("*")
            except ValueError:
                # isoweekday() % 7 maps Monday..Sunday (1..7) to 1..6, 0.
                diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
                if diff_day_of_week is not None and diff_day_of_week != 0:
                    if is_prev:
                        d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
                    return True, d
            return False, d

        def proc_day_of_week_nth(d):
            # Spread a "*" entry over all seven weekdays, then drop it.
            # Weekdays without an entry of their own share the same set object.
            if "*" in nth_weekday_of_month:
                s = nth_weekday_of_month["*"]
                for i in range(0, 7):
                    if i in nth_weekday_of_month:
                        nth_weekday_of_month[i].update(s)
                    else:
                        nth_weekday_of_month[i] = s
                del nth_weekday_of_month["*"]

            # Collect the days of this month that satisfy any
            # "nth weekday" / "last weekday" rule and lie in the search direction.
            candidates = []
            for wday, nth in nth_weekday_of_month.items():
                c = self._get_nth_weekday_of_month(d.year, d.month, wday)
                for n in nth:
                    if n == "l":
                        candidate = c[-1]
                    elif len(c) < n:
                        # This month has fewer than n such weekdays.
                        continue
                    else:
                        candidate = c[n - 1]
                    if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                        candidates.append(candidate)

            if not candidates:
                # Nothing left in this month: go to the end of the previous
                # month or to the first day of the next one.
                if is_prev:
                    d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
                else:
                    days = _last_day_of_month(year, month)
                    d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
                return True, d

            candidates.sort()
            diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
            if diff_day != 0:
                if is_prev:
                    d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                return True, d
            return False, d

        def proc_hour(d):
            try:
                expanded[HOUR_FIELD].index("*")
            except ValueError:
                diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
                if diff_hour is not None and diff_hour != 0:
                    if is_prev:
                        d += relativedelta(hours=diff_hour, minute=59, second=59)
                    else:
                        d += relativedelta(hours=diff_hour, minute=0, second=0)
                    return True, d
            return False, d

        def proc_minute(d):
            try:
                expanded[MINUTE_FIELD].index("*")
            except ValueError:
                diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
                if diff_min is not None and diff_min != 0:
                    if is_prev:
                        d += relativedelta(minutes=diff_min, second=59)
                    else:
                        d += relativedelta(minutes=diff_min, second=0)
                    return True, d
            return False, d

        def proc_second(d):
            if len(expanded) > UNIX_CRON_LEN:
                try:
                    expanded[SECOND_FIELD].index("*")
                except ValueError:
                    diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
                    if diff_sec is not None and diff_sec != 0:
                        d += relativedelta(seconds=diff_sec)
                        return True, d
            else:
                # Expressions without a seconds field always land on second 0.
                d += relativedelta(second=0)
            return False, d

        # Checks run from the coarsest to the finest field; the day-of-week
        # step uses the "nth weekday" variant when such rules exist.
        procs = [
            proc_year,
            proc_month,
            proc_day_of_month,
            (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
            proc_hour,
            proc_minute,
            proc_second,
        ]

        while abs(year - current_year) <= self._max_years_between_matches:
            # `next` shadows the builtin inside this method only.
            next = False
            stop = False
            for proc in procs:
                (changed, unaware_time) = proc(unaware_time)
                # `None` can be set mostly for year processing
                # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
                if changed is None:
                    stop = True
                    break
                if changed:
                    month, year = unaware_time.month, unaware_time.year
                    next = True
                    break
            if stop:
                break
            if next:
                continue

            # Every field matches: `unaware_time` is the local result.
            unaware_time = unaware_time.replace(microsecond=0)
            if now.tzinfo is None:
                return unaware_time

            # Add timezone information back and handle DST changes
            aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

            if not exists and (
                not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD]
            ):
                # The calculated local date does not exist and moving the time forward
                # to the next valid time isn't the correct solution. Search for the
                # next matching cron time that exists.
                while not exists:
                    unaware_time = self._calc(
                        unaware_time, expanded, nth_weekday_of_month, is_prev
                    )
                    aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

            offset_delta = _timezone_delta(now, aware_time)
            if not offset_delta:
                # There was no DST change.
                return aware_time

            # There was a DST change. So check if there is a alternative cron time
            # for the other UTC offset.
            alternative_unaware_time = now.replace(tzinfo=None) + offset_delta
            alternative_unaware_time = self._calc(
                alternative_unaware_time, expanded, nth_weekday_of_month, is_prev
            )
            alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev)

            if not _is_successor(alternative_aware_time, now, is_prev):
                # The alternative time is an ancestor of now. Thus it is not an alternative.
                return aware_time

            # Return whichever of the two results is nearer to `now`.
            if _is_successor(aware_time, alternative_aware_time, is_prev):
                return alternative_aware_time

            return aware_time

        # Reached when the year horizon is exceeded or a helper reported that
        # no candidate is left.
        if is_prev:
            raise CroniterBadDateError("failed to find prev date")
        raise CroniterBadDateError("failed to find next date")
757
758 @staticmethod
759 def _get_next_nearest_diff(x, to_check, range_val):
760 """
761 `range_val` is the range of a field.
762 If no available time, we can move to next loop(like next month).
763 `range_val` can also be set to `None` to indicate that there is no loop.
764 ( Currently, should only used for `year` field )
765 """
766 for i, d in enumerate(to_check):
767 if range_val is not None:
768 if d == "l":
769 # if 'l' then it is the last day of month
770 # => its value of range_val
771 d = range_val
772 elif d > range_val:
773 continue
774 if d >= x:
775 return d - x
776 # When range_val is None and x not exists in to_check,
777 # `None` will be returned to suggest no more available time
778 if range_val is None:
779 return None
780 return to_check[0] - x + range_val
781
782 @staticmethod
783 def _get_prev_nearest_diff(x, to_check, range_val):
784 """
785 `range_val` is the range of a field.
786 If no available time, we can move to previous loop(like previous month).
787 Range_val can also be set to `None` to indicate that there is no loop.
788 ( Currently should only used for `year` field )
789 """
790 candidates = to_check[:]
791 candidates.reverse()
792 for d in candidates:
793 if d != "l" and d <= x:
794 return d - x
795 if "l" in candidates:
796 return -x
797 # When range_val is None and x not exists in to_check,
798 # `None` will be returned to suggest no more available time
799 if range_val is None:
800 return None
801 candidate = candidates[0]
802 for c in candidates:
803 # fixed: c < range_val
804 # this code will reject all 31 day of month, 12 month, 59 second,
805 # 23 hour and so on.
806 # if candidates has just a element, this will not harmful.
807 # but candidates have multiple elements, then values equal to
808 # range_val will rejected.
809 if c <= range_val:
810 candidate = c
811 break
812 # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error
813 # return 2021-03-02 06:00:00
814 if candidate > range_val:
815 return -range_val
816 return candidate - x - range_val
817
818 @staticmethod
819 def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]:
820 """For a given year/month return a list of days in nth-day-of-month order.
821 The last weekday of the month is always [-1].
822 """
823 w = (day_of_week + 6) % 7
824 c = calendar.Calendar(w).monthdayscalendar(year, month)
825 if c[0][0] == 0:
826 c.pop(0)
827 return tuple(i[0] for i in c)
828
829 @classmethod
830 def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN):
831 if isinstance(len_expressions, (list, dict, tuple, set)):
832 len_expressions = len(len_expressions)
833 if val in cls.LOWMAP[field_index] and not (
834 # do not support 0 as a month either for classical 5 fields cron,
835 # 6fields second repeat form or 7 fields year form
836 # but still let conversion happen if day field is shifted
837 (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN)
838 or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN)
839 or (
840 field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD]
841 and len_expressions == YEAR_CRON_LEN
842 )
843 ):
844 val = cls.LOWMAP[field_index][val]
845 return val
846
847 @classmethod
848 def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None):
849 # Split the expression in components, and normalize L -> l, MON -> mon,
850 # etc. Keep expr_format untouched so we can use it in the exception
851 # messages.
852 expr_aliases = {
853 "@midnight": ("0 0 * * *", "h h(0-2) * * * h"),
854 "@hourly": ("0 * * * *", "h * * * * h"),
855 "@daily": ("0 0 * * *", "h h * * * h"),
856 "@weekly": ("0 0 * * 0", "h h * * h h"),
857 "@monthly": ("0 0 1 * *", "h h h * * h"),
858 "@yearly": ("0 0 1 1 *", "h h h h * h"),
859 "@annually": ("0 0 1 1 *", "h h h h * h"),
860 }
861
862 efl = expr_format.lower()
863 hash_id_expr = 1 if hash_id is not None else 0
864 try:
865 efl = expr_aliases[efl][hash_id_expr]
866 except KeyError:
867 pass
868
869 expressions = efl.split()
870
871 if len(expressions) not in VALID_LEN_EXPRESSION:
872 raise CroniterBadCronError(
873 "Exactly 5, 6 or 7 columns has to be specified for iterator expression."
874 )
875
876 if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
877 # move second to it's own(6th) field to process by same logical
878 expressions.insert(SECOND_FIELD, expressions.pop(0))
879
880 expanded = []
881 nth_weekday_of_month = {}
882
883 for field_index, expr in enumerate(expressions):
884 for expanderid, expander in EXPANDERS.items():
885 expr = expander(cls).expand(
886 efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp
887 )
888
889 if "?" in expr:
890 if expr != "?":
891 raise CroniterBadCronError(
892 f"[{expr_format}] is not acceptable."
893 f" Question mark can not used with other characters"
894 )
895 if field_index not in [DAY_FIELD, DOW_FIELD]:
896 raise CroniterBadCronError(
897 f"[{expr_format}] is not acceptable. "
898 f"Question mark can only used in day_of_month or day_of_week"
899 )
900 # currently just trade `?` as `*`
901 expr = "*"
902
903 e_list = expr.split(",")
904 res = []
905
906 while len(e_list) > 0:
907 e = e_list.pop()
908 nth = None
909
910 if field_index == DOW_FIELD:
911 # Handle special case in the dow expression: 2#3, l3
912 special_dow_rem = special_dow_re.match(str(e))
913 if special_dow_rem:
914 g = special_dow_rem.groupdict()
915 he, last = g.get("he", ""), g.get("last", "")
916 if he:
917 e = he
918 try:
919 nth = int(last)
920 assert 5 >= nth >= 1
921 except (KeyError, ValueError, AssertionError):
922 raise CroniterBadCronError(
923 f"[{expr_format}] is not acceptable."
924 f" Invalid day_of_week value: '{nth}'"
925 )
926 elif last:
927 e = last
928 nth = g["pre"] # 'l'
929
930 # Before matching step_search_re, normalize "*" to "{min}-{max}".
931 # Example: in the minute field, "*/5" normalizes to "0-59/5"
932 t = re.sub(
933 r"^\*(\/.+)$",
934 r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]),
935 str(e),
936 )
937 m = step_search_re.search(t)
938
939 if not m:
940 # Before matching step_search_re,
941 # normalize "{start}/{step}" to "{start}-{max}/{step}".
942 # Example: in the minute field, "10/5" normalizes to "10-59/5"
943 t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e))
944 m = step_search_re.search(t)
945
946 if m:
947 # early abort if low/high are out of bounds
948 (low, high, step) = m.group(1), m.group(2), m.group(4) or 1
949 if field_index == DAY_FIELD and high == "l":
950 high = "31"
951
952 if not only_int_re.search(low):
953 low = str(cls._alphaconv(field_index, low, expressions))
954
955 if not only_int_re.search(high):
956 high = str(cls._alphaconv(field_index, high, expressions))
957
958 # normally, it's already guarded by the RE that should not accept
959 # not-int values.
960 if not only_int_re.search(str(step)):
961 raise CroniterBadCronError(
962 f"[{expr_format}] step '{step}'"
963 f" in field {field_index} is not acceptable"
964 )
965 step = int(step)
966
967 for band in low, high:
968 if not only_int_re.search(str(band)):
969 raise CroniterBadCronError(
970 f"[{expr_format}] bands '{low}-{high}'"
971 f" in field {field_index} are not acceptable"
972 )
973
974 low, high = (
975 cls.value_alias(int(_val), field_index, expressions)
976 for _val in (low, high)
977 )
978
979 if max(low, high) > max(
980 cls.RANGES[field_index][0], cls.RANGES[field_index][1]
981 ):
982 raise CroniterBadCronError(f"{expr_format} is out of bands")
983
984 if from_timestamp:
985 low = cls._get_low_from_current_date_number(
986 field_index, int(step), int(from_timestamp)
987 )
988
989 # Handle when the second bound of the range is in backtracking order:
990 # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH
991 if low > high:
992 whole_field_range = list(
993 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1)
994 )
995 # Add FirstBound -> ENDRANGE, respecting step
996 rng = list(range(low, cls.RANGES[field_index][1] + 1, step))
997 # Then 0 -> SecondBound, but skipping n first occurences according to step
998 # EG to respect such expressions : Apr-Jan/3
999 to_skip = 0
1000 if rng:
1001 already_skipped = list(reversed(whole_field_range)).index(rng[-1])
1002 curpos = whole_field_range.index(rng[-1])
1003 if ((curpos + step) > len(whole_field_range)) and (
1004 already_skipped < step
1005 ):
1006 to_skip = step - already_skipped
1007 rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step))
1008 # if we include a range type: Jan-Jan, or Sun-Sun,
1009 # it means the whole cycle (all days of week, # all monthes of year, etc)
1010 elif low == high:
1011 rng = list(
1012 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step)
1013 )
1014 else:
1015 try:
1016 rng = list(range(low, high + 1, step))
1017 except ValueError as exc:
1018 raise CroniterBadCronError(f"invalid range: {exc}")
1019
1020 if field_index == DOW_FIELD and nth and nth != "l":
1021 rng = [f"{item}#{nth}" for item in rng]
1022 e_list += [a for a in rng if a not in e_list]
1023 else:
1024 if t.startswith("-"):
1025 raise CroniterBadCronError(
1026 f"[{expr_format}] is not acceptable, negative numbers not allowed"
1027 )
1028 if not star_or_int_re.search(t):
1029 t = cls._alphaconv(field_index, t, expressions)
1030
1031 try:
1032 t = int(t)
1033 except ValueError:
1034 pass
1035
1036 t = cls.value_alias(t, field_index, expressions)
1037
1038 if t not in ["*", "l"] and (
1039 int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1]
1040 ):
1041 raise CroniterBadCronError(
1042 f"[{expr_format}] is not acceptable, out of range"
1043 )
1044
1045 res.append(t)
1046
1047 if field_index == DOW_FIELD and nth:
1048 if t not in nth_weekday_of_month:
1049 nth_weekday_of_month[t] = set()
1050 nth_weekday_of_month[t].add(nth)
1051
1052 res = set(res)
1053 res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i)
1054 if len(res) == cls.LEN_MEANS_ALL[field_index]:
1055 # Make sure the wildcard is used in the correct way (avoid over-optimization)
1056 if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or (
1057 field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]
1058 ):
1059 pass
1060 else:
1061 res = ["*"]
1062
1063 expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res)
1064
1065 # Check to make sure the dow combo in use is supported
1066 if nth_weekday_of_month:
1067 dow_expanded_set = set(expanded[DOW_FIELD])
1068 dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys())
1069 dow_expanded_set.discard("*")
1070 # Skip: if it's all weeks instead of wildcard
1071 if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]:
1072 raise CroniterUnsupportedSyntaxError(
1073 f"day-of-week field does not support mixing literal values and nth"
1074 f" day of week syntax. Cron: '{expr_format}'"
1075 f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}"
1076 )
1077
1078 EXPRESSIONS[(expr_format, hash_id, second_at_beginning)] = expressions
1079 return expanded, nth_weekday_of_month
1080
1081 @classmethod
1082 def expand(
1083 cls,
1084 expr_format: str,
1085 hash_id: Optional[Union[bytes, str]] = None,
1086 second_at_beginning: bool = False,
1087 from_timestamp: Optional[float] = None,
1088 ) -> tuple[list[ExpandedExpression], dict[int, set[int]]]:
1089 """
1090 Expand a cron expression format into a noramlized format of
1091 list[list[int | 'l' | '*']]. The first list representing each element
1092 of the epxression, and each sub-list representing the allowed values
1093 for that expression component.
1094
1095 A tuple is returned, the first value being the expanded epxression
1096 list, and the second being a `nth_weekday_of_month` mapping.
1097
1098 Examples:
1099
1100 # Every minute
1101 >>> croniter.expand('* * * * *')
1102 ([['*'], ['*'], ['*'], ['*'], ['*']], {})
1103
1104 # On the hour
1105 >>> croniter.expand('0 0 * * *')
1106 ([[0], [0], ['*'], ['*'], ['*']], {})
1107
1108 # Hours 0-5 and 10 monday through friday
1109 >>> croniter.expand('0-5,10 * * * mon-fri')
1110 ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})
1111
1112 Note that some special values such as nth day of week are expanded to a
1113 special mapping format for later processing:
1114
1115 # Every minute on the 3rd tuesday of the month
1116 >>> croniter.expand('* * * * 2#3')
1117 ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})
1118
1119 # Every hour on the last day of the month
1120 >>> croniter.expand('0 * l * *')
1121 ([[0], ['*'], ['l'], ['*'], ['*']], {})
1122
1123 # On the hour every 15 seconds
1124 >>> croniter.expand('0 0 * * * */15')
1125 ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
1126 """
1127 try:
1128 return cls._expand(
1129 expr_format,
1130 hash_id=hash_id,
1131 second_at_beginning=second_at_beginning,
1132 from_timestamp=from_timestamp,
1133 )
1134 except (ValueError,) as exc:
1135 if isinstance(exc, CroniterError):
1136 raise
1137 trace = _traceback.format_exc()
1138 raise CroniterBadCronError(trace)
1139
1140 @classmethod
1141 def _get_low_from_current_date_number(cls, field_index, step, from_timestamp):
1142 dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT)
1143 if field_index == MINUTE_FIELD:
1144 return dt.minute % step
1145 if field_index == HOUR_FIELD:
1146 return dt.hour % step
1147 if field_index == DAY_FIELD:
1148 return ((dt.day - 1) % step) + 1
1149 if field_index == MONTH_FIELD:
1150 return dt.month % step
1151 if field_index == DOW_FIELD:
1152 return (dt.weekday() + 1) % step
1153
1154 raise ValueError("Can't get current date number for index larger than 4")
1155
1156 @classmethod
1157 def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False):
1158 if hash_id:
1159 if not isinstance(hash_id, (bytes, str)):
1160 raise TypeError("hash_id must be bytes or UTF-8 string")
1161 if not isinstance(hash_id, bytes):
1162 hash_id = hash_id.encode(encoding)
1163 try:
1164 cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning)
1165 except CroniterError:
1166 return False
1167 return True
1168
1169 @classmethod
1170 def match(cls, cron_expression, testdate, day_or=True, second_at_beginning=False):
1171 return cls.match_range(cron_expression, testdate, testdate, day_or, second_at_beginning)
1172
1173 @classmethod
1174 def match_range(
1175 cls, cron_expression, from_datetime, to_datetime, day_or=True, second_at_beginning=False
1176 ):
1177 cron = cls(
1178 cron_expression,
1179 to_datetime,
1180 ret_type=datetime.datetime,
1181 day_or=day_or,
1182 second_at_beginning=second_at_beginning,
1183 )
1184 tdp = cron.get_current(datetime.datetime)
1185 if not tdp.microsecond:
1186 tdp += relativedelta(microseconds=1)
1187 cron.set_current(tdp, force=True)
1188 try:
1189 tdt = cron.get_prev()
1190 except CroniterBadDateError:
1191 return False
1192 precision_in_seconds = 1 if len(cron.expanded) > UNIX_CRON_LEN else 60
1193 duration_in_second = (to_datetime - from_datetime).total_seconds() + precision_in_seconds
1194 return (max(tdp, tdt) - min(tdp, tdt)).total_seconds() < duration_in_second
1195
1196
def croniter_range(
    start,
    stop,
    expr_format,
    ret_type=None,
    day_or=True,
    exclude_ends=False,
    _croniter=None,
    second_at_beginning=False,
    expand_from_start_time=False,
):
    """
    Yield every time between ``start`` and ``stop`` that matches the cron
    expression ``expr_format``.

    The bounds themselves are produced when they match, unless
    ``exclude_ends=True`` is given. When ``start`` is not before ``stop`` the
    range is walked backwards.

    It behaves like the builtin ``range(start, stop, step)`` for datetimes,
    the cron expression playing the role of ``step``.

    ``start`` and ``stop`` must be of the same type (or one a subclass
    instance of the other), otherwise ``CroniterBadTypeRangeError`` is raised.
    Numeric bounds are read as timestamps and make ``float`` the default
    ``ret_type``; otherwise the default is ``datetime.datetime``.
    """
    iterator_factory = _croniter or croniter

    compatible_bounds = (
        type(start) is type(stop)  # cheap identity test first, for perfs reasons
        or isinstance(start, type(stop))
        or isinstance(stop, type(start))
    )
    if not compatible_bounds:
        raise CroniterBadTypeRangeError(
            f"The start and stop must be same type. {type(start)} != {type(stop)}"
        )

    numeric_bounds = isinstance(start, (float, int))
    if numeric_bounds:
        start = datetime.datetime.fromtimestamp(start, tzutc()).replace(tzinfo=None)
        stop = datetime.datetime.fromtimestamp(stop, tzutc()).replace(tzinfo=None)
    if ret_type is None:
        ret_type = float if numeric_bounds else datetime.datetime

    if not exclude_ends:
        # Widen the range by one microsecond on each side so that the bounds
        # fall strictly inside it.
        nudge = relativedelta(microseconds=1)
        if start < stop:
            start, stop = start - nudge, stop + nudge
        else:
            start, stop = start + nudge, stop - nudge

    year_span = math.floor(abs(stop.year - start.year)) + 1
    ic = iterator_factory(
        expr_format,
        start,
        ret_type=datetime.datetime,
        day_or=day_or,
        max_years_between_matches=year_span,
        second_at_beginning=second_at_beginning,
        expand_from_start_time=expand_from_start_time,
    )

    # Pick the stepping method and the "still inside the range" test once,
    # according to the direction of travel.
    if start < stop:
        advance = ic.get_next
        in_range = lambda moment: moment < stop  # noqa: E731
    else:
        advance = ic.get_prev
        in_range = lambda moment: moment > stop  # noqa: E731

    try:
        moment = advance()
        while in_range(moment):
            yield ic.get_current(float) if ret_type is float else moment
            moment = advance()
    except CroniterBadDateError:
        # No further match within the allowed span of years: end the iteration.
        return
1274
1275
class HashExpander:
    """
    Expand hashed (``h``) and random (``r``) field expressions, such as
    ``H``, ``H/15``, ``H(0-29)`` or ``H(30-59)/10``, into plain cron syntax.

    ``cronit`` must expose a ``RANGES`` sequence giving the ``(min, max)``
    bounds of each field index.
    """

    def __init__(self, cronit):
        self.cron = cronit

    def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
        """
        Return a hashed/random integer in ``[range_begin, range_end]``.

        Missing bounds default to the bounds of field ``idx`` in
        ``self.cron.RANGES``. With ``hash_type == "r"`` the value derives from
        a random 32-bit number, otherwise from the CRC32 of ``hash_id``. That
        number is shifted right by ``idx`` bits before being folded into the
        range.

        ``hash_id`` may be bytes-like or a ``str``; a ``str`` is encoded as
        UTF-8 first, since ``binascii.crc32`` only accepts bytes-like input.
        """
        if range_end is None:
            range_end = self.cron.RANGES[idx][1]
        if range_begin is None:
            range_begin = self.cron.RANGES[idx][0]

        if hash_type == "r":
            crc = random.randint(0, 0xFFFFFFFF)
        else:
            if isinstance(hash_id, str):
                hash_id = hash_id.encode("UTF-8")
            crc = binascii.crc32(hash_id) & 0xFFFFFFFF

        span = range_end - range_begin + 1
        return ((crc >> idx) % span) + range_begin

    def match(self, efl, idx, expr, hash_id=None, **kw):
        """Return the match of ``hash_expression_re`` on ``expr`` (or None)."""
        return hash_expression_re.match(expr)

    def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
        """
        Expand a hashed/random expression to its normal representation.

        ``match`` may carry an already computed match object; the default
        empty string means "not computed yet" and triggers :meth:`match`.
        ``expr`` is returned unchanged when there is no match.

        Raises:
            CroniterBadCronError: for a hashed expression without ``hash_id``,
                a range whose end is not greater than its begin, or a zero
                divisor.
        """
        if match == "":
            match = self.match(efl, idx, expr, hash_id, **kw)
        if not match:
            return expr
        m = match.groupdict()
        hash_type = m["hash_type"]

        if hash_type == "h" and hash_id is None:
            raise CroniterBadCronError("Hashed definitions must include hash_id")

        if m["range_begin"] and m["range_end"]:
            # Explicit range, e.g. the "(30-59)" part of H(30-59)/10
            begin = int(m["range_begin"])
            end = int(m["range_end"])
            if begin >= end:
                raise CroniterBadCronError("Range end must be greater than range begin")
        else:
            # No explicit range: use the whole range of the field
            begin = self.cron.RANGES[idx][0]
            end = self.cron.RANGES[idx][1]

        if m["divisor"]:
            # Examples: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
            #           H/15 -> 7-59/15 (i.e. 7,22,37,52)
            divisor = int(m["divisor"])
            if divisor == 0:
                raise CroniterBadCronError(f"Bad expression: {expr}")

            # The starting point is picked within the first "divisor" values
            # of the range; the step then covers the rest of the range.
            first = self.do(
                idx,
                hash_type=hash_type,
                hash_id=hash_id,
                range_begin=begin,
                range_end=divisor - 1 + begin,
            )
            return f"{first}-{end}/{divisor}"

        # Examples: H(0-29) -> 12, H -> 32
        return str(
            self.do(
                idx,
                hash_type=hash_type,
                hash_id=hash_id,
                range_end=end,
                range_begin=begin,
            )
        )
1350
1351
# Registry of expression expanders: maps an expander identifier to the class
# implementing it.
EXPANDERS = {"hash": HashExpander}