Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/croniter/croniter.py: 79%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2import binascii
3import calendar
4import copy
5import datetime
6import math
7import platform
8import random
9import re
10import struct
11import sys
12import traceback as _traceback
13from time import time
14from typing import Any, Literal, Optional, Union
16from dateutil.relativedelta import relativedelta
17from dateutil.tz import datetime_exists, tzutc
# Type of one expanded cron field: a list whose items are integers or the
# literal markers "*" (wildcard) and "l" (last day of month).
ExpandedExpression = list[Union[int, Literal["*", "l"]]]
def is_32bit() -> bool:
    """Report whether the running interpreter looks like a 32-bit build.

    Three probes are consulted: the pointer width, the platform architecture
    string and ``sys.maxsize``.  A positive answer from any single probe
    yields ``True``; otherwise ``False`` is returned.
    """
    pointer_bits = 8 * struct.calcsize("P")

    # A RuntimeError from the platform probe is treated as "unknown".
    try:
        arch_name = platform.architecture()[0]
    except RuntimeError:
        arch_name = None

    if pointer_bits == 32:
        return True
    if arch_name and "32" in arch_name:
        return True
    return sys.maxsize <= 2**32
# Import-time probe: on interpreters detected as 32-bit, try to convert a
# far-future timestamp.  OVERFLOW32B_MODE ends up True only when that call
# raises OverflowError; on other interpreters the conversion is not attempted
# and the flag is False.
try:
    # https://github.com/python/cpython/issues/101069 detection
    if is_32bit():
        datetime.datetime.fromtimestamp(3999999999)
    OVERFLOW32B_MODE = False
except OverflowError:
    OVERFLOW32B_MODE = True
# Standard-library UTC tzinfo and the timezone-aware datetime for timestamp 0.
UTC_DT = datetime.timezone.utc
EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)
# Month abbreviations mapped to their 1-based month number (jan=1 .. dec=12).
M_ALPHAS: dict[str, Union[int, str]] = {
    abbreviation: number
    for number, abbreviation in enumerate(
        ("jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"),
        start=1,
    )
}
# Weekday abbreviations mapped to their number, counting from sun=0 to sat=6.
DOW_ALPHAS: dict[str, Union[int, str]] = {
    abbreviation: number
    for number, abbreviation in enumerate(("sun", "mon", "tue", "wed", "thu", "fri", "sat"))
}
# Index of each field inside an expanded expression.  Seconds and year come
# last, after the five classic cron fields.
(
    MINUTE_FIELD,
    HOUR_FIELD,
    DAY_FIELD,
    MONTH_FIELD,
    DOW_FIELD,
    SECOND_FIELD,
    YEAR_FIELD,
) = range(7)

# Field layouts for the 5-, 6- and 7-column expression forms.
UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD)
SECOND_FIELDS = UNIX_FIELDS + (SECOND_FIELD,)
YEAR_FIELDS = SECOND_FIELDS + (YEAR_FIELD,)
# "<low>-<high>" with an optional "/<step>" suffix; group 4 is the step digits.
step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
# A string made only of digits.
only_int_re = re.compile(r"^\d+$")

# Days per month for a non-leap year, indexed by month - 1.
DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
# Regex alternations of the weekday / month abbreviations.
WEEKDAYS = "|".join(DOW_ALPHAS.keys())
MONTHS = "|".join(M_ALPHAS.keys())
# Either a run of digits or a single "*".
star_or_int_re = re.compile(r"^(\d+|\*)$")
# Special day-of-week forms: "<something>#<digits>" or "l<digits>".
special_dow_re = re.compile(
    rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)"
    rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$"
)
# Nearest-weekday day-of-month forms: "<digits>w" or "w<digits>".
nearest_weekday_re = re.compile(r"^(?:(\d+)w|w(\d+))$")
# A literal "*"; used with .match() to test whether an expression starts with one.
re_star = re.compile("[*]")
# Hash expressions: "h" or "r", optional "(<begin>-<end>)" range, optional "/<divisor>".
hash_expression_re = re.compile(
    r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
)
# Field layouts, reachable either by name or by the number of columns.
CRON_FIELDS = {
    "unix": UNIX_FIELDS,
    "second": SECOND_FIELDS,
    "year": YEAR_FIELDS,
    len(UNIX_FIELDS): UNIX_FIELDS,
    len(SECOND_FIELDS): SECOND_FIELDS,
    len(YEAR_FIELDS): YEAR_FIELDS,
}
# Column counts of the three supported expression forms.
UNIX_CRON_LEN = len(UNIX_FIELDS)
SECOND_CRON_LEN = len(SECOND_FIELDS)
YEAR_CRON_LEN = len(YEAR_FIELDS)
# retrocompat
# Set of the accepted column counts (the integer keys of CRON_FIELDS).
VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)}

# Unique sentinel used as a "no argument given" default where None is meaningful.
MARKER = object()
def datetime_to_timestamp(d):
    """Convert the datetime `d` into a UNIX timestamp (float seconds).

    When `d` carries a UTC offset, the offset is subtracted so the result is
    measured from 1970-01-01 00:00:00 UTC.  A datetime without an offset is
    used as-is.  This includes a datetime whose ``tzinfo`` is set but whose
    ``utcoffset()`` returns ``None``: the datetime documentation treats it as
    naive, and it previously raised ``TypeError`` here.
    """
    offset = d.utcoffset()
    naive = d.replace(tzinfo=None)
    if offset is not None:
        naive -= offset
    return (naive - datetime.datetime(1970, 1, 1)).total_seconds()
149def _is_leap(year: int) -> bool:
150 return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0)
def _last_day_of_month(year: int, month: int) -> int:
    """Return the number of the last day of `month` in `year`, counting Feb 29 in leap years."""
    if month == 2 and _is_leap(year):
        return DAYS[month - 1] + 1
    return DAYS[month - 1]
