Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/flask_restx/inputs.py: 26%
274 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
1"""
2This module provides some helpers for advanced types parsing.
4You can define your own parser using the same pattern:
6.. code-block:: python
8 def my_type(value):
9 if not condition:
10 raise ValueError('This is not my type')
11 return parse(value)
13 # Swagger documentation
14 my_type.__schema__ = {'type': 'string', 'format': 'my-custom-format'}
16The last line allows you to properly document the type in the Swagger documentation.
17"""
19import re
20import socket
22from datetime import datetime, time, timedelta
23from email.utils import parsedate_tz, mktime_tz
24from urllib.parse import urlparse
26import aniso8601
27import pytz
# Day-boundary times (tz-aware, UTC) used when upgrading date-based
# intervals to full datetimes.
START_OF_DAY = time(hour=0, minute=0, second=0, tzinfo=pytz.UTC)
END_OF_DAY = time(
    hour=23, minute=59, second=59, microsecond=999999, tzinfo=pytz.UTC
)
# Matches the network-location part of a URL: ``[auth@]host[:port]``,
# where ``host`` is one of localhost, an IPv4 address, an IPv6 address
# (optionally bracketed) or a domain name. Matching is case-insensitive.
netloc_regex = re.compile(
    "".join(
        (
            r"(?:(?P<auth>[^:@]+?(?::[^:@]*?)?)@)?",  # optional basic auth
            r"(?:",
            "|".join(
                (
                    r"(?P<localhost>localhost)",  # localhost...
                    r"(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})",  # ...or ipv4
                    r"(?:\[?(?P<ipv6>[A-F0-9]*:[A-F0-9:]+)\]?)",  # ...or ipv6
                    r"(?P<domain>(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?))",  # ...or domain
                )
            ),
            r")",
            r"(?::(?P<port>\d+))?",  # optional port
            r"$",
        )
    ),
    flags=re.IGNORECASE,
)
# Splits an e-mail address into its ``local`` part and ``server`` part.
# The local part may not end with a dot; the server part is a sequence of
# dot-separated, non-empty labels. Matching is case-insensitive.
email_regex = re.compile(
    r"^(?P<local>[^@]*[^@.])@(?P<server>[^@\.]+(?:\.[^@\.]+)*)$",
    flags=re.IGNORECASE,
)

# Detects an ``HH:MM`` time component anywhere inside a string.
time_regex = re.compile(r"\d{2}:\d{2}")
def ipv4(value):
    """
    Validate an IPv4 address.

    :param str value: The candidate address
    :return: The unchanged value when it is a valid IPv4 address
    :rtype: str
    :raises ValueError: if ``socket.inet_aton`` rejects the value or the
        value does not contain exactly three dots
    """
    try:
        socket.inet_aton(value)
    except socket.error:
        is_valid = False
    else:
        # Being accepted by inet_aton is not enough: the address must also
        # be written with all four dot-separated parts.
        is_valid = value.count(".") == 3
    if not is_valid:
        raise ValueError("{0} is not a valid ipv4 address".format(value))
    return value


ipv4.__schema__ = {"type": "string", "format": "ipv4"}
def ipv6(value):
    """
    Validate an IPv6 address.

    :param str value: The candidate address
    :return: The unchanged value when it is a valid IPv6 address
    :rtype: str
    :raises ValueError: if ``socket.inet_pton`` rejects the value
    """
    try:
        socket.inet_pton(socket.AF_INET6, value)
    except socket.error:
        # The message previously claimed "ipv4" (copy-paste from ``ipv4``).
        raise ValueError("{0} is not a valid ipv6 address".format(value))
    return value


ipv6.__schema__ = {"type": "string", "format": "ipv6"}
def ip(value):
    """
    Validate an IP address (both IPv4 and IPv6).

    The value is tried as IPv4 first, then as IPv6.

    :param str value: The candidate address
    :return: The unchanged value when either validator accepts it
    :rtype: str
    :raises ValueError: if the value is neither a valid IPv4 nor IPv6 address
    """
    for validator in (ipv4, ipv6):
        try:
            return validator(value)
        except ValueError:
            continue
    raise ValueError("{0} is not a valid ip".format(value))


