1#!/usr/bin/env python
2import binascii
3import calendar
4import copy
5import datetime
6import math
7import platform
8import random
9import re
10import struct
11import sys
12import traceback as _traceback
13from time import time
14from typing import Any, Literal, Optional, Union
15
16from dateutil.relativedelta import relativedelta
17from dateutil.tz import datetime_exists, tzutc
18
# Type of one expanded cron field: a list whose items are ints or one of the
# literal markers "*" / "l".
ExpandedExpression = list[Union[int, Literal["*", "l"]]]
20
21
def is_32bit() -> bool:
    """Report whether the running interpreter looks like a 32-bit build.

    Three independent signals are gathered and any single one is enough to
    answer ``True``: the size of a C pointer, the architecture string reported
    by ``platform.architecture()``, and the magnitude of ``sys.maxsize``.
    """
    # Signal 1: a native pointer that is exactly 32 bits wide.
    pointer_is_32 = struct.calcsize("P") * 8 == 32

    # Signal 2: the platform architecture string mentions "32"
    # (unavailable when platform.architecture() raises RuntimeError).
    try:
        arch_name = platform.architecture()[0]
    except RuntimeError:
        arch_name = None
    arch_is_32 = bool(arch_name) and "32" in arch_name

    # Signal 3: the largest container index fits in 32 bits.
    maxsize_is_small = sys.maxsize <= 2**32

    return pointer_is_32 or arch_is_32 or maxsize_is_small
50
51
# On interpreters detected as 32-bit, probe whether datetime.fromtimestamp()
# overflows for a timestamp beyond the signed 32-bit limit. OVERFLOW32B_MODE is
# True only when that probe raises OverflowError; on other interpreters the
# probe is not run and the flag is False.
try:
    # https://github.com/python/cpython/issues/101069 detection
    if is_32bit():
        datetime.datetime.fromtimestamp(3999999999)
    OVERFLOW32B_MODE = False
except OverflowError:
    OVERFLOW32B_MODE = True


# UTC tzinfo singleton and the Unix epoch as a UTC-aware datetime.
UTC_DT = datetime.timezone.utc
EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)
63
# Lower-case three-letter month abbreviations mapped to month numbers (1-12).
M_ALPHAS: dict[str, Union[int, str]] = {
    "jan": 1,
    "feb": 2,
    "mar": 3,
    "apr": 4,  # noqa: E241
    "may": 5,
    "jun": 6,
    "jul": 7,
    "aug": 8,  # noqa: E241
    "sep": 9,
    "oct": 10,
    "nov": 11,
    "dec": 12,
}
# Lower-case three-letter weekday abbreviations mapped to day-of-week numbers,
# with Sunday as 0 and Saturday as 6.
DOW_ALPHAS: dict[str, Union[int, str]] = {
    "sun": 0,
    "mon": 1,
    "tue": 2,
    "wed": 3,
    "thu": 4,
    "fri": 5,
    "sat": 6,
}
87
# Positional index of each cron field. The first five follow the classic
# "minute hour day-of-month month day-of-week" order; second and year are the
# optional sixth and seventh positions.
MINUTE_FIELD = 0
HOUR_FIELD = 1
DAY_FIELD = 2
MONTH_FIELD = 3
DOW_FIELD = 4
SECOND_FIELD = 5
YEAR_FIELD = 6

# Field layouts for the three supported expression lengths (5, 6 and 7 fields).
UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD)
SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD)
YEAR_FIELDS = (
    MINUTE_FIELD,
    HOUR_FIELD,
    DAY_FIELD,
    MONTH_FIELD,
    DOW_FIELD,
    SECOND_FIELD,
    YEAR_FIELD,
)
107
# Matches "<low>-<high>" optionally followed by "/<step>" (step is digits only).
step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
# Matches a string made only of decimal digits.
only_int_re = re.compile(r"^\d+$")

# Days per month, January first; February is listed with 28 days.
DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
# Regex alternations built from the weekday / month abbreviation tables.
WEEKDAYS = "|".join(DOW_ALPHAS.keys())
MONTHS = "|".join(M_ALPHAS.keys())
# Matches either a run of digits or a single "*".
star_or_int_re = re.compile(r"^(\d+|\*)$")
# Matches the special day-of-week forms "<expr>#<digits>" and "l<digits>";
# named groups: "pre" (prefix incl. "#", or "l"), "he" (text before "#"),
# "last" (the trailing digits).
special_dow_re = re.compile(
    rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)"
    rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$"
)
# Matches a literal "*" (used with .match(), i.e. at the start of a string).
re_star = re.compile("[*]")
# Matches "h" or "r", optionally followed by "(<begin>-<end>)" and/or "/<divisor>".
hash_expression_re = re.compile(
    r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
)
123
# Field layouts addressable both by name ("unix", "second", "year") and by the
# number of fields in the expression (5, 6, 7).
CRON_FIELDS = {
    "unix": UNIX_FIELDS,
    "second": SECOND_FIELDS,
    "year": YEAR_FIELDS,
    len(UNIX_FIELDS): UNIX_FIELDS,
    len(SECOND_FIELDS): SECOND_FIELDS,
    len(YEAR_FIELDS): YEAR_FIELDS,
}
UNIX_CRON_LEN = len(UNIX_FIELDS)
SECOND_CRON_LEN = len(SECOND_FIELDS)
YEAR_CRON_LEN = len(YEAR_FIELDS)
# retrocompat
# The integer keys of CRON_FIELDS, i.e. the accepted numbers of fields.
VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)}
# Module-level store keyed by (expression, hash_id, second_at_beginning) holding
# a list of strings per key.
# NOTE(review): presumably filled while expanding an expression — the writer is
# not in this part of the file; confirm.
EXPRESSIONS: dict[tuple[str, Optional[bytes], bool], list[str]] = {}
# Unique sentinel for "argument not supplied" where None is itself meaningful.
MARKER = object()
139
140
def datetime_to_timestamp(d):
    """Return the number of seconds between 1970-01-01 00:00:00 and ``d``.

    An aware datetime is first shifted by its UTC offset and stripped of its
    tzinfo; a naive datetime is used as-is. The result is a float.
    """
    naive = d if d.tzinfo is None else d.replace(tzinfo=None) - d.utcoffset()
    elapsed = naive - datetime.datetime(1970, 1, 1)
    return elapsed.total_seconds()