def _is_successor(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> bool:
    """Tell whether `date` lies strictly past `previous_date` in the search direction.

    Both values are compared in UTC.  With `is_prev` the date must be earlier
    than `previous_date`; otherwise it must be later.
    """
    candidate = date.astimezone(UTC_DT)
    reference = previous_date.astimezone(UTC_DT)
    return candidate < reference if is_prev else candidate > reference
170def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta:
171 """Calculate the timezone difference of the given dates."""
172 offset1 = date1.utcoffset()
173 offset2 = date2.utcoffset()
174 assert offset1 is not None
175 assert offset2 is not None
176 return offset2 - offset1
def _add_tzinfo(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> tuple[datetime.datetime, bool]:
    """Add the tzinfo from the previous date to the given date.

    In case the new date is ambiguous, determine the correct date
    based on it being closer to the previous date but still a successor
    (after/before based on `is_prev`).

    In case the date does not exist, jump forward to the next existing date.

    Returns a ``(aware_datetime, exists)`` tuple; ``exists`` is False only
    when the requested local time did not exist and was moved forward.
    """
    # tzinfo objects exposing `localize` are handled through the pytz API.
    localize = getattr(previous_date.tzinfo, "localize", None)
    if localize is not None:
        # pylint: disable-next=import-outside-toplevel
        import pytz

        try:
            # is_dst=None makes localize raise for non-existent/ambiguous times.
            result = localize(date, is_dst=None)
        except pytz.NonExistentTimeError:
            # Step forward one minute at a time until the local time exists.
            while True:
                date += datetime.timedelta(minutes=1)
                try:
                    result = localize(date, is_dst=None)
                except pytz.NonExistentTimeError:
                    continue
                break
            return result, False
        except pytz.AmbiguousTimeError:
            closer = localize(date, is_dst=not is_prev)
            farther = localize(date, is_dst=is_prev)
            # TODO: Check negative DST
            assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
            if _is_successor(closer, previous_date, is_prev):
                result = closer
            else:
                assert _is_successor(farther, previous_date, is_prev)
                result = farther
        return result, True

    # Other tzinfo objects: pick between the two readings with the `fold` attribute.
    result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo)
    if not datetime_exists(result):
        # Step forward one minute at a time until the local time exists.
        while not datetime_exists(result):
            result += datetime.timedelta(minutes=1)
        return result, False

    # result is closer to the previous date
    farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo)
    # Comparing the UTC offsets in the check for the date being ambiguous.
    if result.utcoffset() != farther.utcoffset():
        # TODO: Check negative DST
        assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
        if not _is_successor(result, previous_date, is_prev):
            assert _is_successor(farther, previous_date, is_prev)
            result = farther
    return result, True
class CroniterError(ValueError):
    """General top-level Croniter base exception (a ``ValueError`` subclass)."""
class CroniterBadTypeRangeError(TypeError):
    """``TypeError`` subclass that is not part of the ``CroniterError`` hierarchy.

    NOTE(review): presumably raised for arguments of an unsupported type in
    range helpers; no raise site is visible in this part of the file, so
    confirm it against the callers.
    """
class CroniterBadCronError(CroniterError):
    """Syntax, unknown value, or range error within a cron expression."""
class CroniterUnsupportedSyntaxError(CroniterBadCronError):
    """Valid cron syntax, but likely to produce inaccurate results."""

    # Extending CroniterBadCronError, which may be contradictory, but this allows
    # catching both errors with a single exception. From a user perspective
    # these will likely be handled the same way.
class CroniterBadDateError(CroniterError):
    """Unable to find a next/previous timestamp matching the expression."""
class CroniterNotAlphaError(CroniterBadCronError):
    """Cron syntax contains an invalid day or month abbreviation."""
class croniter:
    # Number of months after which the month field wraps around.
    MONTHS_IN_YEAR = 12

    # This helps with expanding `*` fields into `lower-upper` ranges. Each item
    # in this tuple maps to the corresponding field index
    RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099))

    # Per-field lookup tables turning alphabetic tokens into values; fields
    # with an empty table accept no alphabetic tokens.
    ALPHACONV: tuple[dict[str, Union[int, str]], ...] = (
        {},  # 0: min
        {},  # 1: hour
        {"l": "l"},  # 2: dom
        # 3: mon
        copy.deepcopy(M_ALPHAS),
        # 4: dow
        copy.deepcopy(DOW_ALPHAS),
        # 5: second
        {},
        # 6: year
        {},
    )

    # Per-field value aliases applied by value_alias(), e.g. 7 -> 0 for day of week.
    LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {})

    # Per-field count of distinct values in the matching RANGES entry.
    LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130)
def __init__(
    self,
    expr_format: str,
    start_time: Optional[Union[datetime.datetime, float]] = None,
    ret_type: type = float,
    day_or: bool = True,
    max_years_between_matches: Optional[int] = None,
    is_prev: bool = False,
    hash_id: Optional[Union[bytes, str]] = None,
    implement_cron_bug: bool = False,
    second_at_beginning: bool = False,
    expand_from_start_time: bool = False,
) -> None:
    """Parse `expr_format` and position the iterator at `start_time`.

    `start_time` defaults to the current time.  `hash_id` may be bytes or a
    string (encoded as UTF-8); any other type raises TypeError.
    `max_years_between_matches` defaults to 50 and is clamped to at least 1;
    whether it was passed explicitly is remembered for the generators.
    """
    self._ret_type = ret_type
    self._day_or = day_or
    self._implement_cron_bug = implement_cron_bug
    self.second_at_beginning = bool(second_at_beginning)
    self._expand_from_start_time = expand_from_start_time

    # Normalise hash_id to bytes, rejecting anything that is neither bytes nor str.
    if hash_id is not None:
        if not isinstance(hash_id, (bytes, str)):
            raise TypeError("hash_id must be bytes or UTF-8 string")
        if isinstance(hash_id, str):
            hash_id = hash_id.encode("UTF-8")

    explicit_limit = max_years_between_matches is not None
    self._max_years_btw_matches_explicitly_set = explicit_limit
    year_limit = max_years_between_matches if explicit_limit else 50
    self._max_years_between_matches = max(int(year_limit), 1)

    if start_time is None:
        start_time = time()

    self.tzinfo: Optional[datetime.tzinfo] = None

    # Placeholders; set_current() below fills in the real values.
    self.start_time = 0.0
    self.dst_start_time = 0.0
    self.cur = 0.0
    self.set_current(start_time, force=True)

    expand_origin = self.dst_start_time if self._expand_from_start_time else None
    parsed = self._expand(
        expr_format,
        hash_id=hash_id,
        from_timestamp=expand_origin,
        second_at_beginning=second_at_beginning,
    )
    self.expanded, self.nth_weekday_of_month, self.expressions, self.nearest_weekday = parsed
    self.fields = CRON_FIELDS[len(self.expanded)]
    self._is_prev = is_prev
@classmethod
def _alphaconv(cls, index, key, expressions):
    """Translate the alphabetic token `key` of field `index` through ALPHACONV.

    Raises CroniterNotAlphaError, quoting the space-joined `expressions`,
    when the field has no entry for `key`.
    """
    field_table = cls.ALPHACONV[index]
    try:
        converted = field_table[key]
    except KeyError:
        raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable")
    return converted
345 def get_next(self, ret_type=None, start_time=None, update_current=True):
346 if start_time and self._expand_from_start_time:
347 raise ValueError(
348 "start_time is not supported when using expand_from_start_time = True."
349 )
350 return self._get_next(
351 ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current
352 )
354 def get_prev(self, ret_type=None, start_time=None, update_current=True):
355 return self._get_next(
356 ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current
357 )
359 def get_current(self, ret_type=None):
360 ret_type = ret_type or self._ret_type
361 if issubclass(ret_type, datetime.datetime):
362 return self.timestamp_to_datetime(self.cur)
363 return self.cur
365 def set_current(
366 self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True
367 ) -> float:
368 if (force or (self.cur is None)) and start_time is not None:
369 if isinstance(start_time, datetime.datetime):
370 self.tzinfo = start_time.tzinfo
371 start_time = self.datetime_to_timestamp(start_time)
373 self.start_time = start_time
374 self.dst_start_time = start_time
375 self.cur = start_time
376 return self.cur
@staticmethod
def datetime_to_timestamp(d: datetime.datetime) -> float:
    """
    Converts a `datetime` object `d` into a UNIX timestamp.

    Delegates to the module-level function of the same name.
    """
    return datetime_to_timestamp(d)

