Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/croniter/croniter.py: 78%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
2import binascii
3import calendar
4import copy
5import datetime
6import math
7import platform
8import random
9import re
10import struct
11import sys
12import traceback as _traceback
13from time import time
14from typing import Any, Literal, Optional, Union
16from dateutil.relativedelta import relativedelta
17from dateutil.tz import datetime_exists, tzutc
19ExpandedExpression = list[Union[int, Literal["*", "l"]]]
22def is_32bit() -> bool:
23 """
24 Detect if Python is running in 32-bit mode.
25 Returns True if running on 32-bit Python, False for 64-bit.
26 """
27 # Method 1: Check pointer size
28 bits = struct.calcsize("P") * 8
30 # Method 2: Check platform architecture string
31 try:
32 architecture = platform.architecture()[0]
33 except RuntimeError:
34 architecture = None
36 # Method 3: Check maxsize
37 is_small_maxsize = sys.maxsize <= 2**32
39 # Evaluate all available methods
40 is_32 = False
42 if bits == 32:
43 is_32 = True
44 elif architecture and "32" in architecture:
45 is_32 = True
46 elif is_small_maxsize:
47 is_32 = True
49 return is_32
52try:
53 # https://github.com/python/cpython/issues/101069 detection
54 if is_32bit():
55 datetime.datetime.fromtimestamp(3999999999)
56 OVERFLOW32B_MODE = False
57except OverflowError:
58 OVERFLOW32B_MODE = True
61UTC_DT = datetime.timezone.utc
62EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)
64M_ALPHAS: dict[str, Union[int, str]] = {
65 "jan": 1,
66 "feb": 2,
67 "mar": 3,
68 "apr": 4,
69 "may": 5,
70 "jun": 6,
71 "jul": 7,
72 "aug": 8,
73 "sep": 9,
74 "oct": 10,
75 "nov": 11,
76 "dec": 12,
77}
78DOW_ALPHAS: dict[str, Union[int, str]] = {
79 "sun": 0,
80 "mon": 1,
81 "tue": 2,
82 "wed": 3,
83 "thu": 4,
84 "fri": 5,
85 "sat": 6,
86}
88MINUTE_FIELD = 0
89HOUR_FIELD = 1
90DAY_FIELD = 2
91MONTH_FIELD = 3
92DOW_FIELD = 4
93SECOND_FIELD = 5
94YEAR_FIELD = 6
96UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD)
97SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD)
98YEAR_FIELDS = (
99 MINUTE_FIELD,
100 HOUR_FIELD,
101 DAY_FIELD,
102 MONTH_FIELD,
103 DOW_FIELD,
104 SECOND_FIELD,
105 YEAR_FIELD,
106)
108step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
109only_int_re = re.compile(r"^\d+$")
111DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
112WEEKDAYS = "|".join(DOW_ALPHAS.keys())
113MONTHS = "|".join(M_ALPHAS.keys())
114star_or_int_re = re.compile(r"^(\d+|\*)$")
115special_dow_re = re.compile(
116 rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)"
117 rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$"
118)
119nearest_weekday_re = re.compile(r"^(?:(\d+)w|w(\d+))$")
120re_star = re.compile("[*]")
121hash_expression_re = re.compile(
122 r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
123)
125CRON_FIELDS = {
126 "unix": UNIX_FIELDS,
127 "second": SECOND_FIELDS,
128 "year": YEAR_FIELDS,
129 len(UNIX_FIELDS): UNIX_FIELDS,
130 len(SECOND_FIELDS): SECOND_FIELDS,
131 len(YEAR_FIELDS): YEAR_FIELDS,
132}
133UNIX_CRON_LEN = len(UNIX_FIELDS)
134SECOND_CRON_LEN = len(SECOND_FIELDS)
135YEAR_CRON_LEN = len(YEAR_FIELDS)
136# retrocompat
137VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)}
139MARKER = object()
142def datetime_to_timestamp(d):
143 if d.tzinfo is not None:
144 d = d.replace(tzinfo=None) - d.utcoffset()
146 return (d - datetime.datetime(1970, 1, 1)).total_seconds()
149def _is_leap(year: int) -> bool:
150 return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0)
153def _last_day_of_month(year: int, month: int) -> int:
154 """Calculate the last day of the given month (honor leap years)."""
155 last_day = DAYS[month - 1]
156 if month == 2 and _is_leap(year):
157 last_day += 1
158 return last_day
161def _is_successor(
162 date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
163) -> bool:
164 """Check if the given date is a successor (after/before) of the previous date."""
165 if is_prev:
166 return date.astimezone(UTC_DT) < previous_date.astimezone(UTC_DT)
167 return date.astimezone(UTC_DT) > previous_date.astimezone(UTC_DT)
170def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta:
171 """Calculate the timezone difference of the given dates."""
172 offset1 = date1.utcoffset()
173 offset2 = date2.utcoffset()
174 assert offset1 is not None
175 assert offset2 is not None
176 return offset2 - offset1
179def _add_tzinfo(
180 date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
181) -> tuple[datetime.datetime, bool]:
182 """Add the tzinfo from the previous date to the given date.
184 In case the new date is ambiguous, determine the correct date
185 based on it being closer to the previous date but still a successor
186 (after/before based on `is_prev`).
188 In case the date does not exist, jump forward to the next existing date.
189 """
190 localize = getattr(previous_date.tzinfo, "localize", None)
191 if localize is not None:
192 # pylint: disable-next=import-outside-toplevel
193 import pytz
195 try:
196 result = localize(date, is_dst=None)
197 except pytz.NonExistentTimeError:
198 while True:
199 date += datetime.timedelta(minutes=1)
200 try:
201 result = localize(date, is_dst=None)
202 except pytz.NonExistentTimeError:
203 continue
204 break
205 return result, False
206 except pytz.AmbiguousTimeError:
207 closer = localize(date, is_dst=not is_prev)
208 farther = localize(date, is_dst=is_prev)
209 # TODO: Check negative DST
210 assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
211 if _is_successor(closer, previous_date, is_prev):
212 result = closer
213 else:
214 assert _is_successor(farther, previous_date, is_prev)
215 result = farther
216 return result, True
218 result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo)
219 if not datetime_exists(result):
220 while not datetime_exists(result):
221 result += datetime.timedelta(minutes=1)
222 return result, False
224 # result is closer to the previous date
225 farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo)
226 # Comparing the UTC offsets in the check for the date being ambiguous.
227 if result.utcoffset() != farther.utcoffset():
228 # TODO: Check negative DST
229 assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
230 if not _is_successor(result, previous_date, is_prev):
231 assert _is_successor(farther, previous_date, is_prev)
232 result = farther
233 return result, True
236class CroniterError(ValueError):
237 """General top-level Croniter base exception"""
240class CroniterBadTypeRangeError(TypeError):
241 """."""
244class CroniterBadCronError(CroniterError):
245 """Syntax, unknown value, or range error within a cron expression"""
248class CroniterUnsupportedSyntaxError(CroniterBadCronError):
249 """Valid cron syntax, but likely to produce inaccurate results"""
251 # Extending CroniterBadCronError, which may be contridatory, but this allows
252 # catching both errors with a single exception. From a user perspective
253 # these will likely be handled the same way.
256class CroniterBadDateError(CroniterError):
257 """Unable to find next/prev timestamp match"""
260class CroniterNotAlphaError(CroniterBadCronError):
261 """Cron syntax contains an invalid day or month abbreviation"""
264class croniter:
265 MONTHS_IN_YEAR = 12
267 # This helps with expanding `*` fields into `lower-upper` ranges. Each item
268 # in this tuple maps to the corresponding field index
269 RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099))
271 ALPHACONV: tuple[dict[str, Union[int, str]], ...] = (
272 {}, # 0: min
273 {}, # 1: hour
274 {"l": "l"}, # 2: dom
275 # 3: mon
276 copy.deepcopy(M_ALPHAS),
277 # 4: dow
278 copy.deepcopy(DOW_ALPHAS),
279 # 5: second
280 {},
281 # 6: year
282 {},
283 )
285 LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {})
287 LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130)
289 def __init__(
290 self,
291 expr_format: str,
292 start_time: Optional[Union[datetime.datetime, float]] = None,
293 ret_type: type = float,
294 day_or: bool = True,
295 max_years_between_matches: Optional[int] = None,
296 is_prev: bool = False,
297 hash_id: Optional[Union[bytes, str]] = None,
298 implement_cron_bug: bool = False,
299 second_at_beginning: bool = False,
300 expand_from_start_time: bool = False,
301 ) -> None:
302 self._ret_type = ret_type
303 self._day_or = day_or
304 self._implement_cron_bug = implement_cron_bug
305 self.second_at_beginning = bool(second_at_beginning)
306 self._expand_from_start_time = expand_from_start_time
308 if hash_id is not None:
309 if not isinstance(hash_id, (bytes, str)):
310 raise TypeError("hash_id must be bytes or UTF-8 string")
311 if not isinstance(hash_id, bytes):
312 hash_id = hash_id.encode("UTF-8")
314 self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None
315 if max_years_between_matches is None:
316 max_years_between_matches = 50
317 self._max_years_between_matches = max(int(max_years_between_matches), 1)
319 if start_time is None:
320 start_time = time()
322 self.tzinfo: Optional[datetime.tzinfo] = None
324 self.start_time = 0.0
325 self.dst_start_time = 0.0
326 self.cur = 0.0
327 self.set_current(start_time, force=True)
329 self.expanded, self.nth_weekday_of_month, self.expressions, self.nearest_weekday = self._expand(
330 expr_format,
331 hash_id=hash_id,
332 from_timestamp=self.dst_start_time if self._expand_from_start_time else None,
333 second_at_beginning=second_at_beginning,
334 )
335 self.fields = CRON_FIELDS[len(self.expanded)]
336 self._is_prev = is_prev
338 @classmethod
339 def _alphaconv(cls, index, key, expressions):
340 try:
341 return cls.ALPHACONV[index][key]
342 except KeyError:
343 raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable")
345 def get_next(self, ret_type=None, start_time=None, update_current=True):
346 if start_time and self._expand_from_start_time:
347 raise ValueError(
348 "start_time is not supported when using expand_from_start_time = True."
349 )
350 return self._get_next(
351 ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current
352 )
354 def get_prev(self, ret_type=None, start_time=None, update_current=True):
355 return self._get_next(
356 ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current
357 )
359 def get_current(self, ret_type=None):
360 ret_type = ret_type or self._ret_type
361 if issubclass(ret_type, datetime.datetime):
362 return self.timestamp_to_datetime(self.cur)
363 return self.cur
365 def set_current(
366 self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True
367 ) -> float:
368 if (force or (self.cur is None)) and start_time is not None:
369 if isinstance(start_time, datetime.datetime):
370 self.tzinfo = start_time.tzinfo
371 start_time = self.datetime_to_timestamp(start_time)
373 self.start_time = start_time
374 self.dst_start_time = start_time
375 self.cur = start_time
376 return self.cur
378 @staticmethod
379 def datetime_to_timestamp(d: datetime.datetime) -> float:
380 """
381 Converts a `datetime` object `d` into a UNIX timestamp.
382 """
383 return datetime_to_timestamp(d)
385 _datetime_to_timestamp = datetime_to_timestamp # retrocompat
387 def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime:
388 """
389 Converts a UNIX `timestamp` into a `datetime` object.
390 """
391 if tzinfo is MARKER: # allow to give tzinfo=None even if self.tzinfo is set
392 tzinfo = self.tzinfo
393 if OVERFLOW32B_MODE:
394 # degraded mode to workaround Y2038
395 # see https://github.com/python/cpython/issues/101069
396 result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp)
397 else:
398 result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None)
399 if tzinfo:
400 result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo)
401 return result
403 _timestamp_to_datetime = timestamp_to_datetime # retrocompat
405 def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None):
406 if update_current is None:
407 update_current = True
408 self.set_current(start_time, force=True)
409 if is_prev is None:
410 is_prev = self._is_prev
411 self._is_prev = is_prev
413 ret_type = ret_type or self._ret_type
415 if not issubclass(ret_type, (float, datetime.datetime)):
416 raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.")
418 result = self._calc_next(is_prev)
419 timestamp = self.datetime_to_timestamp(result)
420 if update_current:
421 self.cur = timestamp
422 if issubclass(ret_type, datetime.datetime):
423 return result
424 return timestamp
426 # iterator protocol, to enable direct use of croniter
427 # objects in a loop, like "for dt in croniter("5 0 * * *'): ..."
428 # or for combining multiple croniters into single
429 # dates feed using 'itertools' module
430 def all_next(self, ret_type=None, start_time=None, update_current=None):
431 """
432 Returns a generator yielding consecutive dates.
434 May be used instead of an implicit call to __iter__ whenever a
435 non-default `ret_type` needs to be specified.
436 """
437 # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could
438 # be used instead
439 try:
440 while True:
441 self._is_prev = False
442 yield self._get_next(
443 ret_type=ret_type, start_time=start_time, update_current=update_current
444 )
445 start_time = None
446 except CroniterBadDateError:
447 if self._max_years_btw_matches_explicitly_set:
448 return
449 raise
451 def all_prev(self, ret_type=None, start_time=None, update_current=None):
452 """
453 Returns a generator yielding previous dates.
454 """
455 try:
456 while True:
457 self._is_prev = True
458 yield self._get_next(
459 ret_type=ret_type, start_time=start_time, update_current=update_current
460 )
461 start_time = None
462 except CroniterBadDateError:
463 if self._max_years_btw_matches_explicitly_set:
464 return
465 raise
467 def iter(self, *args, **kwargs):
468 return self.all_prev if self._is_prev else self.all_next
470 def __iter__(self):
471 return self
473 __next__ = next = _get_next
475 def _calc_next(self, is_prev: bool) -> datetime.datetime:
476 current = self.timestamp_to_datetime(self.cur)
477 expanded = self.expanded[:]
478 nth_weekday_of_month = self.nth_weekday_of_month.copy()
480 # exception to support day of month and day of week as defined in cron
481 if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or:
482 # If requested, handle a bug in vixie cron/ISC cron where day_of_month and
483 # day_of_week form an intersection (AND) instead of a union (OR) if either
484 # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html)
485 if self._implement_cron_bug and (
486 re_star.match(self.expressions[DAY_FIELD])
487 or re_star.match(self.expressions[DOW_FIELD])
488 ):
489 # To produce a schedule identical to the cron bug, we'll bypass the code
490 # that makes a union of DOM and DOW, and instead skip to the code that
491 # does an intersect instead
492 pass
493 else:
494 bak = expanded[DOW_FIELD]
495 expanded[DOW_FIELD] = ["*"]
496 t1 = self._calc(current, expanded, nth_weekday_of_month, is_prev)
497 expanded[DOW_FIELD] = bak
498 expanded[DAY_FIELD] = ["*"]
500 t2 = self._calc(current, expanded, nth_weekday_of_month, is_prev)
501 if is_prev:
502 return t1 if t1 > t2 else t2
503 return t1 if t1 < t2 else t2
505 return self._calc(current, expanded, nth_weekday_of_month, is_prev)
507 def _calc(
508 self,
509 now: datetime.datetime,
510 expanded: list[ExpandedExpression],
511 nth_weekday_of_month: dict[int, set[int]],
512 is_prev: bool,
513 ) -> datetime.datetime:
514 if is_prev:
515 nearest_diff_method = self._get_prev_nearest_diff
516 offset = relativedelta(microseconds=-1)
517 else:
518 nearest_diff_method = self._get_next_nearest_diff
519 if len(expanded) > UNIX_CRON_LEN:
520 offset = relativedelta(seconds=1)
521 else:
522 offset = relativedelta(minutes=1)
523 # Calculate the next cron time in local time a.k.a. timezone unaware time.
524 unaware_time = now.replace(tzinfo=None) + offset
525 if len(expanded) > UNIX_CRON_LEN:
526 unaware_time = unaware_time.replace(microsecond=0)
527 else:
528 unaware_time = unaware_time.replace(second=0, microsecond=0)
530 month = unaware_time.month
531 year = current_year = unaware_time.year
533 def proc_year(d):
534 if len(expanded) == YEAR_CRON_LEN:
535 try:
536 expanded[YEAR_FIELD].index("*")
537 except ValueError:
538 # use None as range_val to indicate no loop
539 diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
540 if diff_year is None:
541 return None, d
542 if diff_year != 0:
543 if is_prev:
544 d += relativedelta(
545 years=diff_year, month=12, day=31, hour=23, minute=59, second=59
546 )
547 else:
548 d += relativedelta(
549 years=diff_year, month=1, day=1, hour=0, minute=0, second=0
550 )
551 return True, d
552 return False, d
554 def proc_month(d):
555 try:
556 expanded[MONTH_FIELD].index("*")
557 except ValueError:
558 diff_month = nearest_diff_method(
559 d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR
560 )
561 reset_day = 1
563 if diff_month is not None and diff_month != 0:
564 if is_prev:
565 d += relativedelta(months=diff_month)
566 reset_day = _last_day_of_month(d.year, d.month)
567 d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
568 else:
569 d += relativedelta(
570 months=diff_month, day=reset_day, hour=0, minute=0, second=0
571 )
572 return True, d
573 return False, d
575 def proc_day_of_month(d):
576 try:
577 expanded[DAY_FIELD].index("*")
578 except ValueError:
579 days = _last_day_of_month(year, month)
580 if "l" in expanded[DAY_FIELD] and days == d.day:
581 return False, d
583 if is_prev:
584 prev_month = (month - 2) % self.MONTHS_IN_YEAR + 1
585 prev_year = year - 1 if month == 1 else year
586 days_in_prev_month = _last_day_of_month(prev_year, prev_month)
587 diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
588 else:
589 diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)
591 if diff_day is not None and diff_day != 0:
592 if is_prev:
593 d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
594 else:
595 d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
596 return True, d
597 return False, d
599 def proc_day_of_week(d):
600 try:
601 expanded[DOW_FIELD].index("*")
602 except ValueError:
603 diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
604 if diff_day_of_week is not None and diff_day_of_week != 0:
605 if is_prev:
606 d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
607 else:
608 d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
609 return True, d
610 return False, d
612 def proc_day_of_week_nth(d):
613 if "*" in nth_weekday_of_month:
614 s = nth_weekday_of_month["*"]
615 for i in range(0, 7):
616 if i in nth_weekday_of_month:
617 nth_weekday_of_month[i].update(s)
618 else:
619 nth_weekday_of_month[i] = s
620 del nth_weekday_of_month["*"]
622 candidates = []
623 for wday, nth in nth_weekday_of_month.items():
624 c = self._get_nth_weekday_of_month(d.year, d.month, wday)
625 for n in nth:
626 if n == "l":
627 candidate = c[-1]
628 elif len(c) < n:
629 continue
630 else:
631 candidate = c[n - 1]
632 if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
633 candidates.append(candidate)
635 if not candidates:
636 if is_prev:
637 d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
638 else:
639 days = _last_day_of_month(year, month)
640 d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
641 return True, d
643 candidates.sort()
644 diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
645 if diff_day != 0:
646 if is_prev:
647 d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
648 else:
649 d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
650 return True, d
651 return False, d
653 def proc_nearest_weekday(d):
654 """Process W (nearest weekday) day-of-month entries."""
655 candidates = []
656 for w_day in self.nearest_weekday:
657 candidate = self._get_nearest_weekday(d.year, d.month, w_day)
658 if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
659 candidates.append(candidate)
661 if not candidates:
662 if is_prev:
663 d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
664 else:
665 days = _last_day_of_month(year, month)
666 d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
667 return True, d
669 candidates.sort()
670 diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
671 if diff_day != 0:
672 if is_prev:
673 d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
674 else:
675 d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
676 return True, d
677 return False, d
679 def proc_hour(d):
680 try:
681 expanded[HOUR_FIELD].index("*")
682 except ValueError:
683 diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
684 if diff_hour is not None and diff_hour != 0:
685 if is_prev:
686 d += relativedelta(hours=diff_hour, minute=59, second=59)
687 else:
688 d += relativedelta(hours=diff_hour, minute=0, second=0)
689 return True, d
690 return False, d
692 def proc_minute(d):
693 try:
694 expanded[MINUTE_FIELD].index("*")
695 except ValueError:
696 diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
697 if diff_min is not None and diff_min != 0:
698 if is_prev:
699 d += relativedelta(minutes=diff_min, second=59)
700 else:
701 d += relativedelta(minutes=diff_min, second=0)
702 return True, d
703 return False, d
705 def proc_second(d):
706 if len(expanded) > UNIX_CRON_LEN:
707 try:
708 expanded[SECOND_FIELD].index("*")
709 except ValueError:
710 diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
711 if diff_sec is not None and diff_sec != 0:
712 d += relativedelta(seconds=diff_sec)
713 return True, d
714 else:
715 d += relativedelta(second=0)
716 return False, d
718 procs = [
719 proc_year,
720 proc_month,
721 (proc_nearest_weekday if self.nearest_weekday else proc_day_of_month),
722 (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
723 proc_hour,
724 proc_minute,
725 proc_second,
726 ]
728 while abs(year - current_year) <= self._max_years_between_matches:
729 next = False
730 stop = False
731 for proc in procs:
732 (changed, unaware_time) = proc(unaware_time)
733 # `None` can be set mostly for year processing
734 # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
735 if changed is None:
736 stop = True
737 break
738 if changed:
739 month, year = unaware_time.month, unaware_time.year
740 next = True
741 break
742 if stop:
743 break
744 if next:
745 continue
747 unaware_time = unaware_time.replace(microsecond=0)
748 if now.tzinfo is None:
749 return unaware_time
751 # Add timezone information back and handle DST changes
752 aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)
754 if not exists and (
755 not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD]
756 ):
757 # The calculated local date does not exist and moving the time forward
758 # to the next valid time isn't the correct solution. Search for the
759 # next matching cron time that exists.
760 while not exists:
761 unaware_time = self._calc(
762 unaware_time, expanded, nth_weekday_of_month, is_prev
763 )
764 aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)
766 offset_delta = _timezone_delta(now, aware_time)
767 if not offset_delta:
768 # There was no DST change.
769 return aware_time
771 # There was a DST change. So check if there is a alternative cron time
772 # for the other UTC offset.
773 alternative_unaware_time = now.replace(tzinfo=None) + offset_delta
774 alternative_unaware_time = self._calc(
775 alternative_unaware_time, expanded, nth_weekday_of_month, is_prev
776 )
777 alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev)
779 if not _is_successor(alternative_aware_time, now, is_prev):
780 # The alternative time is an ancestor of now. Thus it is not an alternative.
781 return aware_time
783 if _is_successor(aware_time, alternative_aware_time, is_prev):
784 return alternative_aware_time
786 return aware_time
788 if is_prev:
789 raise CroniterBadDateError("failed to find prev date")
790 raise CroniterBadDateError("failed to find next date")
792 @staticmethod
793 def _get_next_nearest_diff(x, to_check, range_val):
794 """
795 `range_val` is the range of a field.
796 If no available time, we can move to next loop(like next month).
797 `range_val` can also be set to `None` to indicate that there is no loop.
798 ( Currently, should only used for `year` field )
799 """
800 for i, d in enumerate(to_check):
801 if range_val is not None:
802 if d == "l":
803 # if 'l' then it is the last day of month
804 # => its value of range_val
805 d = range_val
806 elif d > range_val:
807 continue
808 if d >= x:
809 return d - x
810 # When range_val is None and x not exists in to_check,
811 # `None` will be returned to suggest no more available time
812 if range_val is None:
813 return None
814 return to_check[0] - x + range_val
816 @staticmethod
817 def _get_prev_nearest_diff(x, to_check, range_val):
818 """
819 `range_val` is the range of a field.
820 If no available time, we can move to previous loop(like previous month).
821 Range_val can also be set to `None` to indicate that there is no loop.
822 ( Currently should only used for `year` field )
823 """
824 candidates = to_check[:]
825 candidates.reverse()
826 for d in candidates:
827 if d != "l" and d <= x:
828 return d - x
829 if "l" in candidates:
830 return -x
831 # When range_val is None and x not exists in to_check,
832 # `None` will be returned to suggest no more available time
833 if range_val is None:
834 return None
835 candidate = candidates[0]
836 for c in candidates:
837 # fixed: c < range_val
838 # this code will reject all 31 day of month, 12 month, 59 second,
839 # 23 hour and so on.
840 # if candidates has just a element, this will not harmful.
841 # but candidates have multiple elements, then values equal to
842 # range_val will rejected.
843 if c <= range_val:
844 candidate = c
845 break
846 # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error
847 # return 2021-03-02 06:00:00
848 if candidate > range_val:
849 return -range_val
850 return candidate - x - range_val
852 @staticmethod
853 def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]:
854 """For a given year/month return a list of days in nth-day-of-month order.
855 The last weekday of the month is always [-1].
856 """
857 w = (day_of_week + 6) % 7
858 c = calendar.Calendar(w).monthdayscalendar(year, month)
859 if c[0][0] == 0:
860 c.pop(0)
861 return tuple(i[0] for i in c)
863 @staticmethod
864 def _get_nearest_weekday(year, month, day):
865 """Get the nearest weekday (Mon-Fri) to the given day in the given month.
867 Rules:
868 - If the day is a weekday, return it.
869 - If Saturday, return Friday (day-1), unless that crosses into previous month,
870 then return Monday (day+2).
871 - If Sunday, return Monday (day+1), unless that crosses into next month,
872 then return Friday (day-2).
873 """
874 last_day = _last_day_of_month(year, month)
875 day = min(day, last_day)
876 weekday = calendar.weekday(year, month, day) # 0=Mon, 6=Sun
877 if weekday < 5: # Mon-Fri
878 return day
879 if weekday == 5: # Saturday
880 if day > 1:
881 return day - 1 # Friday
882 else:
883 return day + 2 # Monday (1st is Sat, so 3rd is Mon)
884 # Sunday
885 if day < last_day:
886 return day + 1 # Monday
887 else:
888 return day - 2 # Friday (last day is Sun, go back to Fri)
890 @classmethod
891 def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN):
892 if isinstance(len_expressions, (list, dict, tuple, set)):
893 len_expressions = len(len_expressions)
894 if val in cls.LOWMAP[field_index] and not (
895 # do not support 0 as a month either for classical 5 fields cron,
896 # 6fields second repeat form or 7 fields year form
897 # but still let conversion happen if day field is shifted
898 (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN)
899 or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN)
900 or (
901 field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD]
902 and len_expressions == YEAR_CRON_LEN
903 )
904 ):
905 val = cls.LOWMAP[field_index][val]
906 return val
908 # Maximum days in each month (non-leap year for Feb)
909 DAYS_IN_MONTH = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}
911 @classmethod
912 def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None, strict=False, strict_year=None):
913 # Split the expression in components, and normalize L -> l, MON -> mon,
914 # etc. Keep expr_format untouched so we can use it in the exception
915 # messages.
916 expr_aliases = {
917 "@midnight": ("0 0 * * *", "h h(0-2) * * * h"),
918 "@hourly": ("0 * * * *", "h * * * * h"),
919 "@daily": ("0 0 * * *", "h h * * * h"),
920 "@weekly": ("0 0 * * 0", "h h * * h h"),
921 "@monthly": ("0 0 1 * *", "h h h * * h"),
922 "@yearly": ("0 0 1 1 *", "h h h h * h"),
923 "@annually": ("0 0 1 1 *", "h h h h * h"),
924 }
926 efl = expr_format.lower()
927 hash_id_expr = 1 if hash_id is not None else 0
928 try:
929 efl = expr_aliases[efl][hash_id_expr]
930 except KeyError:
931 pass
933 expressions = efl.split()
935 if len(expressions) not in VALID_LEN_EXPRESSION:
936 raise CroniterBadCronError(
937 "Exactly 5, 6 or 7 columns has to be specified for iterator expression."
938 )
940 if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
941 # move second to it's own(6th) field to process by same logical
942 expressions.insert(SECOND_FIELD, expressions.pop(0))
944 expanded = []
945 nth_weekday_of_month = {}
946 nearest_weekday = set()
948 for field_index, expr in enumerate(expressions):
949 for expanderid, expander in EXPANDERS.items():
950 expr = expander(cls).expand(
951 efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp
952 )
954 if "?" in expr:
955 if expr != "?":
956 raise CroniterBadCronError(
957 f"[{expr_format}] is not acceptable."
958 f" Question mark can not used with other characters"
959 )
960 if field_index not in [DAY_FIELD, DOW_FIELD]:
961 raise CroniterBadCronError(
962 f"[{expr_format}] is not acceptable. "
963 f"Question mark can only used in day_of_month or day_of_week"
964 )
965 # currently just trade `?` as `*`
966 expr = "*"
968 e_list = expr.split(",")
969 res = []
971 while len(e_list) > 0:
972 e = e_list.pop()
973 nth = None
975 if field_index == DOW_FIELD:
976 # Handle special case in the dow expression: 2#3, l3
977 special_dow_rem = special_dow_re.match(str(e))
978 if special_dow_rem:
979 g = special_dow_rem.groupdict()
980 he, last = g.get("he", ""), g.get("last", "")
981 if he:
982 e = he
983 try:
984 nth = int(last)
985 assert 5 >= nth >= 1
986 except (KeyError, ValueError, AssertionError):
987 raise CroniterBadCronError(
988 f"[{expr_format}] is not acceptable."
989 f" Invalid day_of_week value: '{nth}'"
990 )
991 elif last:
992 e = last
993 nth = g["pre"] # 'l'
995 if field_index == DAY_FIELD:
996 # Handle W (nearest weekday) in day-of-month: 15w, w15
997 w_match = nearest_weekday_re.match(str(e))
998 if w_match:
999 w_day = int(w_match.group(1) or w_match.group(2))
1000 if w_day < 1 or w_day > 31:
1001 raise CroniterBadCronError(
1002 f"[{expr_format}] is not acceptable,"
1003 f" nearest weekday day value '{w_day}' out of range"
1004 )
1005 if len(e_list) > 0 or len(res) > 0:
1006 raise CroniterBadCronError(
1007 f"[{expr_format}] is not acceptable."
1008 f" 'W' can only be used with a single day value,"
1009 f" not in a list or range"
1010 )
1011 nearest_weekday.add(w_day)
1012 res.append(w_day)
1013 continue
1015 # Before matching step_search_re, normalize "*" to "{min}-{max}".
1016 # Example: in the minute field, "*/5" normalizes to "0-59/5"
1017 t = re.sub(
1018 r"^\*(\/.+)$",
1019 r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]),
1020 str(e),
1021 )
1022 m = step_search_re.search(t)
1024 if not m:
1025 # Before matching step_search_re,
1026 # normalize "{start}/{step}" to "{start}-{max}/{step}".
1027 # Example: in the minute field, "10/5" normalizes to "10-59/5"
1028 t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e))
1029 m = step_search_re.search(t)
1031 if m:
1032 # early abort if low/high are out of bounds
1033 (low, high, step) = m.group(1), m.group(2), m.group(4) or 1
1034 if field_index == DAY_FIELD and high == "l":
1035 high = "31"
1037 if not only_int_re.search(low):
1038 low = str(cls._alphaconv(field_index, low, expressions))
1040 if not only_int_re.search(high):
1041 high = str(cls._alphaconv(field_index, high, expressions))
1043 # normally, it's already guarded by the RE that should not accept
1044 # not-int values.
1045 if not only_int_re.search(str(step)):
1046 raise CroniterBadCronError(
1047 f"[{expr_format}] step '{step}'"
1048 f" in field {field_index} is not acceptable"
1049 )
1050 step = int(step)
1052 for band in low, high:
1053 if not only_int_re.search(str(band)):
1054 raise CroniterBadCronError(
1055 f"[{expr_format}] bands '{low}-{high}'"
1056 f" in field {field_index} are not acceptable"
1057 )
1059 low, high = (
1060 cls.value_alias(int(_val), field_index, expressions)
1061 for _val in (low, high)
1062 )
1064 if max(low, high) > max(
1065 cls.RANGES[field_index][0], cls.RANGES[field_index][1]
1066 ):
1067 raise CroniterBadCronError(f"{expr_format} is out of bands")
1069 if from_timestamp:
1070 low = cls._get_low_from_current_date_number(
1071 field_index, int(step), int(from_timestamp)
1072 )
1074 # Handle when the second bound of the range is in backtracking order:
1075 # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH
1076 if low > high:
1077 whole_field_range = list(
1078 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1)
1079 )
1080 # Add FirstBound -> ENDRANGE, respecting step
1081 rng = list(range(low, cls.RANGES[field_index][1] + 1, step))
1082 # Then 0 -> SecondBound, but skipping n first occurences according to step
1083 # EG to respect such expressions : Apr-Jan/3
1084 to_skip = 0
1085 if rng:
1086 already_skipped = list(reversed(whole_field_range)).index(rng[-1])
1087 curpos = whole_field_range.index(rng[-1])
1088 if ((curpos + step) > len(whole_field_range)) and (
1089 already_skipped < step
1090 ):
1091 to_skip = step - already_skipped
1092 rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step))
1093 # if we include a range type: Jan-Jan, or Sun-Sun,
1094 # it means the whole cycle (all days of week, # all monthes of year, etc)
1095 elif low == high:
1096 rng = list(
1097 range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step)
1098 )
1099 else:
1100 try:
1101 rng = list(range(low, high + 1, step))
1102 except ValueError as exc:
1103 raise CroniterBadCronError(f"invalid range: {exc}")
1105 if field_index == DOW_FIELD and nth and nth != "l":
1106 rng = [f"{item}#{nth}" for item in rng]
1107 e_list += [a for a in rng if a not in e_list]
1108 else:
1109 if t.startswith("-"):
1110 raise CroniterBadCronError(
1111 f"[{expr_format}] is not acceptable, negative numbers not allowed"
1112 )
1113 if not star_or_int_re.search(t):
1114 t = cls._alphaconv(field_index, t, expressions)
1116 try:
1117 t = int(t)
1118 except ValueError:
1119 pass
1121 t = cls.value_alias(t, field_index, expressions)
1123 if t not in ["*", "l"] and (
1124 int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1]
1125 ):
1126 raise CroniterBadCronError(
1127 f"[{expr_format}] is not acceptable, out of range"
1128 )
1130 res.append(t)
1132 if field_index == DOW_FIELD and nth:
1133 if t not in nth_weekday_of_month:
1134 nth_weekday_of_month[t] = set()
1135 nth_weekday_of_month[t].add(nth)
1137 res = set(res)
1138 res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i)
1139 if len(res) == cls.LEN_MEANS_ALL[field_index]:
1140 # Make sure the wildcard is used in the correct way (avoid over-optimization)
1141 if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or (
1142 field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]
1143 ):
1144 pass
1145 else:
1146 res = ["*"]
1148 expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res)
1150 # Check to make sure the dow combo in use is supported
1151 if nth_weekday_of_month:
1152 dow_expanded_set = set(expanded[DOW_FIELD])
1153 dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys())
1154 dow_expanded_set.discard("*")
1155 # Skip: if it's all weeks instead of wildcard
1156 if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]:
1157 raise CroniterUnsupportedSyntaxError(
1158 f"day-of-week field does not support mixing literal values and nth"
1159 f" day of week syntax. Cron: '{expr_format}'"
1160 f" dow={dow_expanded_set} vs nth={nth_weekday_of_month}"
1161 )
1163 if strict:
1164 # Cross-validate day-of-month against month (and optionally year)
1165 # to reject impossible combinations like "0 0 31 2 *" (Feb 31st).
1166 days = expanded[DAY_FIELD]
1167 months = expanded[MONTH_FIELD]
1168 if days != ["*"] and days != ["l"] and months != ["*"]:
1169 int_days = [d for d in days if isinstance(d, int)]
1170 int_months = [m for m in months if isinstance(m, int)]
1171 if int_days and int_months:
1172 # Determine max days per month, accounting for leap years
1173 days_in_month = dict(cls.DAYS_IN_MONTH)
1174 if 2 in int_months:
1175 has_leap_year = True # assume possible by default
1176 if strict_year is not None:
1177 # Year explicitly provided as parameter
1178 if isinstance(strict_year, int):
1179 has_leap_year = calendar.isleap(strict_year)
1180 else:
1181 has_leap_year = any(calendar.isleap(y) for y in strict_year)
1182 elif len(expanded) > YEAR_FIELD:
1183 years = expanded[YEAR_FIELD]
1184 if years != ["*"]:
1185 int_years = [y for y in years if isinstance(y, int)]
1186 if int_years:
1187 has_leap_year = any(calendar.isleap(y) for y in int_years)
1188 if has_leap_year:
1189 days_in_month[2] = 29
1190 min_day = min(int_days)
1191 max_possible = max(days_in_month[m] for m in int_months)
1192 if min_day > max_possible:
1193 raise CroniterBadCronError(
1194 f"[{expr_format}] is not acceptable. Day(s) {int_days}"
1195 f" can never occur in month(s) {int_months}"
1196 )
1198 return expanded, nth_weekday_of_month, expressions, nearest_weekday
1200 @classmethod
1201 def expand(
1202 cls,
1203 expr_format: str,
1204 hash_id: Optional[Union[bytes, str]] = None,
1205 second_at_beginning: bool = False,
1206 from_timestamp: Optional[float] = None,
1207 strict: bool = False,
1208 strict_year: Optional[Union[int, list[int]]] = None,
1209 ) -> tuple[list[ExpandedExpression], dict[int, set[int]]]:
1210 """
1211 Expand a cron expression format into a noramlized format of
1212 list[list[int | 'l' | '*']]. The first list representing each element
1213 of the epxression, and each sub-list representing the allowed values
1214 for that expression component.
1216 A tuple is returned, the first value being the expanded epxression
1217 list, and the second being a `nth_weekday_of_month` mapping.
1219 Examples:
1221 # Every minute
1222 >>> croniter.expand('* * * * *')
1223 ([['*'], ['*'], ['*'], ['*'], ['*']], {})
1225 # On the hour
1226 >>> croniter.expand('0 0 * * *')
1227 ([[0], [0], ['*'], ['*'], ['*']], {})
1229 # Hours 0-5 and 10 monday through friday
1230 >>> croniter.expand('0-5,10 * * * mon-fri')
1231 ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})
1233 Note that some special values such as nth day of week are expanded to a
1234 special mapping format for later processing:
1236 # Every minute on the 3rd tuesday of the month
1237 >>> croniter.expand('* * * * 2#3')
1238 ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})
1240 # Every hour on the last day of the month
1241 >>> croniter.expand('0 * l * *')
1242 ([[0], ['*'], ['l'], ['*'], ['*']], {})
1244 # On the hour every 15 seconds
1245 >>> croniter.expand('0 0 * * * */15')
1246 ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
1247 """
1248 try:
1249 expanded, nth_weekday_of_month, _expressions, _nearest_weekday = cls._expand(
1250 expr_format,
1251 hash_id=hash_id,
1252 second_at_beginning=second_at_beginning,
1253 from_timestamp=from_timestamp,
1254 strict=strict,
1255 strict_year=strict_year,
1256 )
1257 return expanded, nth_weekday_of_month
1258 except (ValueError,) as exc:
1259 if isinstance(exc, CroniterError):
1260 raise
1261 trace = _traceback.format_exc()
1262 raise CroniterBadCronError(trace)
1264 @classmethod
1265 def _get_low_from_current_date_number(cls, field_index, step, from_timestamp):
1266 dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT)
1267 if field_index == MINUTE_FIELD:
1268 return dt.minute % step
1269 if field_index == HOUR_FIELD:
1270 return dt.hour % step
1271 if field_index == DAY_FIELD:
1272 return ((dt.day - 1) % step) + 1
1273 if field_index == MONTH_FIELD:
1274 return dt.month % step
1275 if field_index == DOW_FIELD:
1276 return (dt.weekday() + 1) % step
1278 raise ValueError("Can't get current date number for index larger than 4")
1280 @classmethod
1281 def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False, strict=False, strict_year=None):
1282 if hash_id:
1283 if not isinstance(hash_id, (bytes, str)):
1284 raise TypeError("hash_id must be bytes or UTF-8 string")
1285 if not isinstance(hash_id, bytes):
1286 hash_id = hash_id.encode(encoding)
1287 try:
1288 cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning, strict=strict, strict_year=strict_year)
1289 except CroniterError:
1290 return False
1291 return True
1293 @classmethod
1294 def match(
1295 cls,
1296 cron_expression,
1297 testdate,
1298 day_or=True,
1299 second_at_beginning=False,
1300 precision_in_seconds=None,
1301 ):
1302 return cls.match_range(
1303 cron_expression, testdate, testdate, day_or, second_at_beginning, precision_in_seconds
1304 )
1306 @classmethod
1307 def match_range(
1308 cls,
1309 cron_expression,
1310 from_datetime,
1311 to_datetime,
1312 day_or=True,
1313 second_at_beginning=False,
1314 precision_in_seconds=None,
1315 ):
1316 cron = cls(
1317 cron_expression,
1318 to_datetime,
1319 ret_type=datetime.datetime,
1320 day_or=day_or,
1321 second_at_beginning=second_at_beginning,
1322 )
1323 tdp = cron.get_current(datetime.datetime)
1324 if not tdp.microsecond:
1325 tdp += relativedelta(microseconds=1)
1326 cron.set_current(tdp, force=True)
1327 try:
1328 tdt = cron.get_prev()
1329 except CroniterBadDateError:
1330 return False
1331 if precision_in_seconds is None:
1332 precision_in_seconds = 1 if len(cron.expanded) > UNIX_CRON_LEN else 60
1333 duration_in_second = (to_datetime - from_datetime).total_seconds() + precision_in_seconds
1334 return (max(tdp, tdt) - min(tdp, tdt)).total_seconds() < duration_in_second
1337def croniter_range(
1338 start,
1339 stop,
1340 expr_format,
1341 ret_type=None,
1342 day_or=True,
1343 exclude_ends=False,
1344 _croniter=None,
1345 second_at_beginning=False,
1346 expand_from_start_time=False,
1347):
1348 """
1349 Generator that provides all times from start to stop matching the given cron expression.
1350 If the cron expression matches either 'start' and/or 'stop', those times will be returned as
1351 well unless 'exclude_ends=True' is passed.
1353 You can think of this function as sibling to the builtin range function for datetime objects.
1354 Like range(start,stop,step), except that here 'step' is a cron expression.
1355 """
1356 _croniter = _croniter or croniter
1357 auto_rt = datetime.datetime
1358 # type is used in first if branch for perfs reasons
1359 if type(start) is not type(stop) and not (
1360 isinstance(start, type(stop)) or isinstance(stop, type(start))
1361 ):
1362 raise CroniterBadTypeRangeError(
1363 f"The start and stop must be same type. {type(start)} != {type(stop)}"
1364 )
1365 if isinstance(start, (float, int)):
1366 start, stop = (
1367 datetime.datetime.fromtimestamp(t, tzutc()).replace(tzinfo=None) for t in (start, stop)
1368 )
1369 auto_rt = float
1370 if ret_type is None:
1371 ret_type = auto_rt
1372 if not exclude_ends:
1373 ms1 = relativedelta(microseconds=1)
1374 if start < stop: # Forward (normal) time order
1375 start -= ms1
1376 stop += ms1
1377 else: # Reverse time order
1378 start += ms1
1379 stop -= ms1
1380 year_span = math.floor(abs(stop.year - start.year)) + 1
1381 ic = _croniter(
1382 expr_format,
1383 start,
1384 ret_type=datetime.datetime,
1385 day_or=day_or,
1386 max_years_between_matches=year_span,
1387 second_at_beginning=second_at_beginning,
1388 expand_from_start_time=expand_from_start_time,
1389 )
1390 # define a continue (cont) condition function and step function for the main while loop
1391 if start < stop: # Forward
1393 def cont(v):
1394 return v < stop
1396 step = ic.get_next
1397 else: # Reverse
1399 def cont(v):
1400 return v > stop
1402 step = ic.get_prev
1403 try:
1404 dt = step()
1405 while cont(dt):
1406 if ret_type is float:
1407 yield ic.get_current(float)
1408 else:
1409 yield dt
1410 dt = step()
1411 except CroniterBadDateError:
1412 # Stop iteration when this exception is raised; no match found within the given year range
1413 return
1416class HashExpander:
1417 def __init__(self, cronit):
1418 self.cron = cronit
1420 def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
1421 """Return a hashed/random integer given range/hash information"""
1422 if range_end is None:
1423 range_end = self.cron.RANGES[idx][1]
1424 if range_begin is None:
1425 range_begin = self.cron.RANGES[idx][0]
1426 if hash_type == "r":
1427 crc = random.randint(0, 0xFFFFFFFF)
1428 else:
1429 crc = binascii.crc32(hash_id) & 0xFFFFFFFF
1430 return ((crc >> idx) % (range_end - range_begin + 1)) + range_begin
1432 def match(self, efl, idx, expr, hash_id=None, **kw):
1433 return hash_expression_re.match(expr)
1435 def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
1436 """Expand a hashed/random expression to its normal representation"""
1437 if match == "":
1438 match = self.match(efl, idx, expr, hash_id, **kw)
1439 if not match:
1440 return expr
1441 m = match.groupdict()
1443 if m["hash_type"] == "h" and hash_id is None:
1444 raise CroniterBadCronError("Hashed definitions must include hash_id")
1446 if m["range_begin"] and m["range_end"]:
1447 if int(m["range_begin"]) >= int(m["range_end"]):
1448 raise CroniterBadCronError("Range end must be greater than range begin")
1450 if m["range_begin"] and m["range_end"] and m["divisor"]:
1451 # Example: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
1452 if int(m["divisor"]) == 0:
1453 raise CroniterBadCronError(f"Bad expression: {expr}")
1455 x = self.do(
1456 idx,
1457 hash_type=m["hash_type"],
1458 hash_id=hash_id,
1459 range_begin=int(m["range_begin"]),
1460 range_end=int(m["divisor"]) - 1 + int(m["range_begin"]),
1461 )
1462 return f"{x}-{int(m['range_end'])}/{int(m['divisor'])}"
1463 if m["range_begin"] and m["range_end"]:
1464 # Example: H(0-29) -> 12
1465 return str(
1466 self.do(
1467 idx,
1468 hash_type=m["hash_type"],
1469 hash_id=hash_id,
1470 range_end=int(m["range_end"]),
1471 range_begin=int(m["range_begin"]),
1472 )
1473 )
1474 if m["divisor"]:
1475 # Example: H/15 -> 7-59/15 (i.e. 7,22,37,52)
1476 if int(m["divisor"]) == 0:
1477 raise CroniterBadCronError(f"Bad expression: {expr}")
1479 x = self.do(
1480 idx,
1481 hash_type=m["hash_type"],
1482 hash_id=hash_id,
1483 range_begin=self.cron.RANGES[idx][0],
1484 range_end=int(m["divisor"]) - 1 + self.cron.RANGES[idx][0],
1485 )
1486 return f"{x}-{self.cron.RANGES[idx][1]}/{int(m['divisor'])}"
1488 # Example: H -> 32
1489 return str(self.do(idx, hash_type=m["hash_type"], hash_id=hash_id))
1492EXPANDERS = {"hash": HashExpander}