146
147
148def _is_leap(year: int) -> bool:
149 return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0)
150
151
def _last_day_of_month(year: int, month: int) -> int:
    """Return the number of days in ``month`` (1-12) of ``year``.

    The base length comes from the ``DAYS`` table; February gains one day
    when ``year`` is a leap year.
    """
    length = DAYS[month - 1]
    return length + 1 if (month == 2 and _is_leap(year)) else length
158
159
def _is_successor(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> bool:
    """Tell whether ``date`` lies strictly beyond ``previous_date`` in the search direction.

    Both values are converted to UTC before comparing. When ``is_prev`` is
    true "beyond" means earlier, otherwise it means later.
    """
    candidate_utc = date.astimezone(UTC_DT)
    reference_utc = previous_date.astimezone(UTC_DT)
    return candidate_utc < reference_utc if is_prev else candidate_utc > reference_utc
167
168
169def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta:
170 """Calculate the timezone difference of the given dates."""
171 offset1 = date1.utcoffset()
172 offset2 = date2.utcoffset()
173 assert offset1 is not None
174 assert offset2 is not None
175 return offset2 - offset1
176
177
def _add_tzinfo(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> tuple[datetime.datetime, bool]:
    """Add the tzinfo from the previous date to the given date.

    In case the new date is ambiguous, determine the correct date
    based on it being closer to the previous date but still a successor
    (after/before based on `is_prev`).

    In case the date does not exist, jump forward to the next existing date.

    Returns a ``(aware_datetime, exists)`` tuple. ``exists`` is ``False`` when
    the requested local time did not exist and the returned value was moved
    forward in one-minute steps to the first local time that does exist.
    """
    # tzinfo objects exposing ``localize`` (the pytz API) take a dedicated path.
    localize = getattr(previous_date.tzinfo, "localize", None)
    if localize is not None:
        # pylint: disable-next=import-outside-toplevel
        import pytz

        try:
            # is_dst=None makes pytz raise on non-existent / ambiguous times.
            result = localize(date, is_dst=None)
        except pytz.NonExistentTimeError:
            # Step forward minute by minute until the local time exists.
            while True:
                date += datetime.timedelta(minutes=1)
                try:
                    result = localize(date, is_dst=None)
                except pytz.NonExistentTimeError:
                    continue
                break
            return result, False
        except pytz.AmbiguousTimeError:
            # Build both interpretations; "closer" is the one nearer to
            # previous_date in the search direction.
            closer = localize(date, is_dst=not is_prev)
            farther = localize(date, is_dst=is_prev)
            # TODO: Check negative DST
            assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
            if _is_successor(closer, previous_date, is_prev):
                result = closer
            else:
                assert _is_successor(farther, previous_date, is_prev)
                result = farther
        return result, True

    # Non-pytz tzinfo: pick the interpretation via the ``fold`` attribute.
    result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo)
    if not datetime_exists(result):
        # Step forward minute by minute until the local time exists.
        while not datetime_exists(result):
            result += datetime.timedelta(minutes=1)
        return result, False

    # result is closer to the previous date
    farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo)
    # Comparing the UTC offsets in the check for the date being ambiguous.
    if result.utcoffset() != farther.utcoffset():
        # TODO: Check negative DST
        assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
        if not _is_successor(result, previous_date, is_prev):
            assert _is_successor(farther, previous_date, is_prev)
            result = farther
    return result, True
233
234
class CroniterError(ValueError):
    """General top-level Croniter base exception (a ``ValueError`` subclass)."""
237
238
class CroniterBadTypeRangeError(TypeError):
    """``TypeError`` subclass that sits outside the ``CroniterError`` hierarchy.

    NOTE(review): the name suggests it reports mismatched argument types for a
    range helper; it is not raised in this part of the file — confirm at the
    raise site.
    """
241
242
class CroniterBadCronError(CroniterError):
    """Syntax, unknown value, or range error within a cron expression."""
245
246
class CroniterUnsupportedSyntaxError(CroniterBadCronError):
    """Valid cron syntax, but likely to produce inaccurate results."""

    # Extending CroniterBadCronError may seem contradictory, but it allows
    # catching both errors with a single exception. From a user perspective
    # these will likely be handled the same way.
253
254
class CroniterBadDateError(CroniterError):
    """Unable to find a next/prev timestamp match."""
257
258
class CroniterNotAlphaError(CroniterBadCronError):
    """Cron syntax contains an invalid day or month abbreviation."""
261
262
class croniter:
    # Number of months used as the wrap-around range for the month field.
    MONTHS_IN_YEAR = 12

    # This helps with expanding `*` fields into `lower-upper` ranges. Each item
    # in this tuple maps to the corresponding field index
    # (minute, hour, day-of-month, month, day-of-week, second, year).
    RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099))

    # Per-field table translating alphabetic tokens to their value; indexed by
    # field position. Fields with an empty dict accept no alphabetic tokens.
    ALPHACONV: tuple[dict[str, Union[int, str]], ...] = (
        {},  # 0: min
        {},  # 1: hour
        {"l": "l"},  # 2: dom
        # 3: mon
        copy.deepcopy(M_ALPHAS),
        # 4: dow
        copy.deepcopy(DOW_ALPHAS),
        # 5: second
        {},
        # 6: year
        {},
    )

    # Per-field numeric aliases: 0 -> 1 for day-of-month and month, 7 -> 0 for
    # day-of-week.
    LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {})

    # Count of distinct values in each field's RANGES entry (upper - lower + 1).
    LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130)