# Old private name kept as an alias of the public method.
_datetime_to_timestamp = datetime_to_timestamp  # retrocompat
def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime:
    """
    Converts a UNIX `timestamp` into a `datetime` object.

    Without an explicit `tzinfo` the iterator's own tzinfo is used.  Passing
    ``tzinfo=None`` explicitly yields a naive datetime holding the UTC wall
    time.
    """
    # The sentinel default lets callers pass tzinfo=None even if self.tzinfo is set.
    if tzinfo is MARKER:
        tzinfo = self.tzinfo

    if OVERFLOW32B_MODE:
        # degraded mode to workaround Y2038
        # see https://github.com/python/cpython/issues/101069
        naive_utc = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp)
    else:
        aware_utc = datetime.datetime.fromtimestamp(timestamp, tz=tzutc())
        naive_utc = aware_utc.replace(tzinfo=None)

    if not tzinfo:
        return naive_utc
    return naive_utc.replace(tzinfo=UTC_DT).astimezone(tzinfo)

_timestamp_to_datetime = timestamp_to_datetime  # retrocompat
405 def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None):
406 if update_current is None:
407 update_current = True
408 self.set_current(start_time, force=True)
409 if is_prev is None:
410 is_prev = self._is_prev
411 self._is_prev = is_prev
413 ret_type = ret_type or self._ret_type
415 if not issubclass(ret_type, (float, datetime.datetime)):
416 raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.")
418 result = self._calc_next(is_prev)
419 timestamp = self.datetime_to_timestamp(result)
420 if update_current:
421 self.cur = timestamp
422 if issubclass(ret_type, datetime.datetime):
423 return result
424 return timestamp
# iterator protocol, to enable direct use of croniter
# objects in a loop, like "for dt in croniter("5 0 * * *'): ..."
# or for combining multiple croniters into single
# dates feed using 'itertools' module
def all_next(self, ret_type=None, start_time=None, update_current=None):
    """
    Returns a generator yielding consecutive dates.

    May be used instead of an implicit call to __iter__ whenever a
    non-default `ret_type` needs to be specified.

    `start_time` only applies to the first value.  When no further date can
    be found, the generator ends quietly if ``max_years_between_matches``
    was given explicitly; otherwise CroniterBadDateError propagates.
    """
    # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could
    # be used instead
    try:
        while True:
            self._is_prev = False
            yield self._get_next(
                ret_type=ret_type, start_time=start_time, update_current=update_current
            )
            # Later iterations continue from the current position.
            start_time = None
    except CroniterBadDateError:
        if not self._max_years_btw_matches_explicitly_set:
            raise
def all_prev(self, ret_type=None, start_time=None, update_current=None):
    """
    Returns a generator yielding previous dates.

    `start_time` only applies to the first value.  When no earlier date can
    be found, the generator ends quietly if ``max_years_between_matches``
    was given explicitly; otherwise CroniterBadDateError propagates.
    """
    try:
        while True:
            self._is_prev = True
            yield self._get_next(
                ret_type=ret_type, start_time=start_time, update_current=update_current
            )
            # Later iterations continue from the current position.
            start_time = None
    except CroniterBadDateError:
        if not self._max_years_btw_matches_explicitly_set:
            raise
467 def iter(self, *args, **kwargs):
468 return self.all_prev if self._is_prev else self.all_next
def __iter__(self):
    """Return the instance itself; each step is produced by ``__next__``."""
    return self

# Stepping the iterator (``next(obj)`` or ``obj.next()``) calls _get_next with its defaults.
__next__ = next = _get_next
def _calc_next(self, is_prev: bool) -> datetime.datetime:
    """Compute the neighbouring match of the current position as a datetime.

    When both day-of-month and day-of-week are restricted and `day_or` is
    set, the two fields are searched separately and the match nearer to the
    current position wins (union semantics).  Otherwise, or when cron-bug
    emulation applies, a single search with all fields combined is done.
    """
    current = self.timestamp_to_datetime(self.cur)
    fields = self.expanded[:]
    # NOTE(review): shallow copy; the set values stay shared with
    # self.nth_weekday_of_month and _calc may update them in place.
    nth_weekdays = self.nth_weekday_of_month.copy()

    # exception to support day of month and day of week as defined in cron
    both_days_restricted = fields[DAY_FIELD][0] != "*" and fields[DOW_FIELD][0] != "*"
    if not (both_days_restricted and self._day_or):
        return self._calc(current, fields, nth_weekdays, is_prev)

    # If requested, handle a bug in vixie cron/ISC cron where day_of_month and
    # day_of_week form an intersection (AND) instead of a union (OR) if either
    # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html)
    emulate_bug = self._implement_cron_bug and (
        re_star.match(self.expressions[DAY_FIELD])
        or re_star.match(self.expressions[DOW_FIELD])
    )
    if emulate_bug:
        # Skip the union below and use the intersecting search.
        return self._calc(current, fields, nth_weekdays, is_prev)

    # First search: day-of-month only (day-of-week opened up).
    saved_dow = fields[DOW_FIELD]
    fields[DOW_FIELD] = ["*"]
    by_day_of_month = self._calc(current, fields, nth_weekdays, is_prev)

    # Second search: day-of-week only (day-of-month opened up).
    fields[DOW_FIELD] = saved_dow
    fields[DAY_FIELD] = ["*"]
    by_day_of_week = self._calc(current, fields, nth_weekdays, is_prev)

    if is_prev:
        if by_day_of_month > by_day_of_week:
            return by_day_of_month
        return by_day_of_week
    if by_day_of_month < by_day_of_week:
        return by_day_of_month
    return by_day_of_week