ip.__schema__ = {"type": "string", "format": "ip"}
class URL(object):
    """
    Validate an URL.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('url', type=inputs.URL(schemes=['http', 'https']))

    Input to the ``URL`` argument will be rejected
    if it does not match an URL with specified constraints.
    If ``check`` is True it will also be rejected if the domain does not exist.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param bool port: Allow a port to be present
    :param bool auth: Allow authentication to be present
    :param list|tuple schemes: Restrict valid schemes to this list
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(
        self,
        check=False,
        ip=False,
        local=False,
        port=False,
        auth=False,
        schemes=None,
        domains=None,
        exclude=None,
    ):
        self.check = check
        self.ip = ip
        self.local = local
        self.port = port
        self.auth = auth
        self.schemes = schemes
        self.domains = domains
        self.exclude = exclude

    def error(self, value, details=None):
        """
        Raise the validation error for ``value``.

        :param str value: The rejected input
        :param str details: Optional extra sentence appended to the message;
            it is formatted together with the base message, so ``{0}`` in it
            is replaced by ``value``
        :raises ValueError: always
        """
        msg = "{0} is not a valid URL"
        if details:
            msg = ". ".join((msg, details))
        raise ValueError(msg.format(value))

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The URL to validate
        :return: The unchanged value when it is accepted
        :rtype: str
        :raises ValueError: if the URL is malformed or violates a constraint
        """
        parsed = urlparse(value)
        netloc_match = netloc_regex.match(parsed.netloc)
        if not all((parsed.scheme, parsed.netloc)):
            # Scheme-less input such as "example.com/path": when its leading
            # segment looks like a host, suggest the http:// form.
            if netloc_regex.match(
                parsed.netloc or parsed.path.split("/", 1)[0].split("?", 1)[0]
            ):
                self.error(value, "Did you mean: http://{0}")
            self.error(value)
        if parsed.scheme and self.schemes and parsed.scheme not in self.schemes:
            self.error(value, "Protocol is not allowed")
        if not netloc_match:
            self.error(value)
        data = netloc_match.groupdict()
        if data["ipv4"] or data["ipv6"]:
            if not self.ip:
                self.error(value, "IP is not allowed")
            else:
                try:
                    ip(data["ipv4"] or data["ipv6"])
                except ValueError as e:
                    self.error(value, str(e))
            if not self.local:
                if data["ipv4"] and data["ipv4"].startswith("127."):
                    self.error(value, "Localhost is not allowed")
                elif data["ipv6"] == "::1":
                    self.error(value, "Localhost is not allowed")
            # No resolution check is performed for IP hosts, even when
            # ``check`` is set.
        if data["auth"] and not self.auth:
            self.error(value, "Authentication is not allowed")
        if data["localhost"] and not self.local:
            self.error(value, "Localhost is not allowed")
        if data["port"]:
            if not self.port:
                self.error(value, "Custom port is not allowed")
            else:
                port = int(data["port"])
                # Valid ports are 1..65535 inclusive; the upper bound was
                # previously exclusive, wrongly rejecting port 65535.
                if not 0 < port <= 65535:
                    self.error(value, "Port is out of range")
        if data["domain"]:
            if self.domains and data["domain"] not in self.domains:
                self.error(value, "Domain is not allowed")
            elif self.exclude and data["domain"] in self.exclude:
                self.error(value, "Domain is not allowed")
            if self.check:
                try:
                    socket.getaddrinfo(data["domain"], None)
                except socket.error:
                    self.error(value, "Domain does not exists")
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "format": "url",
        }