287
288 def __init__(
289 self,
290 expr_format: str,
291 start_time: Optional[Union[datetime.datetime, float]] = None,
292 ret_type: type = float,
293 day_or: bool = True,
294 max_years_between_matches: Optional[int] = None,
295 is_prev: bool = False,
296 hash_id: Optional[Union[bytes, str]] = None,
297 implement_cron_bug: bool = False,
298 second_at_beginning: bool = False,
299 expand_from_start_time: bool = False,
300 ) -> None:
301 self._ret_type = ret_type
302 self._day_or = day_or
303 self._implement_cron_bug = implement_cron_bug
304 self.second_at_beginning = bool(second_at_beginning)
305 self._expand_from_start_time = expand_from_start_time
306
307 if hash_id is not None:
308 if not isinstance(hash_id, (bytes, str)):
309 raise TypeError("hash_id must be bytes or UTF-8 string")
310 if not isinstance(hash_id, bytes):
311 hash_id = hash_id.encode("UTF-8")
312
313 self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None
314 if max_years_between_matches is None:
315 max_years_between_matches = 50
316 self._max_years_between_matches = max(int(max_years_between_matches), 1)
317
318 if start_time is None:
319 start_time = time()
320
321 self.tzinfo: Optional[datetime.tzinfo] = None
322
323 self.start_time = 0.0
324 self.dst_start_time = 0.0
325 self.cur = 0.0
326 self.set_current(start_time, force=True)
327
328 self.expanded, self.nth_weekday_of_month = self.expand(
329 expr_format,
330 hash_id=hash_id,
331 from_timestamp=self.dst_start_time if self._expand_from_start_time else None,
332 second_at_beginning=second_at_beginning,
333 )
334 self.fields = CRON_FIELDS[len(self.expanded)]
335 self.expressions = EXPRESSIONS[(expr_format, hash_id, second_at_beginning)]
336 self._is_prev = is_prev
337
338 @classmethod
339 def _alphaconv(cls, index, key, expressions):
340 try:
341 return cls.ALPHACONV[index][key]
342 except KeyError:
343 raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable")
344
345 def get_next(self, ret_type=None, start_time=None, update_current=True):
346 if start_time and self._expand_from_start_time:
347 raise ValueError(
348 "start_time is not supported when using expand_from_start_time = True."
349 )
350 return self._get_next(
351 ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current
352 )
353
354 def get_prev(self, ret_type=None, start_time=None, update_current=True):
355 return self._get_next(
356 ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current
357 )
358
359 def get_current(self, ret_type=None):
360 ret_type = ret_type or self._ret_type
361 if issubclass(ret_type, datetime.datetime):
362 return self.timestamp_to_datetime(self.cur)
363 return self.cur
364
365 def set_current(
366 self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True
367 ) -> float:
368 if (force or (self.cur is None)) and start_time is not None:
369 if isinstance(start_time, datetime.datetime):
370 self.tzinfo = start_time.tzinfo
371 start_time = self.datetime_to_timestamp(start_time)
372
373 self.start_time = start_time
374 self.dst_start_time = start_time
375 self.cur = start_time
376 return self.cur
377
    @staticmethod
    def datetime_to_timestamp(d: datetime.datetime) -> float:
        """
        Converts a `datetime` object `d` into a UNIX timestamp.

        Thin wrapper that delegates to the module-level function of the same name.
        """
        return datetime_to_timestamp(d)

    # Legacy underscore-prefixed name kept bound to the same static method.
    _datetime_to_timestamp = datetime_to_timestamp  # retrocompat
386
387 def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime:
388 """
389 Converts a UNIX `timestamp` into a `datetime` object.
390 """
391 if tzinfo is MARKER: # allow to give tzinfo=None even if self.tzinfo is set
392 tzinfo = self.tzinfo
393 if OVERFLOW32B_MODE:
394 # degraded mode to workaround Y2038
395 # see https://github.com/python/cpython/issues/101069
396 result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp)
397 else:
398 result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None)
399 if tzinfo:
400 result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo)
401 return result
402
403 _timestamp_to_datetime = timestamp_to_datetime # retrocompat
404
405 def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None):
406 if update_current is None:
407 update_current = True
408 self.set_current(start_time, force=True)
409 if is_prev is None:
410 is_prev = self._is_prev
411 self._is_prev = is_prev
412
413 ret_type = ret_type or self._ret_type
414
415 if not issubclass(ret_type, (float, datetime.datetime)):
416 raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.")
417
418 result = self._calc_next(is_prev)
419 timestamp = self.datetime_to_timestamp(result)
420 if update_current:
421 self.cur = timestamp
422 if issubclass(ret_type, datetime.datetime):
423 return result
424 return timestamp
425
    # iterator protocol, to enable direct use of croniter
    # objects in a loop, like "for dt in croniter('5 0 * * *'): ..."
    # or for combining multiple croniters into a single
    # dates feed using the 'itertools' module
    def all_next(self, ret_type=None, start_time=None, update_current=None):
        """
        Returns a generator yielding consecutive dates.

        May be used instead of an implicit call to __iter__ whenever a
        non-default `ret_type` needs to be specified.

        `start_time` is applied to the first result only. When no further
        match can be found, the generator ends quietly if
        `max_years_between_matches` was given explicitly at construction;
        otherwise the `CroniterBadDateError` propagates.
        """
        # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could
        # be used instead
        try:
            while True:
                # Re-assert the forward direction on every step.
                self._is_prev = False
                yield self._get_next(
                    ret_type=ret_type, start_time=start_time, update_current=update_current
                )
                # Only the first iteration repositions the iterator.
                start_time = None
        except CroniterBadDateError:
            if self._max_years_btw_matches_explicitly_set:
                return
            raise
450
    def all_prev(self, ret_type=None, start_time=None, update_current=None):
        """
        Returns a generator yielding previous dates.

        `start_time` is applied to the first result only. When no further
        match can be found, the generator ends quietly if
        `max_years_between_matches` was given explicitly at construction;
        otherwise the `CroniterBadDateError` propagates.
        """
        try:
            while True:
                # Re-assert the backward direction on every step.
                self._is_prev = True
                yield self._get_next(
                    ret_type=ret_type, start_time=start_time, update_current=update_current
                )
                # Only the first iteration repositions the iterator.
                start_time = None
        except CroniterBadDateError:
            if self._max_years_btw_matches_explicitly_set:
                return
            raise