def _calc(
    self,
    now: datetime.datetime,
    expanded: list[ExpandedExpression],
    nth_weekday_of_month: dict[int, set[int]],
    is_prev: bool,
) -> datetime.datetime:
    """Search for the nearest datetime matching `expanded`, starting from `now`.

    The search runs forward, or backward when `is_prev` is true.  It works
    on a naive local time and adjusts one field at a time (year, month, day,
    weekday, hour, minute, second), restarting whenever a field had to
    change.  If `now` is timezone-aware, the tzinfo is added back and DST
    transitions are handled before returning.

    Raises CroniterBadDateError when the search moves more than
    ``_max_years_between_matches`` years away from the start, or when the
    year field has no further candidate.
    """
    if is_prev:
        nearest_diff_method = self._get_prev_nearest_diff
        offset = relativedelta(microseconds=-1)
    else:
        nearest_diff_method = self._get_next_nearest_diff
        # Step past `now` by the smallest unit the expression can express.
        if len(expanded) > UNIX_CRON_LEN:
            offset = relativedelta(seconds=1)
        else:
            offset = relativedelta(minutes=1)
    # Calculate the next cron time in local time a.k.a. timezone unaware time.
    unaware_time = now.replace(tzinfo=None) + offset
    if len(expanded) > UNIX_CRON_LEN:
        unaware_time = unaware_time.replace(microsecond=0)
    else:
        unaware_time = unaware_time.replace(second=0, microsecond=0)

    # `month` and `year` are read by the nested proc_* helpers and refreshed
    # by the main loop each time a field changes.
    month = unaware_time.month
    year = current_year = unaware_time.year

    # Each proc_* helper returns (changed, d): True when `d` was moved to a
    # new candidate, False when the field already matches, and None (year
    # only) when no candidate is left.

    def proc_year(d):
        if len(expanded) == YEAR_CRON_LEN:
            try:
                expanded[YEAR_FIELD].index("*")
            except ValueError:
                # use None as range_val to indicate no loop
                diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
                if diff_year is None:
                    return None, d
                if diff_year != 0:
                    if is_prev:
                        d += relativedelta(
                            years=diff_year, month=12, day=31, hour=23, minute=59, second=59
                        )
                    else:
                        d += relativedelta(
                            years=diff_year, month=1, day=1, hour=0, minute=0, second=0
                        )
                    return True, d
        return False, d

    def proc_month(d):
        try:
            expanded[MONTH_FIELD].index("*")
        except ValueError:
            diff_month = nearest_diff_method(
                d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR
            )
            reset_day = 1

            if diff_month is not None and diff_month != 0:
                if is_prev:
                    # Going backwards lands on the last second of the target month.
                    d += relativedelta(months=diff_month)
                    reset_day = _last_day_of_month(d.year, d.month)
                    d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(
                        months=diff_month, day=reset_day, hour=0, minute=0, second=0
                    )
                return True, d
        return False, d

    def proc_day_of_month(d):
        try:
            expanded[DAY_FIELD].index("*")
        except ValueError:
            days = _last_day_of_month(year, month)
            # "l" matches when `d` already is the last day of the month.
            if "l" in expanded[DAY_FIELD] and days == d.day:
                return False, d

            if is_prev:
                # Wrapping backwards uses the length of the previous month.
                prev_month = (month - 2) % self.MONTHS_IN_YEAR + 1
                prev_year = year - 1 if month == 1 else year
                days_in_prev_month = _last_day_of_month(prev_year, prev_month)
                diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
            else:
                diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)

            if diff_day is not None and diff_day != 0:
                if is_prev:
                    d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                return True, d
        return False, d

    def proc_day_of_week(d):
        try:
            expanded[DOW_FIELD].index("*")
        except ValueError:
            # isoweekday() % 7 maps Sunday (7) to 0, matching cron numbering.
            diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
            if diff_day_of_week is not None and diff_day_of_week != 0:
                if is_prev:
                    d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
                return True, d
        return False, d

    def proc_day_of_week_nth(d):
        # Spread a "*" entry over all seven weekdays, then drop it.
        if "*" in nth_weekday_of_month:
            s = nth_weekday_of_month["*"]
            for i in range(0, 7):
                if i in nth_weekday_of_month:
                    nth_weekday_of_month[i].update(s)
                else:
                    nth_weekday_of_month[i] = s
            del nth_weekday_of_month["*"]

        # Collect the matching days of this month that lie in the search direction.
        candidates = []
        for wday, nth in nth_weekday_of_month.items():
            c = self._get_nth_weekday_of_month(d.year, d.month, wday)
            for n in nth:
                if n == "l":
                    candidate = c[-1]
                elif len(c) < n:
                    continue
                else:
                    candidate = c[n - 1]
                if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                    candidates.append(candidate)

        if not candidates:
            # Nothing left in this month: move to the adjacent month's edge.
            if is_prev:
                d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
            else:
                days = _last_day_of_month(year, month)
                d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
            return True, d

        candidates.sort()
        diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
        if diff_day != 0:
            if is_prev:
                d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
            else:
                d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
            return True, d
        return False, d

    def proc_nearest_weekday(d):
        """Process W (nearest weekday) day-of-month entries."""
        candidates = []
        for w_day in self.nearest_weekday:
            candidate = self._get_nearest_weekday(d.year, d.month, w_day)
            if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                candidates.append(candidate)

        if not candidates:
            # Nothing left in this month: move to the adjacent month's edge.
            if is_prev:
                d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
            else:
                days = _last_day_of_month(year, month)
                d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
            return True, d

        candidates.sort()
        diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
        if diff_day != 0:
            if is_prev:
                d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
            else:
                d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
            return True, d
        return False, d

    def proc_hour(d):
        try:
            expanded[HOUR_FIELD].index("*")
        except ValueError:
            diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
            if diff_hour is not None and diff_hour != 0:
                if is_prev:
                    d += relativedelta(hours=diff_hour, minute=59, second=59)
                else:
                    d += relativedelta(hours=diff_hour, minute=0, second=0)
                return True, d
        return False, d

    def proc_minute(d):
        try:
            expanded[MINUTE_FIELD].index("*")
        except ValueError:
            diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
            if diff_min is not None and diff_min != 0:
                if is_prev:
                    d += relativedelta(minutes=diff_min, second=59)
                else:
                    d += relativedelta(minutes=diff_min, second=0)
                return True, d
        return False, d

    def proc_second(d):
        if len(expanded) > UNIX_CRON_LEN:
            try:
                expanded[SECOND_FIELD].index("*")
            except ValueError:
                diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
                if diff_sec is not None and diff_sec != 0:
                    d += relativedelta(seconds=diff_sec)
                    return True, d
        else:
            # Expressions without a seconds field always land on second 0.
            d += relativedelta(second=0)
        return False, d

    # Helpers run from the coarsest field to the finest.
    procs = [
        proc_year,
        proc_month,
        (proc_nearest_weekday if self.nearest_weekday else proc_day_of_month),
        (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
        proc_hour,
        proc_minute,
        proc_second,
    ]

    while abs(year - current_year) <= self._max_years_between_matches:
        # NOTE: the local name `next` shadows the builtin inside this method.
        next = False
        stop = False
        for proc in procs:
            (changed, unaware_time) = proc(unaware_time)
            # `None` can be set mostly for year processing
            # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
            if changed is None:
                stop = True
                break
            if changed:
                # A field moved: refresh month/year and restart from the year helper.
                month, year = unaware_time.month, unaware_time.year
                next = True
                break
        if stop:
            break
        if next:
            continue

        # Every helper reported a match.
        unaware_time = unaware_time.replace(microsecond=0)
        if now.tzinfo is None:
            return unaware_time

        # Add timezone information back and handle DST changes
        aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

        if not exists and (
            not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD]
        ):
            # The calculated local date does not exist and moving the time forward
            # to the next valid time isn't the correct solution. Search for the
            # next matching cron time that exists.
            while not exists:
                unaware_time = self._calc(
                    unaware_time, expanded, nth_weekday_of_month, is_prev
                )
                aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

        offset_delta = _timezone_delta(now, aware_time)
        if not offset_delta:
            # There was no DST change.
            return aware_time

        # There was a DST change. So check if there is a alternative cron time
        # for the other UTC offset.
        alternative_unaware_time = now.replace(tzinfo=None) + offset_delta
        alternative_unaware_time = self._calc(
            alternative_unaware_time, expanded, nth_weekday_of_month, is_prev
        )
        alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev)

        if not _is_successor(alternative_aware_time, now, is_prev):
            # The alternative time is an ancestor of now. Thus it is not an alternative.
            return aware_time

        # Prefer whichever of the two candidates is nearer to `now`.
        if _is_successor(aware_time, alternative_aware_time, is_prev):
            return alternative_aware_time

        return aware_time

    if is_prev:
        raise CroniterBadDateError("failed to find prev date")
    raise CroniterBadDateError("failed to find next date")
