Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cron_descriptor/ExpressionParser.py: 57%
84 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# The MIT License (MIT)
2#
3# Copyright (c) 2016 Adam Schubert
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in all
13# copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21# SOFTWARE.
23import re
25from .Exception import MissingFieldException, FormatException
28class ExpressionParser(object):
29 _expression = ''
30 _options = None
32 _cron_days = {
33 0: 'SUN',
34 1: 'MON',
35 2: 'TUE',
36 3: 'WED',
37 4: 'THU',
38 5: 'FRI',
39 6: 'SAT'
40 }
42 _cron_months = {
43 1: 'JAN',
44 2: 'FEB',
45 3: 'MAR',
46 4: 'APR',
47 5: 'MAY',
48 6: 'JUN',
49 7: 'JUL',
50 8: 'AUG',
51 9: 'SEP',
52 10: 'OCT',
53 11: 'NOV',
54 12: 'DEC'
55 }
57 def __init__(self, expression, options):
58 """Initializes a new instance of the ExpressionParser class
59 Args:
60 expression: The cron expression string
61 options: Parsing options
63 """
64 self._expression = expression
65 self._options = options
67 def parse(self):
68 """Parses the cron expression string
69 Returns:
70 A 7 part string array, one part for each component of the cron expression (seconds, minutes, etc.)
71 Raises:
72 MissingFieldException: if _expression is empty or None
73 FormatException: if _expression has wrong format
74 """
75 # Initialize all elements of parsed array to empty strings
76 parsed = ['', '', '', '', '', '', '']
78 if self._expression is None or len(self._expression) == 0:
79 raise MissingFieldException("ExpressionDescriptor.expression")
80 else:
81 expression_parts_temp = self._expression.split()
82 expression_parts_temp_length = len(expression_parts_temp)
83 if expression_parts_temp_length < 5:
84 raise FormatException(
85 "Error: Expression only has {0} parts. At least 5 part are required.".format(
86 expression_parts_temp_length
87 )
88 )
89 elif expression_parts_temp_length == 5:
90 # 5 part cron so shift array past seconds element
91 for i, expression_part_temp in enumerate(expression_parts_temp):
92 parsed[i + 1] = expression_part_temp
93 elif expression_parts_temp_length == 6:
94 # We will detect if this 6 part expression has a year specified and if so we will shift the parts and treat the
95 # first part as a minute part rather than a second part.
96 # Ways we detect:
97 # 1. Last part is a literal year (i.e. 2020)
98 # 2. 3rd or 5th part is specified as "?" (DOM or DOW)
99 year_regex = re.compile(r"\d{4}$")
100 is_year_with_no_seconds_part = bool(year_regex.search(expression_parts_temp[5])) or "?" in [expression_parts_temp[4], expression_parts_temp[2]]
101 for i, expression_part_temp in enumerate(expression_parts_temp):
102 if is_year_with_no_seconds_part:
103 # Shift parts over by one
104 parsed[i + 1] = expression_part_temp
105 else:
106 parsed[i] = expression_part_temp
108 elif expression_parts_temp_length == 7:
109 parsed = expression_parts_temp
110 else:
111 raise FormatException(
112 "Error: Expression has too many parts ({0}). Expression must not have more than 7 parts.".format(
113 expression_parts_temp_length
114 )
115 )
116 self.normalize_expression(parsed)
118 return parsed
120 def normalize_expression(self, expression_parts):
121 """Converts cron expression components into consistent, predictable formats.
122 Args:
123 expression_parts: A 7 part string array, one part for each component of the cron expression
124 Returns:
125 None
126 """
127 # convert ? to * only for DOM and DOW
128 expression_parts[3] = expression_parts[3].replace("?", "*")
129 expression_parts[5] = expression_parts[5].replace("?", "*")
131 # convert 0/, 1/ to */
132 if expression_parts[0].startswith("0/"):
133 expression_parts[0] = expression_parts[0].replace("0/", "*/") # seconds
135 if expression_parts[1].startswith("0/"):
136 expression_parts[1] = expression_parts[1].replace("0/", "*/") # minutes
138 if expression_parts[2].startswith("0/"):
139 expression_parts[2] = expression_parts[2].replace("0/", "*/") # hours
141 if expression_parts[3].startswith("1/"):
142 expression_parts[3] = expression_parts[3].replace("1/", "*/") # DOM
144 if expression_parts[4].startswith("1/"):
145 expression_parts[4] = expression_parts[4].replace("1/", "*/") # Month
147 if expression_parts[5].startswith("1/"):
148 expression_parts[5] = expression_parts[5].replace("1/", "*/") # DOW
150 if expression_parts[6].startswith("1/"):
151 expression_parts[6] = expression_parts[6].replace("1/", "*/") # Years
153 # Adjust DOW based on dayOfWeekStartIndexZero option
154 def digit_replace(match):
155 match_value = match.group()
156 dow_digits = re.sub(r'\D', "", match_value)
157 dow_digits_adjusted = dow_digits
158 if self._options.day_of_week_start_index_zero:
159 if dow_digits == "7":
160 dow_digits_adjusted = "0"
161 else:
162 dow_digits_adjusted = str(int(dow_digits) - 1)
164 return match_value.replace(dow_digits, dow_digits_adjusted)
166 expression_parts[5] = re.sub(r'(^\d)|([^#/\s]\d)', digit_replace, expression_parts[5])
168 # Convert DOM '?' to '*'
169 if expression_parts[3] == "?":
170 expression_parts[3] = "*"
172 # convert SUN-SAT format to 0-6 format
173 for day_number in self._cron_days:
174 expression_parts[5] = expression_parts[5].upper().replace(self._cron_days[day_number], str(day_number))
176 # convert JAN-DEC format to 1-12 format
177 for month_number in self._cron_months:
178 expression_parts[4] = expression_parts[4].upper().replace(
179 self._cron_months[month_number], str(month_number))
181 # convert 0 second to (empty)
182 if expression_parts[0] == "0":
183 expression_parts[0] = ''
185 # If time interval is specified for seconds or minutes and next time part is single item, make it a "self-range" so
186 # the expression can be interpreted as an interval 'between' range.
187 # For example:
188 # 0-20/3 9 * * * => 0-20/3 9-9 * * * (9 => 9-9)
189 # */5 3 * * * => */5 3-3 * * * (3 => 3-3)
190 star_and_slash = ['*', '/']
191 has_part_zero_star_and_slash = any(ext in expression_parts[0] for ext in star_and_slash)
192 has_part_one_star_and_slash = any(ext in expression_parts[1] for ext in star_and_slash)
193 has_part_two_special_chars = any(ext in expression_parts[2] for ext in ['*', '-', ',', '/'])
194 if not has_part_two_special_chars and (has_part_zero_star_and_slash or has_part_one_star_and_slash):
195 expression_parts[2] += '-{}'.format(expression_parts[2])
197 # Loop through all parts and apply global normalization
198 length = len(expression_parts)
199 for i in range(length):
201 # convert all '*/1' to '*'
202 if expression_parts[i] == "*/1":
203 expression_parts[i] = "*"
205 """
206 Convert Month,DOW,Year step values with a starting value (i.e. not '*') to between expressions.
207 This allows us to reuse the between expression handling for step values.
209 For Example:
210 - month part '3/2' will be converted to '3-12/2' (every 2 months between March and December)
211 - DOW part '3/2' will be converted to '3-6/2' (every 2 days between Tuesday and Saturday)
212 """
214 if "/" in expression_parts[i] and not any(exp in expression_parts[i] for exp in ['*', '-', ',']):
215 choices = {
216 4: "12",
217 5: "6",
218 6: "9999"
219 }
221 step_range_through = choices.get(i)
223 if step_range_through is not None:
224 parts = expression_parts[i].split('/')
225 expression_parts[i] = "{0}-{1}/{2}".format(parts[0], step_range_through, parts[1])