#: Validate an URL
#:
#: Legacy validator: allows auth, port, ip and local hosts,
#: but only the schemes 'http', 'https', 'ftp' and 'ftps'.
url = URL(
    schemes=("http", "https", "ftp", "ftps"),
    ip=True,
    local=True,
    port=True,
    auth=True,
)
class email(object):
    """
    Validate an email.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('email', type=inputs.email(check=True))

    Input to the ``email`` argument will be rejected if it does not match
    an email or, when ``check`` is set, if its domain does not resolve.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(self, check=False, ip=False, local=False, domains=None, exclude=None):
        self.check = check
        self.ip = ip
        self.local = local
        self.domains = domains
        self.exclude = exclude

    def error(self, value, msg=None):
        """
        Raise the validation error for ``value``.

        :param str value: The rejected input
        :param str msg: Optional message template; ``{0}`` is replaced
            by ``value``
        :raises ValueError: always
        """
        template = msg if msg else "{0} is not a valid email"
        raise ValueError(template.format(value))

    def is_ip(self, value):
        """Tell whether ``value`` is accepted by the ``ip`` validator."""
        try:
            ip(value)
        except ValueError:
            return False
        return True

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The email address to validate
        :return: The unchanged value when it is accepted
        :rtype: str
        :raises ValueError: if the address is malformed or violates a constraint
        """
        match = email_regex.match(value)
        if match is None or ".." in value:
            self.error(value)
        server = match.group("server")

        if self.check:
            try:
                socket.getaddrinfo(server, None)
            except socket.error:
                self.error(value)

        if self.domains and server not in self.domains:
            self.error(value, "{0} does not belong to the authorized domains")
        if self.exclude and server in self.exclude:
            self.error(value, "{0} belongs to a forbidden domain")

        is_loopback = server in ("localhost", "::1") or server.startswith("127.")
        if is_loopback and not self.local:
            self.error(value)
        if not self.ip and self.is_ip(server):
            self.error(value)
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "format": "email",
        }
class regex(object):
    """
    Validate a string based on a regular expression.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('example', type=inputs.regex('^[0-9]+$'))

    Input to the ``example`` argument will be rejected if it contains anything
    but numbers.

    The pattern is applied with ``search``, so anchor it (``^``/``$``)
    when the whole input must match.

    :param str pattern: The regular expression the input must match
    """

    def __init__(self, pattern):
        self.pattern = pattern
        self.re = re.compile(pattern)

    def __call__(self, value):
        """
        Return ``value`` unchanged when the pattern is found in it.

        :raises ValueError: if the pattern is not found in ``value``
        """
        if self.re.search(value):
            return value
        raise ValueError(
            'Value does not match pattern: "{0}"'.format(self.pattern)
        )

    def __deepcopy__(self, memo):
        # Rebuild from the pattern source instead of copying the compiled
        # regular expression object.
        return regex(self.pattern)

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return dict(type="string", pattern=self.pattern)
def _normalize_interval(start, end, value):
    """
    Normalize datetime intervals.

    Given a pair of datetime.date or datetime.datetime objects,
    returns a 2-tuple of tz-aware UTC datetimes spanning the same interval.

    For datetime.date objects, the returned interval starts at 00:00:00.0
    on the first date and ends at 00:00:00.0 on the second.

    Naive datetimes are upgraded to UTC.

    Timezone-aware datetimes are normalized to the UTC tzdata.

    Each endpoint is normalized on its own, so a pair mixing a date with a
    datetime, or a naive with an aware datetime, is handled per endpoint
    instead of assuming ``end`` has the same kind as ``start``.

    Params:
        - start: A date or datetime
        - end: A date or datetime
        - value: The original input string (unused; kept for signature
          compatibility)
    """

    def _to_utc(moment):
        # Upgrade plain dates to midnight UTC of that day.
        if not isinstance(moment, datetime):
            moment = datetime.combine(moment, START_OF_DAY)
        if moment.tzinfo is None:
            return pytz.UTC.localize(moment)
        return moment.astimezone(pytz.UTC)

    return _to_utc(start), _to_utc(end)
def _expand_datetime(start, value):
    """
    Compute the end of the interval implied by a single date or datetime.

    A plain date spans one day. A datetime spans one hour, minute or second
    depending on how many ``:`` separators the time part of the original
    string contains (0, 1, or more).

    :param start: The parsed date or datetime
    :param str value: The original input string ``start`` was parsed from
    :return: ``start`` advanced by the inferred resolution
    """
    if not isinstance(start, datetime):
        # A single date object covers that entire day.
        return start + timedelta(days=1)

    # Look only at the time part, with any "+hh:mm" / "-hh:mm" offset removed
    # so the offset's own ":" is not counted.
    clock = re.sub("[+-].+", "", value.split("T")[1])
    resolution_by_separators = {
        0: timedelta(hours=1),  # hour resolution
        1: timedelta(minutes=1),  # minute resolution
    }
    # Two or more separators: second resolution.
    step = resolution_by_separators.get(clock.count(":"), timedelta(seconds=1))
    return start + step