792 @staticmethod
793 def _get_next_nearest_diff(x, to_check, range_val):
794 """
795 `range_val` is the range of a field.
796 If no available time, we can move to next loop(like next month).
797 `range_val` can also be set to `None` to indicate that there is no loop.
798 ( Currently, should only used for `year` field )
799 """
800 for i, d in enumerate(to_check):
801 if range_val is not None:
802 if d == "l":
803 # if 'l' then it is the last day of month
804 # => its value of range_val
805 d = range_val
806 elif d > range_val:
807 continue
808 if d >= x:
809 return d - x
810 # When range_val is None and x not exists in to_check,
811 # `None` will be returned to suggest no more available time
812 if range_val is None:
813 return None
814 return to_check[0] - x + range_val
816 @staticmethod
817 def _get_prev_nearest_diff(x, to_check, range_val):
818 """
819 `range_val` is the range of a field.
820 If no available time, we can move to previous loop(like previous month).
821 Range_val can also be set to `None` to indicate that there is no loop.
822 ( Currently should only used for `year` field )
823 """
824 candidates = to_check[:]
825 candidates.reverse()
826 for d in candidates:
827 if d != "l" and d <= x:
828 return d - x
829 if "l" in candidates:
830 return -x
831 # When range_val is None and x not exists in to_check,
832 # `None` will be returned to suggest no more available time
833 if range_val is None:
834 return None
835 candidate = candidates[0]
836 for c in candidates:
837 # fixed: c < range_val
838 # this code will reject all 31 day of month, 12 month, 59 second,
839 # 23 hour and so on.
840 # if candidates has just a element, this will not harmful.
841 # but candidates have multiple elements, then values equal to
842 # range_val will rejected.
843 if c <= range_val:
844 candidate = c
845 break
846 # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error
847 # return 2021-03-02 06:00:00
848 if candidate > range_val:
849 return -range_val
850 return candidate - x - range_val
852 @staticmethod
853 def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]:
854 """For a given year/month return a list of days in nth-day-of-month order.
855 The last weekday of the month is always [-1].
856 """
857 w = (day_of_week + 6) % 7
858 c = calendar.Calendar(w).monthdayscalendar(year, month)
859 if c[0][0] == 0:
860 c.pop(0)
861 return tuple(i[0] for i in c)
@staticmethod
def _get_nearest_weekday(year, month, day):
    """Get the nearest weekday (Mon-Fri) to the given day in the given month.

    `day` is first clamped to the last day of the month.  Then:
    - a weekday is returned unchanged;
    - a Saturday becomes the preceding Friday, or the following Monday when
      the Saturday is the 1st;
    - a Sunday becomes the following Monday, or the preceding Friday when
      the Sunday is the last day of the month.
    """
    last_day = _last_day_of_month(year, month)
    day = min(day, last_day)
    weekday = calendar.weekday(year, month, day)  # 0=Mon, 6=Sun

    if weekday == 5:  # Saturday
        return day - 1 if day > 1 else day + 2
    if weekday == 6:  # Sunday
        return day + 1 if day < last_day else day - 2
    return day  # Mon-Fri
@classmethod
def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN):
    """Return the LOWMAP alias of `val` for the given field, or `val` unchanged.

    `len_expressions` is the number of columns of the expression; a list,
    dict, tuple or set is replaced by its length.  Some field/length
    combinations are exempt from aliasing and keep `val` as given.
    """
    if isinstance(len_expressions, (list, dict, tuple, set)):
        len_expressions = len(len_expressions)

    aliases = cls.LOWMAP[field_index]
    if val not in aliases:
        return val

    # do not support 0 as a month either for classical 5 fields cron,
    # 6fields second repeat form or 7 fields year form
    # but still let conversion happen if day field is shifted
    exemptions = (
        (UNIX_CRON_LEN, [DAY_FIELD, MONTH_FIELD]),
        (SECOND_CRON_LEN, [MONTH_FIELD, DOW_FIELD]),
        (YEAR_CRON_LEN, [DAY_FIELD, MONTH_FIELD, DOW_FIELD]),
    )
    for columns, exempt_fields in exemptions:
        if field_index in exempt_fields and len_expressions == columns:
            return val
    return aliases[val]
