Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/flask_restx/inputs.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2This module provide some helpers for advanced types parsing.
4You can define you own parser using the same pattern:
6.. code-block:: python
8 def my_type(value):
9 if not condition:
10 raise ValueError('This is not my type')
11 return parse(value)
13 # Swagger documentation
14 my_type.__schema__ = {'type': 'string', 'format': 'my-custom-format'}
16The last line allows you to document properly the type in the Swagger documentation.
17"""
19import re
20import socket
22from datetime import datetime, time, timedelta, timezone
23from email.utils import parsedate_tz, mktime_tz
24from urllib.parse import urlparse
26import aniso8601
28# Constants for upgrading date-based intervals to full datetimes.
29START_OF_DAY = time(0, 0, 0, tzinfo=timezone.utc)
30END_OF_DAY = time(23, 59, 59, 999999, tzinfo=timezone.utc)
33netloc_regex = re.compile(
34 r"(?:(?P<auth>[^:@]+?(?::[^:@]*?)?)@)?" # basic auth
35 r"(?:"
36 r"(?P<localhost>localhost)|" # localhost...
37 r"(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|" # ...or ipv4
38 r"(?:\[?(?P<ipv6>[A-F0-9]*:[A-F0-9:]+)\]?)|" # ...or ipv6
39 r"(?P<domain>(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?))" # domain...
40 r")"
41 r"(?::(?P<port>\d+))?" # optional port
42 r"$",
43 re.IGNORECASE,
44)
47email_regex = re.compile(
48 r"^" "(?P<local>[^@]*[^@.])" r"@" r"(?P<server>[^@\.]+(?:\.[^@\.]+)*)" r"$",
49 re.IGNORECASE,
50)
52time_regex = re.compile(r"\d{2}:\d{2}")
55def ipv4(value):
56 """Validate an IPv4 address"""
57 try:
58 socket.inet_aton(value)
59 if value.count(".") == 3:
60 return value
61 except socket.error:
62 pass
63 raise ValueError("{0} is not a valid ipv4 address".format(value))
66ipv4.__schema__ = {"type": "string", "format": "ipv4"}
69def ipv6(value):
70 """Validate an IPv6 address"""
71 try:
72 socket.inet_pton(socket.AF_INET6, value)
73 return value
74 except socket.error:
75 raise ValueError("{0} is not a valid ipv4 address".format(value))
78ipv6.__schema__ = {"type": "string", "format": "ipv6"}
81def ip(value):
82 """Validate an IP address (both IPv4 and IPv6)"""
83 try:
84 return ipv4(value)
85 except ValueError:
86 pass
87 try:
88 return ipv6(value)
89 except ValueError:
90 raise ValueError("{0} is not a valid ip".format(value))
93ip.__schema__ = {"type": "string", "format": "ip"}
96class URL(object):
97 """
98 Validate an URL.
100 Example::
102 parser = reqparse.RequestParser()
103 parser.add_argument('url', type=inputs.URL(schemes=['http', 'https']))
105 Input to the ``URL`` argument will be rejected
106 if it does not match an URL with specified constraints.
107 If ``check`` is True it will also be rejected if the domain does not exists.
109 :param bool check: Check the domain exists (perform a DNS resolution)
110 :param bool ip: Allow IP (both ipv4/ipv6) as domain
111 :param bool local: Allow localhost (both string or ip) as domain
112 :param bool port: Allow a port to be present
113 :param bool auth: Allow authentication to be present
114 :param list|tuple schemes: Restrict valid schemes to this list
115 :param list|tuple domains: Restrict valid domains to this list
116 :param list|tuple exclude: Exclude some domains
117 """
119 def __init__(
120 self,
121 check=False,
122 ip=False,
123 local=False,
124 port=False,
125 auth=False,
126 schemes=None,
127 domains=None,
128 exclude=None,
129 ):
130 self.check = check
131 self.ip = ip
132 self.local = local
133 self.port = port
134 self.auth = auth
135 self.schemes = schemes
136 self.domains = domains
137 self.exclude = exclude
139 def error(self, value, details=None):
140 msg = "{0} is not a valid URL"
141 if details:
142 msg = ". ".join((msg, details))
143 raise ValueError(msg.format(value))
145 def __call__(self, value):
146 parsed = urlparse(value)
147 netloc_match = netloc_regex.match(parsed.netloc)
148 if not all((parsed.scheme, parsed.netloc)):
149 if netloc_regex.match(
150 parsed.netloc or parsed.path.split("/", 1)[0].split("?", 1)[0]
151 ):
152 self.error(value, "Did you mean: http://{0}")
153 self.error(value)
154 if parsed.scheme and self.schemes and parsed.scheme not in self.schemes:
155 self.error(value, "Protocol is not allowed")
156 if not netloc_match:
157 self.error(value)
158 data = netloc_match.groupdict()
159 if data["ipv4"] or data["ipv6"]:
160 if not self.ip:
161 self.error(value, "IP is not allowed")
162 else:
163 try:
164 ip(data["ipv4"] or data["ipv6"])
165 except ValueError as e:
166 self.error(value, str(e))
167 if not self.local:
168 if data["ipv4"] and data["ipv4"].startswith("127."):
169 self.error(value, "Localhost is not allowed")
170 elif data["ipv6"] == "::1":
171 self.error(value, "Localhost is not allowed")
172 if self.check:
173 pass
174 if data["auth"] and not self.auth:
175 self.error(value, "Authentication is not allowed")
176 if data["localhost"] and not self.local:
177 self.error(value, "Localhost is not allowed")
178 if data["port"]:
179 if not self.port:
180 self.error(value, "Custom port is not allowed")
181 else:
182 port = int(data["port"])
183 if not 0 < port < 65535:
184 self.error(value, "Port is out of range")
185 if data["domain"]:
186 if self.domains and data["domain"] not in self.domains:
187 self.error(value, "Domain is not allowed")
188 elif self.exclude and data["domain"] in self.exclude:
189 self.error(value, "Domain is not allowed")
190 if self.check:
191 try:
192 socket.getaddrinfo(data["domain"], None)
193 except socket.error:
194 self.error(value, "Domain does not exists")
195 return value
197 @property
198 def __schema__(self):
199 return {
200 "type": "string",
201 "format": "url",
202 }
205#: Validate an URL
206#:
207#: Legacy validator, allows, auth, port, ip and local
208#: Only allows schemes 'http', 'https', 'ftp' and 'ftps'
209url = URL(
210 ip=True, auth=True, port=True, local=True, schemes=("http", "https", "ftp", "ftps")
211)
214class email(object):
215 """
216 Validate an email.
218 Example::
220 parser = reqparse.RequestParser()
221 parser.add_argument('email', type=inputs.email(dns=True))
223 Input to the ``email`` argument will be rejected if it does not match an email
224 and if domain does not exists.
226 :param bool check: Check the domain exists (perform a DNS resolution)
227 :param bool ip: Allow IP (both ipv4/ipv6) as domain
228 :param bool local: Allow localhost (both string or ip) as domain
229 :param list|tuple domains: Restrict valid domains to this list
230 :param list|tuple exclude: Exclude some domains
231 """
233 def __init__(self, check=False, ip=False, local=False, domains=None, exclude=None):
234 self.check = check
235 self.ip = ip
236 self.local = local
237 self.domains = domains
238 self.exclude = exclude
240 def error(self, value, msg=None):
241 msg = msg or "{0} is not a valid email"
242 raise ValueError(msg.format(value))
244 def is_ip(self, value):
245 try:
246 ip(value)
247 return True
248 except ValueError:
249 return False
251 def __call__(self, value):
252 match = email_regex.match(value)
253 if not match or ".." in value:
254 self.error(value)
255 server = match.group("server")
256 if self.check:
257 try:
258 socket.getaddrinfo(server, None)
259 except socket.error:
260 self.error(value)
261 if self.domains and server not in self.domains:
262 self.error(value, "{0} does not belong to the authorized domains")
263 if self.exclude and server in self.exclude:
264 self.error(value, "{0} belongs to a forbidden domain")
265 if not self.local and (
266 server in ("localhost", "::1") or server.startswith("127.")
267 ):
268 self.error(value)
269 if self.is_ip(server) and not self.ip:
270 self.error(value)
271 return value
273 @property
274 def __schema__(self):
275 return {
276 "type": "string",
277 "format": "email",
278 }
281class regex(object):
282 """
283 Validate a string based on a regular expression.
285 Example::
287 parser = reqparse.RequestParser()
288 parser.add_argument('example', type=inputs.regex('^[0-9]+$'))
290 Input to the ``example`` argument will be rejected if it contains anything
291 but numbers.
293 :param str pattern: The regular expression the input must match
294 """
296 def __init__(self, pattern):
297 self.pattern = pattern
298 self.re = re.compile(pattern)
300 def __call__(self, value):
301 if not self.re.search(value):
302 message = 'Value does not match pattern: "{0}"'.format(self.pattern)
303 raise ValueError(message)
304 return value
306 def __deepcopy__(self, memo):
307 return regex(self.pattern)
309 @property
310 def __schema__(self):
311 return {
312 "type": "string",
313 "pattern": self.pattern,
314 }
317def _normalize_interval(start, end, value):
318 """
319 Normalize datetime intervals.
321 Given a pair of datetime.date or datetime.datetime objects,
322 returns a 2-tuple of tz-aware UTC datetimes spanning the same interval.
324 For datetime.date objects, the returned interval starts at 00:00:00.0
325 on the first date and ends at 00:00:00.0 on the second.
327 Naive datetimes are upgraded to UTC.
329 Timezone-aware datetimes are normalized to the UTC tzdata.
331 Params:
332 - start: A date or datetime
333 - end: A date or datetime
334 """
335 if not isinstance(start, datetime):
336 start = datetime.combine(start, START_OF_DAY)
337 end = datetime.combine(end, START_OF_DAY)
339 if start.tzinfo is None:
340 start = start.replace(tzinfo=timezone.utc)
341 end = end.replace(tzinfo=timezone.utc)
342 else:
343 start = start.astimezone(timezone.utc)
344 end = end.astimezone(timezone.utc)
346 return start, end
349def _expand_datetime(start, value):
350 if not isinstance(start, datetime):
351 # Expand a single date object to be the interval spanning
352 # that entire day.
353 end = start + timedelta(days=1)
354 else:
355 # Expand a datetime based on the finest resolution provided
356 # in the original input string.
357 time = value.split("T")[1]
358 time_without_offset = re.sub("[+-].+", "", time)
359 num_separators = time_without_offset.count(":")
360 if num_separators == 0:
361 # Hour resolution
362 end = start + timedelta(hours=1)
363 elif num_separators == 1:
364 # Minute resolution:
365 end = start + timedelta(minutes=1)
366 else:
367 # Second resolution
368 end = start + timedelta(seconds=1)
370 return end
373def _parse_interval(value):
374 """
375 Do some nasty try/except voodoo to get some sort of datetime
376 object(s) out of the string.
377 """
378 try:
379 return sorted(aniso8601.parse_interval(value))
380 except ValueError:
381 try:
382 return aniso8601.parse_datetime(value), None
383 except ValueError:
384 return aniso8601.parse_date(value), None
387def iso8601interval(value, argument="argument"):
388 """
389 Parses ISO 8601-formatted datetime intervals into tuples of datetimes.
391 Accepts both a single date(time) or a full interval using either start/end
392 or start/duration notation, with the following behavior:
394 - Intervals are defined as inclusive start, exclusive end
395 - Single datetimes are translated into the interval spanning the
396 largest resolution not specified in the input value, up to the day.
397 - The smallest accepted resolution is 1 second.
398 - All timezones are accepted as values; returned datetimes are
399 localized to UTC. Naive inputs and date inputs will are assumed UTC.
401 Examples::
403 "2013-01-01" -> datetime(2013, 1, 1), datetime(2013, 1, 2)
404 "2013-01-01T12" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 13)
405 "2013-01-01/2013-02-28" -> datetime(2013, 1, 1), datetime(2013, 2, 28)
406 "2013-01-01/P3D" -> datetime(2013, 1, 1), datetime(2013, 1, 4)
407 "2013-01-01T12:00/PT30M" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 12, 30)
408 "2013-01-01T06:00/2013-01-01T12:00" -> datetime(2013, 1, 1, 6), datetime(2013, 1, 1, 12)
410 :param str value: The ISO8601 date time as a string
411 :return: Two UTC datetimes, the start and the end of the specified interval
412 :rtype: A tuple (datetime, datetime)
413 :raises ValueError: if the interval is invalid.
414 """
415 if not value:
416 raise ValueError("Expected a valid ISO8601 date/time interval.")
418 try:
419 start, end = _parse_interval(value)
421 if end is None:
422 end = _expand_datetime(start, value)
424 start, end = _normalize_interval(start, end, value)
426 except ValueError as e:
427 msg = (
428 "Invalid {arg}: {value}. {arg} must be a valid ISO8601 date/time interval."
429 )
430 raise ValueError(msg.format(arg=argument, value=value)) from e
432 return start, end
435iso8601interval.__schema__ = {"type": "string", "format": "iso8601-interval"}
438def date(value):
439 """Parse a valid looking date in the format YYYY-mm-dd"""
440 date = datetime.strptime(value, "%Y-%m-%d")
441 return date
444date.__schema__ = {"type": "string", "format": "date"}
447def _get_integer(value):
448 try:
449 return int(value)
450 except (TypeError, ValueError):
451 raise ValueError("{0} is not a valid integer".format(value))
454def natural(value, argument="argument"):
455 """Restrict input type to the natural numbers (0, 1, 2, 3...)"""
456 value = _get_integer(value)
457 if value < 0:
458 msg = "Invalid {arg}: {value}. {arg} must be a non-negative integer"
459 raise ValueError(msg.format(arg=argument, value=value))
460 return value
463natural.__schema__ = {"type": "integer", "minimum": 0}
466def positive(value, argument="argument"):
467 """Restrict input type to the positive integers (1, 2, 3...)"""
468 value = _get_integer(value)
469 if value < 1:
470 msg = "Invalid {arg}: {value}. {arg} must be a positive integer"
471 raise ValueError(msg.format(arg=argument, value=value))
472 return value
475positive.__schema__ = {"type": "integer", "minimum": 0, "exclusiveMinimum": True}
478class int_range(object):
479 """Restrict input to an integer in a range (inclusive)"""
481 def __init__(self, low, high, argument="argument"):
482 self.low = low
483 self.high = high
484 self.argument = argument
486 def __call__(self, value):
487 value = _get_integer(value)
488 if value < self.low or value > self.high:
489 msg = "Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}"
490 raise ValueError(
491 msg.format(arg=self.argument, val=value, lo=self.low, hi=self.high)
492 )
493 return value
495 @property
496 def __schema__(self):
497 return {
498 "type": "integer",
499 "minimum": self.low,
500 "maximum": self.high,
501 }
504def boolean(value):
505 """
506 Parse the string ``"true"`` or ``"false"`` as a boolean (case insensitive).
508 Also accepts ``"1"`` and ``"0"`` as ``True``/``False`` (respectively).
510 If the input is from the request JSON body, the type is already a native python boolean,
511 and will be passed through without further parsing.
513 :raises ValueError: if the boolean value is invalid
514 """
515 if isinstance(value, bool):
516 return value
518 if value is None:
519 raise ValueError("boolean type must be non-null")
520 elif not value:
521 return False
522 value = str(value).lower()
523 if value in (
524 "true",
525 "1",
526 "on",
527 ):
528 return True
529 if value in (
530 "false",
531 "0",
532 ):
533 return False
534 raise ValueError("Invalid literal for boolean(): {0}".format(value))
537boolean.__schema__ = {"type": "boolean"}
540def datetime_from_rfc822(value):
541 """
542 Turns an RFC822 formatted date into a datetime object.
544 Example::
546 inputs.datetime_from_rfc822('Wed, 02 Oct 2002 08:00:00 EST')
548 :param str value: The RFC822-complying string to transform
549 :return: The parsed datetime
550 :rtype: datetime
551 :raises ValueError: if value is an invalid date literal
553 """
554 raw = value
555 if not time_regex.search(value):
556 value = " ".join((value, "00:00:00"))
557 try:
558 timetuple = parsedate_tz(value)
559 timestamp = mktime_tz(timetuple)
560 if timetuple[-1] is None:
561 return datetime.fromtimestamp(timestamp).replace(tzinfo=timezone.utc)
562 else:
563 return datetime.fromtimestamp(timestamp, timezone.utc)
564 except Exception:
565 raise ValueError('Invalid date literal "{0}"'.format(raw))
568def datetime_from_iso8601(value):
569 """
570 Turns an ISO8601 formatted date into a datetime object.
572 Example::
574 inputs.datetime_from_iso8601("2012-01-01T23:30:00+02:00")
576 :param str value: The ISO8601-complying string to transform
577 :return: A datetime
578 :rtype: datetime
579 :raises ValueError: if value is an invalid date literal
581 """
582 try:
583 try:
584 return aniso8601.parse_datetime(value)
585 except ValueError:
586 date = aniso8601.parse_date(value)
587 return datetime(date.year, date.month, date.day)
588 except Exception:
589 raise ValueError('Invalid date literal "{0}"'.format(value))
592datetime_from_iso8601.__schema__ = {"type": "string", "format": "date-time"}
595def date_from_iso8601(value):
596 """
597 Turns an ISO8601 formatted date into a date object.
599 Example::
601 inputs.date_from_iso8601("2012-01-01")
605 :param str value: The ISO8601-complying string to transform
606 :return: A date
607 :rtype: date
608 :raises ValueError: if value is an invalid date literal
610 """
611 return datetime_from_iso8601(value).date()
614date_from_iso8601.__schema__ = {"type": "string", "format": "date"}