def _parse_interval(value):
    """
    Get some sort of date/datetime object(s) out of the string.

    The value is tried, in order, as an ISO 8601 interval, a single
    datetime and a single date.

    :param str value: The string to parse
    :return: The sorted interval endpoints, or ``(parsed, None)`` when
        the value is a single datetime or date
    :raises ValueError: if none of the three parsers accepts the value
    """
    try:
        return sorted(aniso8601.parse_interval(value))
    except ValueError:
        pass  # not an interval: fall back to a single datetime

    try:
        return aniso8601.parse_datetime(value), None
    except ValueError:
        pass  # not a datetime: last resort is a plain date

    return aniso8601.parse_date(value), None
def iso8601interval(value, argument="argument"):
    """
    Parses ISO 8601-formatted datetime intervals into tuples of datetimes.

    Accepts both a single date(time) or a full interval using either start/end
    or start/duration notation, with the following behavior:

    - Intervals are defined as inclusive start, exclusive end
    - Single datetimes are translated into the interval spanning the
      largest resolution not specified in the input value, up to the day.
    - The smallest accepted resolution is 1 second.
    - All timezones are accepted as values; returned datetimes are
      localized to UTC. Naive inputs and date inputs are assumed UTC.

    Examples::

        "2013-01-01" -> datetime(2013, 1, 1), datetime(2013, 1, 2)
        "2013-01-01T12" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 13)
        "2013-01-01/2013-02-28" -> datetime(2013, 1, 1), datetime(2013, 2, 28)
        "2013-01-01/P3D" -> datetime(2013, 1, 1), datetime(2013, 1, 4)
        "2013-01-01T12:00/PT30M" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 12, 30)
        "2013-01-01T06:00/2013-01-01T12:00" -> datetime(2013, 1, 1, 6), datetime(2013, 1, 1, 12)

    :param str value: The ISO8601 date time as a string
    :param str argument: The argument name used in the error message
    :return: Two UTC datetimes, the start and the end of the specified interval
    :rtype: A tuple (datetime, datetime)
    :raises ValueError: if the interval is invalid.
    """
    if not value:
        raise ValueError("Expected a valid ISO8601 date/time interval.")

    try:
        start, end = _parse_interval(value)

        if end is None:
            end = _expand_datetime(start, value)

        start, end = _normalize_interval(start, end, value)

    except (TypeError, ValueError):
        # TypeError is reported as an invalid interval too: ordering
        # endpoints that cannot be compared with each other (e.g. a naive
        # and an aware datetime) raises it rather than ValueError.
        msg = (
            "Invalid {arg}: {value}. {arg} must be a valid ISO8601 date/time interval."
        )
        raise ValueError(msg.format(arg=argument, value=value))

    return start, end


iso8601interval.__schema__ = {"type": "string", "format": "iso8601-interval"}
def date(value):
    """
    Parse a valid looking date in the format YYYY-mm-dd.

    :param str value: The date string
    :return: The parsed value, as returned by ``datetime.strptime``
    :rtype: datetime
    :raises ValueError: if the value does not match the format
    """
    return datetime.strptime(value, "%Y-%m-%d")


date.__schema__ = {"type": "string", "format": "date"}
def _get_integer(value):
    """
    Convert ``value`` with ``int()``.

    :return: The converted integer
    :rtype: int
    :raises ValueError: if ``int()`` rejects the value (both its
        ``TypeError`` and ``ValueError`` are reported this way)
    """
    try:
        number = int(value)
    except (TypeError, ValueError):
        raise ValueError("{0} is not a valid integer".format(value))
    return number
def natural(value, argument="argument"):
    """
    Restrict input type to the natural numbers (0, 1, 2, 3...).

    :param value: The value to convert to an integer
    :param str argument: The argument name used in the error message
    :return: The value as a non-negative integer
    :rtype: int
    :raises ValueError: if the value is not an integer or is negative
    """
    number = _get_integer(value)
    if number >= 0:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a non-negative integer"
    raise ValueError(template.format(arg=argument, value=number))