# Maximum days in each month (non-leap year for Feb), keyed by 1-based month number.
DAYS_IN_MONTH = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}
911 @classmethod
912 def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None, strict=False, strict_year=None):
913 # Split the expression in components, and normalize L -> l, MON -> mon,
914 # etc. Keep expr_format untouched so we can use it in the exception
915 # messages.
916 expr_aliases = {
917 "@midnight": ("0 0 * * *", "h h(0-2) * * * h"),
918 "@hourly": ("0 * * * *", "h * * * * h"),
919 "@daily": ("0 0 * * *", "h h * * * h"),
920 "@weekly": ("0 0 * * 0", "h h * * h h"),
921 "@monthly": ("0 0 1 * *", "h h h * * h"),
922 "@yearly": ("0 0 1 1 *", "h h h h * h"),
923 "@annually": ("0 0 1 1 *", "h h h h * h"),
924 }
926 efl = expr_format.lower()
927 hash_id_expr = 1 if hash_id is not None else 0
928 try:
929 efl = expr_aliases[efl][hash_id_expr]
930 except KeyError:
931 pass
933 expressions = efl.split()
935 if len(expressions) not in VALID_LEN_EXPRESSION:
936 raise CroniterBadCronError(
937 "Exactly 5, 6 or 7 columns has to be specified for iterator expression."
938 )
940 if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
941 # move second to it's own(6th) field to process by same logical
942 expressions.insert(SECOND_FIELD, expressions.pop(0))
944 expanded = []
945 nth_weekday_of_month = {}
946 nearest_weekday = set()
948 for field_index, expr in enumerate(expressions):
949 for expanderid, expander in EXPANDERS.items():
950 expr = expander(cls).expand(
951 efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp
952 )
954 if "?" in expr:
955 if expr != "?":
956 raise CroniterBadCronError(
957 f"[{expr_format}] is not acceptable."
958 f" Question mark can not used with other characters"
959 )
960 if field_index not in [DAY_FIELD, DOW_FIELD]:
961 raise CroniterBadCronError(
962 f"[{expr_format}] is not acceptable. "
963 f"Question mark can only used in day_of_month or day_of_week"
964 )
965 # currently just trade `?` as `*`
966 expr = "*"
968 e_list = expr.split(",")
969 res = []
970 seen = set()
972 while len(e_list) > 0:
973 e = e_list.pop()
974 nth = None
976 if field_index == DOW_FIELD:
977 # Handle special case in the dow expression: 2#3, l3
978 special_dow_rem = special_dow_re.match(str(e))
979 if special_dow_rem:
980 g = special_dow_rem.groupdict()
981 he, last = g.get("he", ""), g.get("last", "")
982 if he:
983 e = he
984 try:
985 nth = int(last)
986 assert 5 >= nth >= 1
987 except (KeyError, ValueError, AssertionError):
988 raise CroniterBadCronError(
989 f"[{expr_format}] is not acceptable."
990 f" Invalid day_of_week value: '{nth}'"
991 )
992 elif last:
993 e = last
994 nth = g["pre"] # 'l'
996 if field_index == DAY_FIELD:
997 # Handle W (nearest weekday) in day-of-month: 15w, w15
998 w_match = nearest_weekday_re.match(str(e))
999 if w_match:
1000 w_day = int(w_match.group(1) or w_match.group(2))
1001 if w_day < 1 or w_day > 31:
1002 raise CroniterBadCronError(
1003 f"[{expr_format}] is not acceptable,"
1004 f" nearest weekday day value '{w_day}' out of range"
1005 )
1006 if len(e_list) > 0 or len(res) > 0:
1007 raise CroniterBadCronError(
1008 f"[{expr_format}] is not acceptable."
1009 f" 'W' can only be used with a single day value,"
1010 f" not in a list or range"
1011 )
1012 nearest_weekday.add(w_day)
1013 res.append(w_day)
1014 continue
1016 # Before matching step_search_re, normalize "*" to "{min}-{max}".
1017 # Example: in the minute field, "*/5" normalizes to "0-59/5"
1018 t = re.sub(
1019 r"^\*(\/.+)$",
1020 r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]),
1021 str(e),
1022 )
1023 m = step_search_re.search(t)
1025 if not m:
1026 # Before matching step_search_re,
1027 # normalize "{start}/{step}" to "{start}-{max}/{step}".
1028 # Example: in the minute field, "10/5" normalizes to "10-59/5"
1029 t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e))
1030 m = step_search_re.search(t)
1032 if m:
1033 # early abort if low/high are out of bounds
1034 (low, high, step) = m.group(1), m.group(2), m.group(4) or 1
1035 if field_index == DAY_FIELD and high == "l":
1036 high = "31"
1038 if not only_int_re.search(low):
1039 low = str(cls._alphaconv(field_index, low, expressions))
1041 if not only_int_re.search(high):
1042 high = str(cls._alphaconv(field_index, high, expressions))
1044 # normally, it's already guarded by the RE that should not accept
1045 # not-int values.
1046 if not only_int_re.search(str(step)):
1047 raise CroniterBadCronError(
1048 f"[{expr_format}] step '{step}'"
1049 f" in field {field_index} is not acceptable"
1050 )
1051 step = int(step)
1053 for band in low, high:
1054 if not only_int_re.search(str(band)):
1055 raise CroniterBadCronError(
1056 f"[{expr_format}] bands '{low}-{high}'"
1057 f" in field {field_index} are not acceptable"
1058 )
1060 low, high = (
1061 cls.value_alias(int(_val), field_index, expressions)
1062 for _val in (low, high)
1063 )
1065 if max(low, high) > max(
1066 cls.RANGES[field_index][0], cls.RANGES[field_index][1]
1067 ):
1068 raise CroniterBadCronError(f"{expr_format} is out of bands")
1070 if from_timestamp:
1071 low = cls._get_low_from_current_date_number(
1072 field_index, int(step), int(from_timestamp)
1073 )
1075 # Handle when the second bound of the range is in backtracking order:
1076 # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH
1077 if low > high:
1078 whole_field_range = list(
1079 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1)
1080 )
1081 # Add FirstBound -> ENDRANGE, respecting step
1082 rng = list(range(low, cls.RANGES[field_index][1] + 1, step))
1083 # Then 0 -> SecondBound, but skipping n first occurences according to step
1084 # EG to respect such expressions : Apr-Jan/3
1085 to_skip = 0
1086 if rng:
1087 already_skipped = list(reversed(whole_field_range)).index(rng[-1])
1088 curpos = whole_field_range.index(rng[-1])
1089 if ((curpos + step) > len(whole_field_range)) and (
1090 already_skipped < step
1091 ):
1092 to_skip = step - already_skipped
1093 rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step))
1094 # if we include a range type: Jan-Jan, or Sun-Sun,
1095 # it means the whole cycle (all days of week, # all monthes of year, etc)
1096 elif low == high:
1097 rng = list(
1098 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step)
1099 )
1100 else:
1101 try:
1102 rng = list(range(low, high + 1, step))
1103 except ValueError as exc:
1104 raise CroniterBadCronError(f"invalid range: {exc}")
1106 if field_index == DOW_FIELD and nth and nth != "l":
1107 rng = [f"{item}#{nth}" for item in rng]
1108 e_list += [a for a in rng if a not in seen]
1109 seen.update(rng)
1110 else:
1111 if t.startswith("-"):
1112 raise CroniterBadCronError(
1113 f"[{expr_format}] is not acceptable, negative numbers not allowed"
1114 )
1115 if not star_or_int_re.search(t):
1116 t = cls._alphaconv(field_index, t, expressions)
1118 try:
1119 t = int(t)
1120 except ValueError:
1121 pass
1123 t = cls.value_alias(t, field_index, expressions)
1125 if t not in ["*", "l"] and (
1126 int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1]
1127 ):
1128 raise CroniterBadCronError(
1129 f"[{expr_format}] is not acceptable, out of range"
1130 )
1132 res.append(t)
1134 if field_index == DOW_FIELD and nth:
1135 if t not in nth_weekday_of_month:
1136 nth_weekday_of_month[t] = set()
1137 nth_weekday_of_month[t].add(nth)
1139 res = set(res)
1140 res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i)
1141 if len(res) == cls.LEN_MEANS_ALL[field_index]:
1142 # Make sure the wildcard is used in the correct way (avoid over-optimization)
1143 if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or (
1144 field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]
1145 ):
1146 pass
1147 else:
1148 res = ["*"]
1150 expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res)
1152 # Check to make sure the dow combo in use is supported
1153 if nth_weekday_of_month:
1154 dow_expanded_set = set(expanded[DOW_FIELD])
1155 dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys())
1156 dow_expanded_set.discard("*")
1157 # Skip: if it's all weeks instead of wildcard
1158 if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]:
1159 raise CroniterUnsupportedSyntaxError(
1160 f"day-of-week field does not support mixing literal values and nth"
1161 f" day of week syntax. Cron: '{expr_format}'"
1162 f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}"
1163 )
1165 if strict:
1166 # Cross-validate day-of-month against month (and optionally year)
1167 # to reject impossible combinations like "0 0 31 2 *" (Feb 31st).
1168 days = expanded[DAY_FIELD]
1169 months = expanded[MONTH_FIELD]
1170 if days != ["*"] and days != ["l"] and months != ["*"]:
1171 int_days = [d for d in days if isinstance(d, int)]
1172 int_months = [m for m in months if isinstance(m, int)]
1173 if int_days and int_months:
1174 # Determine max days per month, accounting for leap years
1175 days_in_month = dict(cls.DAYS_IN_MONTH)
1176 if 2 in int_months:
1177 has_leap_year = True # assume possible by default
1178 if strict_year is not None:
1179 # Year explicitly provided as parameter
1180 if isinstance(strict_year, int):
1181 has_leap_year = calendar.isleap(strict_year)
1182 else:
1183 has_leap_year = any(calendar.isleap(y) for y in strict_year)
1184 elif len(expanded) > YEAR_FIELD:
1185 years = expanded[YEAR_FIELD]
1186 if years != ["*"]:
1187 int_years = [y for y in years if isinstance(y, int)]
1188 if int_years:
1189 has_leap_year = any(calendar.isleap(y) for y in int_years)
1190 if has_leap_year:
1191 days_in_month[2] = 29
1192 min_day = min(int_days)
1193 max_possible = max(days_in_month[m] for m in int_months)
1194 if min_day > max_possible:
1195 raise CroniterBadCronError(
1196 f"[{expr_format}] is not acceptable. Day(s) {int_days}"
1197 f" can never occur in month(s) {int_months}"
1198 )
1200 return expanded, nth_weekday_of_month, expressions, nearest_weekday
@classmethod
def expand(
    cls,
    expr_format: str,
    hash_id: Optional[Union[bytes, str]] = None,
    second_at_beginning: bool = False,
    from_timestamp: Optional[float] = None,
    strict: bool = False,
    strict_year: Optional[Union[int, list[int]]] = None,
) -> tuple[list[ExpandedExpression], dict[int, set[int]]]:
    """
    Expand a cron expression format into a normalized format of
    list[list[int | 'l' | '*']].

    The outer list holds one entry per field of the expression; each inner
    list holds the values allowed for that field.

    Returns a 2-tuple: the expanded expression list, followed by the
    `nth_weekday_of_month` mapping. All arguments are forwarded unchanged
    to `_expand`; the other two values `_expand` returns are discarded.

    Raises CroniterBadCronError when `_expand` fails with a ValueError that
    is not already a CroniterError; the message of that error is the
    formatted traceback. CroniterError instances propagate untouched.

    Examples:

    # Every minute
    >>> croniter.expand('* * * * *')
    ([['*'], ['*'], ['*'], ['*'], ['*']], {})

    # On the hour
    >>> croniter.expand('0 0 * * *')
    ([[0], [0], ['*'], ['*'], ['*']], {})

    # Hours 0-5 and 10 monday through friday
    >>> croniter.expand('0-5,10 * * * mon-fri')
    ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})

    Note that some special values such as nth day of week are expanded to a
    special mapping format for later processing:

    # Every minute on the 3rd tuesday of the month
    >>> croniter.expand('* * * * 2#3')
    ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})

    # Every hour on the last day of the month
    >>> croniter.expand('0 * l * *')
    ([[0], ['*'], ['l'], ['*'], ['*']], {})

    # On the hour every 15 seconds
    >>> croniter.expand('0 0 * * * */15')
    ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
    """
    try:
        fields, nth_map, _raw_fields, _weekday_days = cls._expand(
            expr_format,
            hash_id=hash_id,
            second_at_beginning=second_at_beginning,
            from_timestamp=from_timestamp,
            strict=strict,
            strict_year=strict_year,
        )
        return fields, nth_map
    except CroniterError:
        # Already one of our own errors: let it through as-is.
        raise
    except ValueError:
        # Any other ValueError is reported as a bad cron expression whose
        # message is the formatted traceback of the original failure.
        raise CroniterBadCronError(_traceback.format_exc())