466
467 def iter(self, *args, **kwargs):
468 return self.all_prev if self._is_prev else self.all_next
469
    def __iter__(self):
        """Return ``self``; the instance serves as its own iterator."""
        return self

    # Iterator-protocol hook and the plain ``next`` name are both bound to
    # ``_get_next``, so each step uses the stored direction and return type.
    __next__ = next = _get_next
474
475 def _calc_next(self, is_prev: bool) -> datetime.datetime:
476 current = self.timestamp_to_datetime(self.cur)
477 expanded = self.expanded[:]
478 nth_weekday_of_month = self.nth_weekday_of_month.copy()
479
480 # exception to support day of month and day of week as defined in cron
481 if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or:
482 # If requested, handle a bug in vixie cron/ISC cron where day_of_month and
483 # day_of_week form an intersection (AND) instead of a union (OR) if either
484 # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html)
485 if self._implement_cron_bug and (
486 re_star.match(self.expressions[DAY_FIELD])
487 or re_star.match(self.expressions[DOW_FIELD])
488 ):
489 # To produce a schedule identical to the cron bug, we'll bypass the code
490 # that makes a union of DOM and DOW, and instead skip to the code that
491 # does an intersect instead
492 pass
493 else:
494 bak = expanded[DOW_FIELD]
495 expanded[DOW_FIELD] = ["*"]
496 t1 = self._calc(current, expanded, nth_weekday_of_month, is_prev)
497 expanded[DOW_FIELD] = bak
498 expanded[DAY_FIELD] = ["*"]
499
500 t2 = self._calc(current, expanded, nth_weekday_of_month, is_prev)
501 if is_prev:
502 return t1 if t1 > t2 else t2
503 return t1 if t1 < t2 else t2
504
505 return self._calc(current, expanded, nth_weekday_of_month, is_prev)
506
507 def _calc(
508 self,
509 now: datetime.datetime,
510 expanded: list[ExpandedExpression],
511 nth_weekday_of_month: dict[int, set[int]],
512 is_prev: bool,
513 ) -> datetime.datetime:
514 if is_prev:
515 nearest_diff_method = self._get_prev_nearest_diff
516 offset = relativedelta(microseconds=-1)
517 else:
518 nearest_diff_method = self._get_next_nearest_diff
519 if len(expanded) > UNIX_CRON_LEN:
520 offset = relativedelta(seconds=1)
521 else:
522 offset = relativedelta(minutes=1)
523 # Calculate the next cron time in local time a.k.a. timezone unaware time.
524 unaware_time = now.replace(tzinfo=None) + offset
525 if len(expanded) > UNIX_CRON_LEN:
526 unaware_time = unaware_time.replace(microsecond=0)
527 else:
528 unaware_time = unaware_time.replace(second=0, microsecond=0)
529
530 month = unaware_time.month
531 year = current_year = unaware_time.year
532
533 def proc_year(d):
534 if len(expanded) == YEAR_CRON_LEN:
535 try:
536 expanded[YEAR_FIELD].index("*")
537 except ValueError:
538 # use None as range_val to indicate no loop
539 diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
540 if diff_year is None:
541 return None, d
542 if diff_year != 0:
543 if is_prev:
544 d += relativedelta(
545 years=diff_year, month=12, day=31, hour=23, minute=59, second=59
546 )
547 else:
548 d += relativedelta(
549 years=diff_year, month=1, day=1, hour=0, minute=0, second=0
550 )
551 return True, d
552 return False, d
553
554 def proc_month(d):
555 try:
556 expanded[MONTH_FIELD].index("*")
557 except ValueError:
558 diff_month = nearest_diff_method(
559 d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR
560 )
561 reset_day = 1
562
563 if diff_month is not None and diff_month != 0:
564 if is_prev:
565 d += relativedelta(months=diff_month)
566 reset_day = _last_day_of_month(d.year, d.month)
567 d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
568 else:
569 d += relativedelta(
570 months=diff_month, day=reset_day, hour=0, minute=0, second=0
571 )
572 return True, d
573 return False, d
574
575 def proc_day_of_month(d):
576 try:
577 expanded[DAY_FIELD].index("*")
578 except ValueError:
579 days = _last_day_of_month(year, month)
580 if "l" in expanded[DAY_FIELD] and days == d.day:
581 return False, d
582
583 if is_prev:
584 days_in_prev_month = DAYS[(month - 2) % self.MONTHS_IN_YEAR]
585 diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
586 else:
587 diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)
588
589 if diff_day is not None and diff_day != 0:
590 if is_prev:
591 d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
592 else:
593 d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
594 return True, d
595 return False, d
596
597 def proc_day_of_week(d):
598 try:
599 expanded[DOW_FIELD].index("*")
600 except ValueError:
601 diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
602 if diff_day_of_week is not None and diff_day_of_week != 0:
603 if is_prev:
604 d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
605 else:
606 d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
607 return True, d
608 return False, d
609
610 def proc_day_of_week_nth(d):
611 if "*" in nth_weekday_of_month:
612 s = nth_weekday_of_month["*"]
613 for i in range(0, 7):
614 if i in nth_weekday_of_month:
615 nth_weekday_of_month[i].update(s)
616 else:
617 nth_weekday_of_month[i] = s
618 del nth_weekday_of_month["*"]
619
620 candidates = []
621 for wday, nth in nth_weekday_of_month.items():
622 c = self._get_nth_weekday_of_month(d.year, d.month, wday)
623 for n in nth:
624 if n == "l":
625 candidate = c[-1]
626 elif len(c) < n:
627 continue
628 else:
629 candidate = c[n - 1]
630 if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
631 candidates.append(candidate)
632
633 if not candidates:
634 if is_prev:
635 d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
636 else:
637 days = _last_day_of_month(year, month)
638 d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
639 return True, d
640
641 candidates.sort()
642 diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
643 if diff_day != 0:
644 if is_prev:
645 d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
646 else:
647 d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
648 return True, d
649 return False, d
650
651 def proc_hour(d):
652 try:
653 expanded[HOUR_FIELD].index("*")
654 except ValueError:
655 diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
656 if diff_hour is not None and diff_hour != 0:
657 if is_prev:
658 d += relativedelta(hours=diff_hour, minute=59, second=59)
659 else:
660 d += relativedelta(hours=diff_hour, minute=0, second=0)
661 return True, d
662 return False, d
663
664 def proc_minute(d):
665 try:
666 expanded[MINUTE_FIELD].index("*")
667 except ValueError:
668 diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
669 if diff_min is not None and diff_min != 0:
670 if is_prev:
671 d += relativedelta(minutes=diff_min, second=59)
672 else:
673 d += relativedelta(minutes=diff_min, second=0)
674 return True, d
675 return False, d
676
677 def proc_second(d):
678 if len(expanded) > UNIX_CRON_LEN:
679 try:
680 expanded[SECOND_FIELD].index("*")
681 except ValueError:
682 diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
683 if diff_sec is not None and diff_sec != 0:
684 d += relativedelta(seconds=diff_sec)
685 return True, d
686 else:
687 d += relativedelta(second=0)
688 return False, d
689
690 procs = [
691 proc_year,
692 proc_month,
693 proc_day_of_month,
694 (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
695 proc_hour,
696 proc_minute,
697 proc_second,
698 ]
699
700 while abs(year - current_year) <= self._max_years_between_matches:
701 next = False
702 stop = False
703 for proc in procs:
704 (changed, unaware_time) = proc(unaware_time)
705 # `None` can be set mostly for year processing
706 # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
707 if changed is None:
708 stop = True
709 break
710 if changed:
711 month, year = unaware_time.month, unaware_time.year
712 next = True
713 break
714 if stop:
715 break
716 if next:
717 continue
718
719 unaware_time = unaware_time.replace(microsecond=0)
720 if now.tzinfo is None:
721 return unaware_time
722
723 # Add timezone information back and handle DST changes
724 aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)
725
726 if not exists and (
727 not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD]
728 ):
729 # The calculated local date does not exist and moving the time forward
730 # to the next valid time isn't the correct solution. Search for the
731 # next matching cron time that exists.
732 while not exists:
733 unaware_time = self._calc(
734 unaware_time, expanded, nth_weekday_of_month, is_prev
735 )
736 aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)
737
738 offset_delta = _timezone_delta(now, aware_time)
739 if not offset_delta:
740 # There was no DST change.
741 return aware_time
742
743 # There was a DST change. So check if there is a alternative cron time
744 # for the other UTC offset.
745 alternative_unaware_time = now.replace(tzinfo=None) + offset_delta
746 alternative_unaware_time = self._calc(
747 alternative_unaware_time, expanded, nth_weekday_of_month, is_prev
748 )
749 alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev)
750
751 if not _is_successor(alternative_aware_time, now, is_prev):
752 # The alternative time is an ancestor of now. Thus it is not an alternative.
753 return aware_time
754
755 if _is_successor(aware_time, alternative_aware_time, is_prev):
756 return alternative_aware_time
757
758 return aware_time
759
760 if is_prev:
761 raise CroniterBadDateError("failed to find prev date")
762 raise CroniterBadDateError("failed to find next date")
763
764 @staticmethod
765 def _get_next_nearest_diff(x, to_check, range_val):
766 """
767 `range_val` is the range of a field.
768 If no available time, we can move to next loop(like next month).
769 `range_val` can also be set to `None` to indicate that there is no loop.
770 ( Currently, should only used for `year` field )
771 """
772 for i, d in enumerate(to_check):
773 if range_val is not None:
774 if d == "l":
775 # if 'l' then it is the last day of month
776 # => its value of range_val
777 d = range_val
778 elif d > range_val:
779 continue
780 if d >= x:
781 return d - x
782 # When range_val is None and x not exists in to_check,
783 # `None` will be returned to suggest no more available time
784 if range_val is None:
785 return None
786 return to_check[0] - x + range_val
787
788 @staticmethod
789 def _get_prev_nearest_diff(x, to_check, range_val):
790 """
791 `range_val` is the range of a field.
792 If no available time, we can move to previous loop(like previous month).
793 Range_val can also be set to `None` to indicate that there is no loop.
794 ( Currently should only used for `year` field )
795 """
796 candidates = to_check[:]
797 candidates.reverse()
798 for d in candidates:
799 if d != "l" and d <= x:
800 return d - x
801 if "l" in candidates:
802 return -x
803 # When range_val is None and x not exists in to_check,
804 # `None` will be returned to suggest no more available time
805 if range_val is None:
806 return None
807 candidate = candidates[0]
808 for c in candidates:
809 # fixed: c < range_val
810 # this code will reject all 31 day of month, 12 month, 59 second,
811 # 23 hour and so on.
812 # if candidates has just a element, this will not harmful.
813 # but candidates have multiple elements, then values equal to
814 # range_val will rejected.
815 if c <= range_val:
816 candidate = c
817 break
818 # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error
819 # return 2021-03-02 06:00:00
820 if candidate > range_val:
821 return -range_val
822 return candidate - x - range_val
823
824 @staticmethod
825 def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]:
826 """For a given year/month return a list of days in nth-day-of-month order.
827 The last weekday of the month is always [-1].
828 """
829 w = (day_of_week + 6) % 7
830 c = calendar.Calendar(w).monthdayscalendar(year, month)
831 if c[0][0] == 0:
832 c.pop(0)
833 return tuple(i[0] for i in c)
834
835 @classmethod
836 def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN):
837 if isinstance(len_expressions, (list, dict, tuple, set)):
838 len_expressions = len(len_expressions)
839 if val in cls.LOWMAP[field_index] and not (
840 # do not support 0 as a month either for classical 5 fields cron,
841 # 6fields second repeat form or 7 fields year form
842 # but still let conversion happen if day field is shifted
843 (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN)
844 or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN)
845 or (
846 field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD]
847 and len_expressions == YEAR_CRON_LEN
848 )
849 ):
850 val = cls.LOWMAP[field_index][val]
851 return val
852
853 @classmethod
854 def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None):
855 # Split the expression in components, and normalize L -> l, MON -> mon,
856 # etc. Keep expr_format untouched so we can use it in the exception
857 # messages.
858 expr_aliases = {
859 "@midnight": ("0 0 * * *", "h h(0-2) * * * h"),
860 "@hourly": ("0 * * * *", "h * * * * h"),
861 "@daily": ("0 0 * * *", "h h * * * h"),
862 "@weekly": ("0 0 * * 0", "h h * * h h"),
863 "@monthly": ("0 0 1 * *", "h h h * * h"),
864 "@yearly": ("0 0 1 1 *", "h h h h * h"),
865 "@annually": ("0 0 1 1 *", "h h h h * h"),
866 }
867
868 efl = expr_format.lower()
869 hash_id_expr = 1 if hash_id is not None else 0
870 try:
871 efl = expr_aliases[efl][hash_id_expr]
872 except KeyError:
873 pass
874
875 expressions = efl.split()
876
877 if len(expressions) not in VALID_LEN_EXPRESSION:
878 raise CroniterBadCronError(
879 "Exactly 5, 6 or 7 columns has to be specified for iterator expression."
880 )
881
882 if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
883 # move second to it's own(6th) field to process by same logical
884 expressions.insert(SECOND_FIELD, expressions.pop(0))
885
886 expanded = []
887 nth_weekday_of_month = {}
888
889 for field_index, expr in enumerate(expressions):
890 for expanderid, expander in EXPANDERS.items():
891 expr = expander(cls).expand(
892 efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp
893 )
894
895 if "?" in expr:
896 if expr != "?":
897 raise CroniterBadCronError(
898 f"[{expr_format}] is not acceptable."
899 f" Question mark can not used with other characters"
900 )
901 if field_index not in [DAY_FIELD, DOW_FIELD]:
902 raise CroniterBadCronError(
903 f"[{expr_format}] is not acceptable. "
904 f"Question mark can only used in day_of_month or day_of_week"
905 )
906 # currently just trade `?` as `*`
907 expr = "*"
908
909 e_list = expr.split(",")
910 res = []
911
912 while len(e_list) > 0:
913 e = e_list.pop()
914 nth = None
915
916 if field_index == DOW_FIELD:
917 # Handle special case in the dow expression: 2#3, l3
918 special_dow_rem = special_dow_re.match(str(e))
919 if special_dow_rem:
920 g = special_dow_rem.groupdict()
921 he, last = g.get("he", ""), g.get("last", "")
922 if he:
923 e = he
924 try:
925 nth = int(last)
926 assert 5 >= nth >= 1
927 except (KeyError, ValueError, AssertionError):
928 raise CroniterBadCronError(
929 f"[{expr_format}] is not acceptable."
930 f" Invalid day_of_week value: '{nth}'"
931 )
932 elif last:
933 e = last
934 nth = g["pre"] # 'l'
935
936 # Before matching step_search_re, normalize "*" to "{min}-{max}".
937 # Example: in the minute field, "*/5" normalizes to "0-59/5"
938 t = re.sub(
939 r"^\*(\/.+)$",
940 r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]),
941 str(e),
942 )
943 m = step_search_re.search(t)
944
945 if not m:
946 # Before matching step_search_re,
947 # normalize "{start}/{step}" to "{start}-{max}/{step}".
948 # Example: in the minute field, "10/5" normalizes to "10-59/5"
949 t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e))
950 m = step_search_re.search(t)
951
952 if m:
953 # early abort if low/high are out of bounds
954 (low, high, step) = m.group(1), m.group(2), m.group(4) or 1
955 if field_index == DAY_FIELD and high == "l":
956 high = "31"
957
958 if not only_int_re.search(low):
959 low = str(cls._alphaconv(field_index, low, expressions))
960
961 if not only_int_re.search(high):
962 high = str(cls._alphaconv(field_index, high, expressions))
963
964 # normally, it's already guarded by the RE that should not accept
965 # not-int values.
966 if not only_int_re.search(str(step)):
967 raise CroniterBadCronError(
968 f"[{expr_format}] step '{step}'"
969 f" in field {field_index} is not acceptable"
970 )
971 step = int(step)
972
973 for band in low, high:
974 if not only_int_re.search(str(band)):
975 raise CroniterBadCronError(
976 f"[{expr_format}] bands '{low}-{high}'"
977 f" in field {field_index} are not acceptable"
978 )
979
980 low, high = (
981 cls.value_alias(int(_val), field_index, expressions)
982 for _val in (low, high)
983 )
984
985 if max(low, high) > max(
986 cls.RANGES[field_index][0], cls.RANGES[field_index][1]
987 ):
988 raise CroniterBadCronError(f"{expr_format} is out of bands")
989
990 if from_timestamp:
991 low = cls._get_low_from_current_date_number(
992 field_index, int(step), int(from_timestamp)
993 )
994
995 # Handle when the second bound of the range is in backtracking order:
996 # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH
997 if low > high:
998 whole_field_range = list(
999 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1)
1000 )
1001 # Add FirstBound -> ENDRANGE, respecting step
1002 rng = list(range(low, cls.RANGES[field_index][1] + 1, step))
1003 # Then 0 -> SecondBound, but skipping n first occurences according to step
1004 # EG to respect such expressions : Apr-Jan/3
1005 to_skip = 0
1006 if rng:
1007 already_skipped = list(reversed(whole_field_range)).index(rng[-1])
1008 curpos = whole_field_range.index(rng[-1])
1009 if ((curpos + step) > len(whole_field_range)) and (
1010 already_skipped < step
1011 ):
1012 to_skip = step - already_skipped
1013 rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step))
1014 # if we include a range type: Jan-Jan, or Sun-Sun,
1015 # it means the whole cycle (all days of week, # all monthes of year, etc)
1016 elif low == high:
1017 rng = list(
1018 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step)
1019 )
1020 else:
1021 try:
1022 rng = list(range(low, high + 1, step))
1023 except ValueError as exc:
1024 raise CroniterBadCronError(f"invalid range: {exc}")
1025
1026 if field_index == DOW_FIELD and nth and nth != "l":
1027 rng = [f"{item}#{nth}" for item in rng]
1028 e_list += [a for a in rng if a not in e_list]
1029 else:
1030 if t.startswith("-"):
1031 raise CroniterBadCronError(
1032 f"[{expr_format}] is not acceptable, negative numbers not allowed"
1033 )
1034 if not star_or_int_re.search(t):
1035 t = cls._alphaconv(field_index, t, expressions)
1036
1037 try:
1038 t = int(t)
1039 except ValueError:
1040 pass
1041
1042 t = cls.value_alias(t, field_index, expressions)
1043
1044 if t not in ["*", "l"] and (
1045 int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1]
1046 ):
1047 raise CroniterBadCronError(
1048 f"[{expr_format}] is not acceptable, out of range"
1049 )
1050
1051 res.append(t)
1052
1053 if field_index == DOW_FIELD and nth:
1054 if t not in nth_weekday_of_month:
1055 nth_weekday_of_month[t] = set()
1056 nth_weekday_of_month[t].add(nth)
1057
1058 res = set(res)
1059 res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i)
1060 if len(res) == cls.LEN_MEANS_ALL[field_index]:
1061 # Make sure the wildcard is used in the correct way (avoid over-optimization)
1062 if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or (
1063 field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]
1064 ):
1065 pass
1066 else:
1067 res = ["*"]
1068
1069 expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res)
1070
1071 # Check to make sure the dow combo in use is supported
1072 if nth_weekday_of_month:
1073 dow_expanded_set = set(expanded[DOW_FIELD])
1074 dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys())
1075 dow_expanded_set.discard("*")
1076 # Skip: if it's all weeks instead of wildcard
1077 if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]:
1078 raise CroniterUnsupportedSyntaxError(
1079 f"day-of-week field does not support mixing literal values and nth"
1080 f" day of week syntax. Cron: '{expr_format}'"
1081 f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}"
1082 )
1083
1084 EXPRESSIONS[(expr_format, hash_id, second_at_beginning)] = expressions
1085 return expanded, nth_weekday_of_month
1086
1087 @classmethod
1088 def expand(
1089 cls,
1090 expr_format: str,
1091 hash_id: Optional[Union[bytes, str]] = None,
1092 second_at_beginning: bool = False,
1093 from_timestamp: Optional[float] = None,
1094 ) -> tuple[list[ExpandedExpression], dict[int, set[int]]]:
1095 """
1096 Expand a cron expression format into a noramlized format of
1097 list[list[int | 'l' | '*']]. The first list representing each element
1098 of the epxression, and each sub-list representing the allowed values
1099 for that expression component.
1100
1101 A tuple is returned, the first value being the expanded epxression
1102 list, and the second being a `nth_weekday_of_month` mapping.
1103
1104 Examples:
1105
1106 # Every minute
1107 >>> croniter.expand('* * * * *')
1108 ([['*'], ['*'], ['*'], ['*'], ['*']], {})
1109
1110 # On the hour
1111 >>> croniter.expand('0 0 * * *')
1112 ([[0], [0], ['*'], ['*'], ['*']], {})
1113
1114 # Hours 0-5 and 10 monday through friday
1115 >>> croniter.expand('0-5,10 * * * mon-fri')
1116 ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})
1117
1118 Note that some special values such as nth day of week are expanded to a
1119 special mapping format for later processing:
1120
1121 # Every minute on the 3rd tuesday of the month
1122 >>> croniter.expand('* * * * 2#3')
1123 ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})
1124
1125 # Every hour on the last day of the month
1126 >>> croniter.expand('0 * l * *')
1127 ([[0], ['*'], ['l'], ['*'], ['*']], {})
1128
1129 # On the hour every 15 seconds
1130 >>> croniter.expand('0 0 * * * */15')
1131 ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
1132 """
1133 try:
1134 return cls._expand(
1135 expr_format,
1136 hash_id=hash_id,
1137 second_at_beginning=second_at_beginning,
1138 from_timestamp=from_timestamp,
1139 )
1140 except (ValueError,) as exc:
1141 if isinstance(exc, CroniterError):
1142 raise
1143 trace = _traceback.format_exc()
1144 raise CroniterBadCronError(trace)
1145
1146 @classmethod
1147 def _get_low_from_current_date_number(cls, field_index, step, from_timestamp):
1148 dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT)
1149 if field_index == MINUTE_FIELD:
1150 return dt.minute % step
1151 if field_index == HOUR_FIELD:
1152 return dt.hour % step
1153 if field_index == DAY_FIELD:
1154 return ((dt.day - 1) % step) + 1
1155 if field_index == MONTH_FIELD:
1156 return dt.month % step
1157 if field_index == DOW_FIELD:
1158 return (dt.weekday() + 1) % step
1159
1160 raise ValueError("Can't get current date number for index larger than 4")
1161
1162 @classmethod
1163 def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False):
1164 if hash_id:
1165 if not isinstance(hash_id, (bytes, str)):
1166 raise TypeError("hash_id must be bytes or UTF-8 string")
1167 if not isinstance(hash_id, bytes):
1168 hash_id = hash_id.encode(encoding)
1169 try:
1170 cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning)
1171 except CroniterError:
1172 return False
1173 return True
1174
1175 @classmethod
1176 def match(cls, cron_expression, testdate, day_or=True, second_at_beginning=False):
1177 return cls.match_range(cron_expression, testdate, testdate, day_or, second_at_beginning)
1178
1179 @classmethod
1180 def match_range(
1181 cls, cron_expression, from_datetime, to_datetime, day_or=True, second_at_beginning=False
1182 ):
1183 cron = cls(
1184 cron_expression,
1185 to_datetime,
1186 ret_type=datetime.datetime,
1187 day_or=day_or,
1188 second_at_beginning=second_at_beginning,
1189 )
1190 tdp = cron.get_current(datetime.datetime)
1191 if not tdp.microsecond:
1192 tdp += relativedelta(microseconds=1)
1193 cron.set_current(tdp, force=True)
1194 try:
1195 tdt = cron.get_prev()
1196 except CroniterBadDateError:
1197 return False
1198 precision_in_seconds = 1 if len(cron.expanded) > UNIX_CRON_LEN else 60
1199 duration_in_second = (to_datetime - from_datetime).total_seconds() + precision_in_seconds
1200 return (max(tdp, tdt) - min(tdp, tdt)).total_seconds() < duration_in_second
1201
1202
def croniter_range(
    start,
    stop,
    expr_format,
    ret_type=None,
    day_or=True,
    exclude_ends=False,
    _croniter=None,
    second_at_beginning=False,
    expand_from_start_time=False,
):
    """
    Generate every time between ``start`` and ``stop`` that matches the cron
    expression ``expr_format``.

    Both ends are included when they match, unless ``exclude_ends=True``.
    When ``start`` is later than ``stop`` the times are produced in reverse
    order. Numeric (timestamp) bounds yield floats by default; otherwise
    datetimes are yielded. ``ret_type`` overrides that default.

    This is the datetime counterpart of the builtin ``range(start, stop, step)``
    where the "step" is a cron expression.

    Raises ``CroniterBadTypeRangeError`` when ``start`` and ``stop`` have
    unrelated types.
    """
    iterator_cls = _croniter or croniter

    # Cheap identity test first (the common case); only fall back to the
    # isinstance checks when the two types differ.
    if type(start) is not type(stop):
        if not isinstance(start, type(stop)) and not isinstance(stop, type(start)):
            raise CroniterBadTypeRangeError(
                f"The start and stop must be same type. {type(start)} != {type(stop)}"
            )

    numeric_bounds = isinstance(start, (float, int))
    if numeric_bounds:
        # Work internally on naive datetimes built from the UTC timestamps.
        start, stop = (
            datetime.datetime.fromtimestamp(ts, tzutc()).replace(tzinfo=None)
            for ts in (start, stop)
        )
    if ret_type is None:
        ret_type = float if numeric_bounds else datetime.datetime

    if not exclude_ends:
        # Widen the interval by one microsecond on each side so that the
        # strict comparisons below let matching end points through.
        nudge = relativedelta(microseconds=1)
        if start < stop:  # forward (normal) time order
            start, stop = start - nudge, stop + nudge
        else:  # reverse time order
            start, stop = start + nudge, stop - nudge

    span_in_years = math.floor(abs(stop.year - start.year)) + 1
    ic = iterator_cls(
        expr_format,
        start,
        ret_type=datetime.datetime,
        day_or=day_or,
        max_years_between_matches=span_in_years,
        second_at_beginning=second_at_beginning,
        expand_from_start_time=expand_from_start_time,
    )

    forward = start < stop
    advance = ic.get_next if forward else ic.get_prev
    try:
        candidate = advance()
        while (candidate < stop) if forward else (candidate > stop):
            yield ic.get_current(float) if ret_type is float else candidate
            candidate = advance()
    except CroniterBadDateError:
        # No further match within the allowed year span: end the iteration.
        return