natural.__schema__ = {"type": "integer", "minimum": 0}
def positive(value, argument="argument"):
    """
    Restrict input type to the positive integers (1, 2, 3...).

    :param value: The value to convert to an integer
    :param str argument: The argument name used in the error message
    :return: The value as a strictly positive integer
    :rtype: int
    :raises ValueError: if the value is not an integer or is lower than 1
    """
    number = _get_integer(value)
    if number >= 1:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a positive integer"
    raise ValueError(template.format(arg=argument, value=number))


positive.__schema__ = {"type": "integer", "minimum": 0, "exclusiveMinimum": True}
class int_range(object):
    """
    Restrict input to an integer in a range (inclusive).

    :param int low: The lowest accepted value
    :param int high: The highest accepted value
    :param str argument: The argument name used in the error message
    """

    def __init__(self, low, high, argument="argument"):
        self.low = low
        self.high = high
        self.argument = argument

    def __call__(self, value):
        """
        Convert ``value`` to an integer and check it against the bounds.

        :return: The value as an integer within ``[low, high]``
        :rtype: int
        :raises ValueError: if the value is not an integer or is out of range
        """
        number = _get_integer(value)
        out_of_range = number < self.low or number > self.high
        if not out_of_range:
            return number
        template = "Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}"
        raise ValueError(
            template.format(arg=self.argument, val=number, lo=self.low, hi=self.high)
        )

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return dict(type="integer", minimum=self.low, maximum=self.high)
def boolean(value):
    """
    Parse the string ``"true"`` or ``"false"`` as a boolean (case insensitive).

    Also accepts ``"1"`` and ``"on"`` as ``True`` and ``"0"`` as ``False``.
    Any other falsy, non-null value (such as the empty string) is ``False``.

    If the input is from the request JSON body, the type is already a native python boolean,
    and will be passed through without further parsing.

    :raises ValueError: if the boolean value is invalid
    """
    if isinstance(value, bool):
        return value
    if value is None:
        raise ValueError("boolean type must be non-null")
    if not value:
        return False

    literals = {
        "true": True,
        "1": True,
        "on": True,
        "false": False,
        "0": False,
    }
    text = str(value).lower()
    parsed = literals.get(text)
    if parsed is None:
        raise ValueError("Invalid literal for boolean(): {0}".format(text))
    return parsed


boolean.__schema__ = {"type": "boolean"}
def datetime_from_rfc822(value):
    """
    Turns an RFC822 formatted date into a datetime object.

    Example::

        inputs.datetime_from_rfc822('Wed, 02 Oct 2002 08:00:00 EST')

    :param str value: The RFC822-complying string to transform
    :return: The parsed datetime
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal

    """
    # A value without any HH:MM component gets midnight appended before parsing.
    if time_regex.search(value):
        candidate = value
    else:
        candidate = " ".join((value, "00:00:00"))

    try:
        parsed = parsedate_tz(candidate)
        epoch = mktime_tz(parsed)
        if parsed[-1] is not None:
            # An explicit offset was given: convert the instant to UTC.
            return datetime.fromtimestamp(epoch, pytz.utc)
        # No offset in the input: keep the resulting wall-clock time and
        # label it UTC.
        return datetime.fromtimestamp(epoch).replace(tzinfo=pytz.utc)
    except Exception:
        raise ValueError('Invalid date literal "{0}"'.format(value))
def datetime_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a datetime object.

    A value that only holds a date is returned as a datetime at midnight.

    Example::

        inputs.datetime_from_iso8601("2012-01-01T23:30:00+02:00")

    :param str value: The ISO8601-complying string to transform
    :return: A datetime
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal

    """
    try:
        try:
            parsed = aniso8601.parse_datetime(value)
        except ValueError:
            # Not a full datetime: retry as a plain date.
            day = aniso8601.parse_date(value)
            parsed = datetime(day.year, day.month, day.day)
        return parsed
    except Exception:
        raise ValueError('Invalid date literal "{0}"'.format(value))


datetime_from_iso8601.__schema__ = {"type": "string", "format": "date-time"}
def date_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a date object.

    Example::

        inputs.date_from_iso8601("2012-01-01")

    :param str value: The ISO8601-complying string to transform
    :return: A date
    :rtype: date
    :raises ValueError: if value is an invalid date literal

    """
    moment = datetime_from_iso8601(value)
    return moment.date()


date_from_iso8601.__schema__ = {"type": "string", "format": "date"}