@classmethod
def _get_low_from_current_date_number(cls, field_index, step, from_timestamp):
    """
    Return the lowest value of a stepped field that stays aligned with the
    date denoted by ``from_timestamp``.

    ``from_timestamp`` is a POSIX timestamp and is interpreted in UTC.
    ``field_index`` selects which component of that date is used (minute,
    hour, day of month, month or day of week) and ``step`` is the step of
    the range being expanded. The result is congruent to the selected
    component modulo ``step``.

    Day of month and month are 1-based, so for those fields the result is
    kept in ``1..step`` rather than ``0..step-1``; a plain ``value % step``
    would yield 0 (not a valid day or month) whenever the value is a
    multiple of ``step``.

    Raises ValueError for any field index other than the five above.
    """
    dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT)
    if field_index == MINUTE_FIELD:
        return dt.minute % step
    if field_index == HOUR_FIELD:
        return dt.hour % step
    if field_index == DAY_FIELD:
        return ((dt.day - 1) % step) + 1
    if field_index == MONTH_FIELD:
        # Same 1-based handling as the day-of-month branch: e.g. March with
        # a step of 3 must give 3 (-> 3, 6, 9, 12), never month 0.
        return ((dt.month - 1) % step) + 1
    if field_index == DOW_FIELD:
        # weekday() is Monday=0..Sunday=6; the +1 makes Monday 1 and Sunday 7
        # before the modulo, so 0 is a legitimate result here.
        return (dt.weekday() + 1) % step

    raise ValueError("Can't get current date number for index larger than 4")
@classmethod
def is_valid(
    cls,
    expression,
    hash_id=None,
    encoding="UTF-8",
    second_at_beginning=False,
    strict=False,
    strict_year=None,
):
    """
    Tell whether ``expression`` can be expanded as a cron expression.

    A truthy ``hash_id`` must be ``bytes`` or ``str``; a ``str`` is encoded
    with ``encoding`` before being handed to ``expand``. Any other truthy
    type raises TypeError.

    Returns True when ``expand`` succeeds and False when it raises a
    CroniterError.
    """
    if hash_id:
        if isinstance(hash_id, str):
            hash_id = hash_id.encode(encoding)
        elif not isinstance(hash_id, bytes):
            raise TypeError("hash_id must be bytes or UTF-8 string")

    try:
        cls.expand(
            expression,
            hash_id=hash_id,
            second_at_beginning=second_at_beginning,
            strict=strict,
            strict_year=strict_year,
        )
    except CroniterError:
        return False
    else:
        return True
@classmethod
def match(
    cls,
    cron_expression,
    testdate,
    day_or=True,
    second_at_beginning=False,
    precision_in_seconds=None,
):
    """
    Check ``testdate`` against ``cron_expression``.

    This is ``match_range`` with ``testdate`` used as both the start and
    the end of the range; the remaining options are passed through as-is.
    """
    return cls.match_range(
        cron_expression,
        testdate,
        testdate,
        day_or=day_or,
        second_at_beginning=second_at_beginning,
        precision_in_seconds=precision_in_seconds,
    )