1280
1281
class HashExpander:
    """Turn hashed ("h") or random ("r") cron tokens into plain cron tokens."""

    def __init__(self, cronit):
        # Object exposing the per-field RANGES table used for default bounds.
        self.cron = cronit

    def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
        """Return a hashed/random integer given range/hash information"""
        # Missing bounds default to the full range of field ``idx``.
        if range_end is None:
            range_end = self.cron.RANGES[idx][1]
        if range_begin is None:
            range_begin = self.cron.RANGES[idx][0]

        # 32-bit seed: random for "r", CRC32 of hash_id for anything else.
        seed = (
            random.randint(0, 0xFFFFFFFF)
            if hash_type == "r"
            else binascii.crc32(hash_id) & 0xFFFFFFFF
        )
        span = range_end - range_begin + 1
        # Shifting by the field index makes the picked value differ per field.
        return (seed >> idx) % span + range_begin

    def match(self, efl, idx, expr, hash_id=None, **kw):
        """Return the match of ``hash_expression_re`` on ``expr`` (or None)."""
        return hash_expression_re.match(expr)

    def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
        """Expand a hashed/random expression to its normal representation"""
        # "" means "not matched yet"; any other falsy value means "no match".
        if match == "":
            match = self.match(efl, idx, expr, hash_id, **kw)
        if not match:
            return expr

        groups = match.groupdict()
        kind = groups["hash_type"]
        if kind == "h" and hash_id is None:
            raise CroniterBadCronError("Hashed definitions must include hash_id")

        bounded = bool(groups["range_begin"] and groups["range_end"])
        if bounded:
            begin, end = int(groups["range_begin"]), int(groups["range_end"])
            if begin >= end:
                raise CroniterBadCronError("Range end must be greater than range begin")

        if groups["divisor"]:
            divisor = int(groups["divisor"])
            if divisor == 0:
                raise CroniterBadCronError(f"Bad expression: {expr}")

            # Bounded example:   H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
            # Unbounded example: H/15 -> 7-59/15 (i.e. 7,22,37,52)
            if bounded:
                first, last = begin, end
            else:
                first, last = self.cron.RANGES[idx][0], self.cron.RANGES[idx][1]
            # The starting point is picked inside the first "divisor"-wide slot.
            offset = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_begin=first,
                range_end=divisor - 1 + first,
            )
            return f"{offset}-{last}/{divisor}"

        if bounded:
            # Example: H(0-29) -> 12
            picked = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_end=end,
                range_begin=begin,
            )
            return str(picked)

        # Example: H -> 32
        return str(self.do(idx, hash_type=kind, hash_id=hash_id))
1356
1357
# Expander classes keyed by a short name.
EXPANDERS = {
    "hash": HashExpander,
}