@classmethod
def match_range(
    cls,
    cron_expression,
    from_datetime,
    to_datetime,
    day_or=True,
    second_at_beginning=False,
    precision_in_seconds=None,
):
    """
    Check whether ``cron_expression`` fires between ``from_datetime`` and
    ``to_datetime``.

    An iterator is started at ``to_datetime`` and asked for its previous
    occurrence. The result is True when the distance between that
    occurrence and the (possibly nudged) starting point is smaller than the
    length of the range plus ``precision_in_seconds``.

    When ``precision_in_seconds`` is None it defaults to 1 if the expanded
    expression has more than ``UNIX_CRON_LEN`` fields, and to 60 otherwise.

    Returns False if looking up the previous occurrence raises
    CroniterBadDateError.
    """
    schedule = cls(
        cron_expression,
        to_datetime,
        ret_type=datetime.datetime,
        day_or=day_or,
        second_at_beginning=second_at_beginning,
    )

    anchor = schedule.get_current(datetime.datetime)
    if not anchor.microsecond:
        # NOTE(review): presumably nudged forward so that get_prev() can
        # return an occurrence falling exactly on the anchor - confirm.
        anchor += relativedelta(microseconds=1)
    schedule.set_current(anchor, force=True)

    try:
        previous = schedule.get_prev()
    except CroniterBadDateError:
        return False

    if precision_in_seconds is None:
        if len(schedule.expanded) > UNIX_CRON_LEN:
            precision_in_seconds = 1
        else:
            precision_in_seconds = 60

    window = (to_datetime - from_datetime).total_seconds() + precision_in_seconds
    return abs(anchor - previous).total_seconds() < window
def croniter_range(
    start,
    stop,
    expr_format,
    ret_type=None,
    day_or=True,
    exclude_ends=False,
    _croniter=None,
    second_at_beginning=False,
    expand_from_start_time=False,
):
    """
    Yield every time between ``start`` and ``stop`` that matches the cron
    expression ``expr_format``.

    Think of it as the builtin ``range(start, stop, step)`` for datetimes,
    where the "step" is a cron expression. A ``start`` later than ``stop``
    iterates backwards in time. Matches falling exactly on ``start`` or
    ``stop`` are included unless ``exclude_ends=True``.

    ``start`` and ``stop`` must be of the same type (or one an instance of
    the other's type), otherwise CroniterBadTypeRangeError is raised.
    Numeric bounds are read as timestamps and, unless ``ret_type`` says
    otherwise, results are yielded as floats; other bounds default to
    ``datetime.datetime`` results.

    ``_croniter`` lets the caller substitute the iterator class (defaults
    to ``croniter``).
    """
    factory = _croniter or croniter

    # Exact type identity is tested first for speed; the isinstance checks
    # are only reached when the two types differ.
    compatible = (
        type(start) is type(stop)
        or isinstance(start, type(stop))
        or isinstance(stop, type(start))
    )
    if not compatible:
        raise CroniterBadTypeRangeError(
            f"The start and stop must be same type. {type(start)} != {type(stop)}"
        )

    default_ret_type = datetime.datetime
    if isinstance(start, (float, int)):
        start, stop = [
            datetime.datetime.fromtimestamp(stamp, tzutc()).replace(tzinfo=None)
            for stamp in (start, stop)
        ]
        default_ret_type = float
    if ret_type is None:
        ret_type = default_ret_type

    if not exclude_ends:
        # Widen the interval by one microsecond on each side so that a match
        # sitting exactly on a bound passes the strict comparisons below.
        nudge = relativedelta(microseconds=1)
        if start < stop:  # Forward (normal) time order
            start, stop = start - nudge, stop + nudge
        else:  # Reverse time order
            start, stop = start + nudge, stop - nudge

    year_span = math.floor(abs(stop.year - start.year)) + 1
    ic = factory(
        expr_format,
        start,
        ret_type=datetime.datetime,
        day_or=day_or,
        max_years_between_matches=year_span,
        second_at_beginning=second_at_beginning,
        expand_from_start_time=expand_from_start_time,
    )

    # Direction decides both how to advance and when to stop.
    forward = start < stop
    advance = ic.get_next if forward else ic.get_prev

    def in_range(moment):
        return moment < stop if forward else moment > stop

    try:
        while True:
            moment = advance()
            if not in_range(moment):
                break
            yield ic.get_current(float) if ret_type is float else moment
    except CroniterBadDateError:
        # Stop iteration when this exception is raised; no match found within the given year range
        return
class HashExpander:
    """Turn hashed/random cron tokens into ordinary cron field text."""

    def __init__(self, cronit):
        # Object providing the per-field RANGES table used for default bounds.
        self.cron = cronit

    def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
        """
        Return a hashed/random integer within an inclusive range.

        Bounds left as None fall back to ``self.cron.RANGES[idx]``. With
        ``hash_type == "r"`` the source value is a random 32-bit integer;
        otherwise it is the CRC-32 of ``hash_id``. The source value is
        shifted right by ``idx`` bits before being reduced into the range.
        """
        upper = self.cron.RANGES[idx][1] if range_end is None else range_end
        lower = self.cron.RANGES[idx][0] if range_begin is None else range_begin
        seed = (
            random.randint(0, 0xFFFFFFFF)
            if hash_type == "r"
            else binascii.crc32(hash_id) & 0xFFFFFFFF
        )
        span = upper - lower + 1
        return (seed >> idx) % span + lower

    def match(self, efl, idx, expr, hash_id=None, **kw):
        """Match ``expr`` against the hashed-expression pattern."""
        return hash_expression_re.match(expr)

    def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
        """
        Expand a hashed/random expression to its normal representation.

        ``match`` may carry a precomputed match object; the default empty
        string means "not computed yet". A falsy match returns ``expr``
        unchanged. Raises CroniterBadCronError for an ``h`` expression
        without ``hash_id``, for a range whose begin is not below its end,
        and for a zero divisor.
        """
        if match == "":
            match = self.match(efl, idx, expr, hash_id, **kw)
        if not match:
            return expr

        parts = match.groupdict()
        kind = parts["hash_type"]
        if kind == "h" and hash_id is None:
            raise CroniterBadCronError("Hashed definitions must include hash_id")

        begin_txt, end_txt = parts["range_begin"], parts["range_end"]
        has_range = bool(begin_txt and end_txt)
        if has_range and int(begin_txt) >= int(end_txt):
            raise CroniterBadCronError("Range end must be greater than range begin")

        divisor_txt = parts["divisor"]
        if divisor_txt:
            # Examples: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
            #           H/15        -> 7-59/15  (i.e. 7,22,37,52)
            divisor = int(divisor_txt)
            if divisor == 0:
                raise CroniterBadCronError(f"Bad expression: {expr}")

            lower = int(begin_txt) if has_range else self.cron.RANGES[idx][0]
            # The starting point is picked within the first `divisor` slots.
            first = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_begin=lower,
                range_end=divisor - 1 + lower,
            )
            upper = int(end_txt) if has_range else self.cron.RANGES[idx][1]
            return f"{first}-{upper}/{divisor}"

        if has_range:
            # Example: H(0-29) -> 12
            picked = self.do(
                idx,
                hash_type=kind,
                hash_id=hash_id,
                range_end=int(end_txt),
                range_begin=int(begin_txt),
            )
            return str(picked)

        # Example: H -> 32
        return str(self.do(idx, hash_type=kind, hash_id=hash_id))
# Registry of expression expanders, keyed by identifier. Each value is a class
# that is instantiated with the croniter class and exposes an ``expand`` method.
EXPANDERS = {
    "hash": HashExpander,
}