1"""
This module provides some helpers for advanced type parsing.
3
You can define your own parser using the same pattern:
5
6.. code-block:: python
7
8 def my_type(value):
9 if not condition:
10 raise ValueError('This is not my type')
11 return parse(value)
12
13 # Swagger documentation
14 my_type.__schema__ = {'type': 'string', 'format': 'my-custom-format'}
15
The last line lets you properly document the type in the Swagger documentation.
17"""
18
19import re
20import socket
21
22from datetime import datetime, time, timedelta, timezone
23from email.utils import parsedate_tz, mktime_tz
24from urllib.parse import urlparse
25
26import aniso8601
27
# UTC-aware clock times used when a plain date has to be promoted to a full
# datetime: the first and the last representable instant of a day.
START_OF_DAY = time.min.replace(tzinfo=timezone.utc)
END_OF_DAY = time.max.replace(tzinfo=timezone.utc)
31
32
# Pattern for the network-location ("netloc") part of a URL.
#
# Named groups:
#   auth      - optional ``user`` or ``user:password`` prefix terminated by ``@``
#   localhost - the literal host name ``localhost``
#   ipv4      - four dot-separated groups of 1-3 digits (shape only, the
#               numeric range of each group is not checked here)
#   ipv6      - hex digits and colons, optionally wrapped in ``[...]``
#   domain    - dot-separated labels followed by a top-level label
#   port      - optional ``:digits`` suffix
#
# The pattern is anchored at the end only (``$``) and compiled case-insensitive.
netloc_regex = re.compile(
    r"(?:(?P<auth>[^:@]+?(?::[^:@]*?)?)@)?"  # basic auth
    r"(?:"
    r"(?P<localhost>localhost)|"  # localhost...
    r"(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|"  # ...or ipv4
    r"(?:\[?(?P<ipv6>[A-F0-9]*:[A-F0-9:]+)\]?)|"  # ...or ipv6
    r"(?P<domain>(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?))"  # domain...
    r")"
    r"(?::(?P<port>\d+))?"  # optional port
    r"$",
    re.IGNORECASE,
)
45
46
# Pattern for a whole email address (anchored at both ends).
#
# Named groups:
#   local  - everything before the ``@``; must not contain ``@`` and must not
#            end with a dot
#   server - everything after the ``@``: one or more dot-separated, non-empty
#            parts containing neither ``@`` nor ``.``
email_regex = re.compile(
    r"^" "(?P<local>[^@]*[^@.])" r"@" r"(?P<server>[^@\.]+(?:\.[^@\.]+)*)" r"$",
    re.IGNORECASE,
)
51
# Detects an ``HH:MM``-shaped fragment (two digits, a colon, two digits)
# anywhere in a string; the pattern is not anchored.
time_regex = re.compile(r"\d{2}:\d{2}")
53
54
def ipv4(value):
    """
    Validate an IPv4 address.

    The value must be parseable by ``socket.inet_aton`` and must contain
    exactly three dots (``inet_aton`` on its own may accept shortened forms).

    :param str value: The candidate address
    :return: The unchanged input value
    :raises ValueError: if the value is not a dotted-quad IPv4 address
    """
    try:
        socket.inet_aton(value)
    except socket.error:
        is_dotted_quad = False
    else:
        is_dotted_quad = value.count(".") == 3

    if not is_dotted_quad:
        raise ValueError("{0} is not a valid ipv4 address".format(value))
    return value


ipv4.__schema__ = {"type": "string", "format": "ipv4"}
67
68
def ipv6(value):
    """
    Validate an IPv6 address.

    The value is accepted when ``socket.inet_pton`` can parse it as an
    ``AF_INET6`` address.

    :param str value: The candidate address
    :return: The unchanged input value
    :raises ValueError: if the value is not a valid IPv6 address
    """
    try:
        socket.inet_pton(socket.AF_INET6, value)
    except socket.error:
        # The message previously said "ipv4", which was misleading here.
        raise ValueError("{0} is not a valid ipv6 address".format(value))
    return value


ipv6.__schema__ = {"type": "string", "format": "ipv6"}
79
80
def ip(value):
    """
    Validate an IP address (either IPv4 or IPv6).

    The IPv4 validator is tried first, then the IPv6 one.

    :param str value: The candidate address
    :return: The unchanged input value
    :raises ValueError: if the value is neither a valid IPv4 nor IPv6 address
    """
    for validate in (ipv4, ipv6):
        try:
            return validate(value)
        except ValueError:
            continue
    raise ValueError("{0} is not a valid ip".format(value))


ip.__schema__ = {"type": "string", "format": "ip"}
94
95
class URL(object):
    """
    Validate an URL.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('url', type=inputs.URL(schemes=['http', 'https']))

    Input to the ``URL`` argument will be rejected
    if it does not match an URL with specified constraints.
    If ``check`` is True it will also be rejected if the domain does not exists.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param bool port: Allow a port to be present
    :param bool auth: Allow authentication to be present
    :param list|tuple schemes: Restrict valid schemes to this list
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(
        self,
        check=False,
        ip=False,
        local=False,
        port=False,
        auth=False,
        schemes=None,
        domains=None,
        exclude=None,
    ):
        self.check = check
        self.ip = ip
        self.local = local
        self.port = port
        self.auth = auth
        self.schemes = schemes
        self.domains = domains
        self.exclude = exclude

    def error(self, value, details=None):
        """
        Raise the ``ValueError`` reporting ``value`` as an invalid URL.

        :param value: The rejected input, substituted for ``{0}`` in the message
        :param str details: Optional extra sentence appended to the message;
            it may itself contain a ``{0}`` placeholder
        :raises ValueError: always
        """
        msg = "{0} is not a valid URL"
        if details:
            msg = ". ".join((msg, details))
        raise ValueError(msg.format(value))

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The URL to validate
        :return: The unchanged input value
        :raises ValueError: if the URL is malformed or violates a constraint
        """
        parsed = urlparse(value)
        netloc_match = netloc_regex.match(parsed.netloc)
        if not all((parsed.scheme, parsed.netloc)):
            # Without a scheme, urlparse leaves the host at the start of
            # ``path``; if that part looks like a netloc, suggest adding one.
            if netloc_regex.match(
                parsed.netloc or parsed.path.split("/", 1)[0].split("?", 1)[0]
            ):
                self.error(value, "Did you mean: http://{0}")
            self.error(value)
        if parsed.scheme and self.schemes and parsed.scheme not in self.schemes:
            self.error(value, "Protocol is not allowed")
        if not netloc_match:
            self.error(value)
        data = netloc_match.groupdict()
        if data["ipv4"] or data["ipv6"]:
            if not self.ip:
                self.error(value, "IP is not allowed")
            else:
                # The regex only checks the shape; validate the actual address.
                try:
                    ip(data["ipv4"] or data["ipv6"])
                except ValueError as e:
                    self.error(value, str(e))
            if not self.local:
                if data["ipv4"] and data["ipv4"].startswith("127."):
                    self.error(value, "Localhost is not allowed")
                elif data["ipv6"] == "::1":
                    self.error(value, "Localhost is not allowed")
        if data["auth"] and not self.auth:
            self.error(value, "Authentication is not allowed")
        if data["localhost"] and not self.local:
            self.error(value, "Localhost is not allowed")
        if data["port"]:
            if not self.port:
                self.error(value, "Custom port is not allowed")
            else:
                port = int(data["port"])
                # 65535 is the highest valid port and must be accepted.
                if not 0 < port <= 65535:
                    self.error(value, "Port is out of range")
        if data["domain"]:
            if self.domains and data["domain"] not in self.domains:
                self.error(value, "Domain is not allowed")
            elif self.exclude and data["domain"] in self.exclude:
                self.error(value, "Domain is not allowed")
            if self.check:
                try:
                    socket.getaddrinfo(data["domain"], None)
                except socket.error:
                    self.error(value, "Domain does not exists")
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "format": "url",
        }
203
204
#: Validate an URL
#:
#: Legacy validator: allows auth, port, IP and local addresses.
#: Only the schemes 'http', 'https', 'ftp' and 'ftps' are accepted.
url = URL(
    schemes=("http", "https", "ftp", "ftps"),
    ip=True,
    local=True,
    port=True,
    auth=True,
)
212
213
class email(object):
    """
    Validate an email.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('email', type=inputs.email(check=True))

    Input to the ``email`` argument will be rejected if it does not match an email
    and if domain does not exists.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(self, check=False, ip=False, local=False, domains=None, exclude=None):
        self.check = check
        self.ip = ip
        self.local = local
        self.domains = domains
        self.exclude = exclude

    def error(self, value, msg=None):
        """
        Raise the ``ValueError`` reporting ``value`` as rejected.

        :param value: The rejected input, substituted for ``{0}`` in the message
        :param str msg: Optional message template replacing the default one
        :raises ValueError: always
        """
        template = msg if msg else "{0} is not a valid email"
        raise ValueError(template.format(value))

    def is_ip(self, value):
        """Tell whether ``value`` passes the module-level ``ip`` validator."""
        try:
            ip(value)
        except ValueError:
            return False
        return True

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The email address to validate
        :return: The unchanged input value
        :raises ValueError: if the address is malformed or violates a constraint
        """
        found = email_regex.match(value)
        if not found or ".." in value:
            self.error(value)
        host = found.group("server")

        if self.check:
            # Optional DNS lookup of the part after the "@".
            try:
                socket.getaddrinfo(host, None)
            except socket.error:
                self.error(value)

        if self.domains and host not in self.domains:
            self.error(value, "{0} does not belong to the authorized domains")
        if self.exclude and host in self.exclude:
            self.error(value, "{0} belongs to a forbidden domain")

        if not self.local:
            if host in ("localhost", "::1") or host.startswith("127."):
                self.error(value)

        if self.is_ip(host) and not self.ip:
            self.error(value)
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "format": "email",
        }
279
280
class regex(object):
    """
    Validate a string based on a regular expression.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('example', type=inputs.regex('^[0-9]+$'))

    Input to the ``example`` argument will be rejected if it contains anything
    but numbers.

    :param str pattern: The regular expression the input must match
    """

    def __init__(self, pattern):
        self.pattern = pattern
        self.re = re.compile(pattern)

    def __call__(self, value):
        """
        Return ``value`` unchanged when the pattern is found in it.

        The pattern is applied with ``search``, so it may match anywhere in
        the value unless the pattern itself is anchored.

        :raises ValueError: if the pattern is not found
        """
        if self.re.search(value):
            return value
        raise ValueError(
            'Value does not match pattern: "{0}"'.format(self.pattern)
        )

    def __deepcopy__(self, memo):
        # Deep copies are rebuilt from the pattern source instead of copying
        # the compiled pattern object.
        return regex(self.pattern)

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "pattern": self.pattern,
        }
315
316
317def _normalize_interval(start, end, value):
318 """
319 Normalize datetime intervals.
320
321 Given a pair of datetime.date or datetime.datetime objects,
322 returns a 2-tuple of tz-aware UTC datetimes spanning the same interval.
323
324 For datetime.date objects, the returned interval starts at 00:00:00.0
325 on the first date and ends at 00:00:00.0 on the second.
326
327 Naive datetimes are upgraded to UTC.
328
329 Timezone-aware datetimes are normalized to the UTC tzdata.
330
331 Params:
332 - start: A date or datetime
333 - end: A date or datetime
334 """
335 if not isinstance(start, datetime):
336 start = datetime.combine(start, START_OF_DAY)
337 end = datetime.combine(end, START_OF_DAY)
338
339 if start.tzinfo is None:
340 start = start.replace(tzinfo=timezone.utc)
341 end = end.replace(tzinfo=timezone.utc)
342 else:
343 start = start.astimezone(timezone.utc)
344 end = end.astimezone(timezone.utc)
345
346 return start, end
347
348
349def _expand_datetime(start, value):
350 if not isinstance(start, datetime):
351 # Expand a single date object to be the interval spanning
352 # that entire day.
353 end = start + timedelta(days=1)
354 else:
355 # Expand a datetime based on the finest resolution provided
356 # in the original input string.
357 time = value.split("T")[1]
358 time_without_offset = re.sub("[+-].+", "", time)
359 num_separators = time_without_offset.count(":")
360 if num_separators == 0:
361 # Hour resolution
362 end = start + timedelta(hours=1)
363 elif num_separators == 1:
364 # Minute resolution:
365 end = start + timedelta(minutes=1)
366 else:
367 # Second resolution
368 end = start + timedelta(seconds=1)
369
370 return end
371
372
def _parse_interval(value):
    """
    Turn ``value`` into a ``(start, end)`` pair using aniso8601.

    The string is tried as an interval first (both bounds are returned in
    sorted order), then as a single datetime, then as a single date. For the
    two single-value cases ``end`` is ``None``.

    A ``ValueError`` from the final date attempt propagates to the caller.
    """
    try:
        return sorted(aniso8601.parse_interval(value))
    except ValueError:
        pass

    try:
        moment = aniso8601.parse_datetime(value)
    except ValueError:
        moment = aniso8601.parse_date(value)
    return moment, None
385
386
def iso8601interval(value, argument="argument"):
    """
    Parses ISO 8601-formatted datetime intervals into tuples of datetimes.

    Accepts both a single date(time) or a full interval using either start/end
    or start/duration notation, with the following behavior:

    - Intervals are defined as inclusive start, exclusive end
    - Single datetimes are translated into the interval spanning the
      largest resolution not specified in the input value, up to the day.
    - The smallest accepted resolution is 1 second.
    - All timezones are accepted as values; returned datetimes are
      localized to UTC. Naive inputs and date inputs are assumed UTC.

    Examples::

        "2013-01-01" -> datetime(2013, 1, 1), datetime(2013, 1, 2)
        "2013-01-01T12" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 13)
        "2013-01-01/2013-02-28" -> datetime(2013, 1, 1), datetime(2013, 2, 28)
        "2013-01-01/P3D" -> datetime(2013, 1, 1), datetime(2013, 1, 4)
        "2013-01-01T12:00/PT30M" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 12, 30)
        "2013-01-01T06:00/2013-01-01T12:00" -> datetime(2013, 1, 1, 6), datetime(2013, 1, 1, 12)

    :param str value: The ISO8601 date time as a string
    :param str argument: Name used for the value in the error message
    :return: Two UTC datetimes, the start and the end of the specified interval
    :rtype: A tuple (datetime, datetime)
    :raises ValueError: if the interval is invalid.
    """
    if not value:
        raise ValueError("Expected a valid ISO8601 date/time interval.")

    try:
        lower, upper = _parse_interval(value)
        if upper is None:
            # Single date(time): derive the end from the input's resolution.
            upper = _expand_datetime(lower, value)
        return _normalize_interval(lower, upper, value)
    except ValueError as e:
        template = (
            "Invalid {arg}: {value}. {arg} must be a valid ISO8601 date/time interval."
        )
        raise ValueError(template.format(arg=argument, value=value)) from e


iso8601interval.__schema__ = {"type": "string", "format": "iso8601-interval"}
436
437
def date(value):
    """
    Parse a valid looking date in the format YYYY-mm-dd.

    :param str value: The date string to parse
    :return: The result of ``datetime.strptime``, i.e. a ``datetime`` object
    :raises ValueError: if the value does not match the format
    """
    return datetime.strptime(value, "%Y-%m-%d")


date.__schema__ = {"type": "string", "format": "date"}
445
446
447def _get_integer(value):
448 try:
449 return int(value)
450 except (TypeError, ValueError):
451 raise ValueError("{0} is not a valid integer".format(value))
452
453
def natural(value, argument="argument"):
    """
    Restrict input type to the natural numbers (0, 1, 2, 3...).

    :param value: The value to convert to an integer
    :param str argument: Name used for the value in the error message
    :return: The value as an integer
    :raises ValueError: if the value is not an integer or is negative
    """
    number = _get_integer(value)
    if number >= 0:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a non-negative integer"
    raise ValueError(template.format(arg=argument, value=number))


natural.__schema__ = {"type": "integer", "minimum": 0}
464
465
def positive(value, argument="argument"):
    """
    Restrict input type to the positive integers (1, 2, 3...).

    :param value: The value to convert to an integer
    :param str argument: Name used for the value in the error message
    :return: The value as an integer
    :raises ValueError: if the value is not an integer or is lower than 1
    """
    number = _get_integer(value)
    if number >= 1:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a positive integer"
    raise ValueError(template.format(arg=argument, value=number))


positive.__schema__ = {"type": "integer", "minimum": 0, "exclusiveMinimum": True}
476
477
class int_range(object):
    """
    Restrict input to an integer in a range (inclusive).

    :param low: The lowest accepted value
    :param high: The highest accepted value
    :param str argument: Name used for the value in the error message
    """

    def __init__(self, low, high, argument="argument"):
        self.low = low
        self.high = high
        self.argument = argument

    def __call__(self, value):
        """
        Convert ``value`` to an integer and check it against the bounds.

        :return: The value as an integer
        :raises ValueError: if the value is not an integer or is out of range
        """
        number = _get_integer(value)
        out_of_range = number < self.low or number > self.high
        if not out_of_range:
            return number
        template = "Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}"
        raise ValueError(
            template.format(
                arg=self.argument, val=number, lo=self.low, hi=self.high
            )
        )

    @property
    def __schema__(self):
        """Swagger schema fragment exposing the configured bounds."""
        return {
            "type": "integer",
            "minimum": self.low,
            "maximum": self.high,
        }
502
503
def boolean(value):
    """
    Parse the string ``"true"`` or ``"false"`` as a boolean (case insensitive).

    Also accepts ``"1"`` and ``"on"`` as ``True`` and ``"0"`` as ``False``.
    Any other falsy value except ``None`` (an empty string for instance)
    yields ``False``.

    If the input is from the request JSON body, the type is already a native python boolean,
    and will be passed through without further parsing.

    :raises ValueError: if the boolean value is invalid
    """
    if isinstance(value, bool):
        return value
    if value is None:
        raise ValueError("boolean type must be non-null")
    if not value:
        return False

    literals = {
        "true": True,
        "1": True,
        "on": True,
        "false": False,
        "0": False,
    }
    text = str(value).lower()
    if text in literals:
        return literals[text]
    raise ValueError("Invalid literal for boolean(): {0}".format(text))


boolean.__schema__ = {"type": "boolean"}
538
539
def datetime_from_rfc822(value):
    """
    Turns an RFC822 formatted date into a datetime object.

    A value without any ``HH:MM`` fragment gets ``00:00:00`` appended before
    parsing. A value without timezone information is interpreted as UTC.
    The returned datetime is always timezone-aware and expressed in UTC.

    Example::

        inputs.datetime_from_rfc822('Wed, 02 Oct 2002 08:00:00 EST')

    :param str value: The RFC822-complying string to transform
    :return: The parsed datetime
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal

    """
    try:
        # Kept inside the ``try`` so a non-string value is reported as an
        # invalid literal instead of leaking a TypeError.
        candidate = value
        if not time_regex.search(candidate):
            candidate = " ".join((candidate, "00:00:00"))
        timetuple = parsedate_tz(candidate)
        if timetuple[-1] is None:
            # ``mktime_tz`` assumes *local* time when the offset is None.
            # Use an explicit zero offset so a zone-less value is read as
            # UTC directly, without a round-trip through the local timezone
            # (which shifts times falling in a local DST gap).
            timetuple = timetuple[:-1] + (0,)
        return datetime.fromtimestamp(mktime_tz(timetuple), timezone.utc)
    except Exception:
        raise ValueError('Invalid date literal "{0}"'.format(value))
566
567
def datetime_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a datetime object.

    The value is first parsed as a full datetime; when that fails it is
    parsed as a plain date and returned as a datetime at midnight.

    Example::

        inputs.datetime_from_iso8601("2012-01-01T23:30:00+02:00")

    :param str value: The ISO8601-complying string to transform
    :return: A datetime
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal

    """
    try:
        try:
            return aniso8601.parse_datetime(value)
        except ValueError:
            # Not a full datetime: fall back to a date-only literal.
            day = aniso8601.parse_date(value)
            return datetime(day.year, day.month, day.day)
    except Exception:
        raise ValueError('Invalid date literal "{0}"'.format(value))


datetime_from_iso8601.__schema__ = {"type": "string", "format": "date-time"}
593
594
def date_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a date object.

    The value is parsed with ``datetime_from_iso8601`` and the date part of
    the result is returned.

    Example::

        inputs.date_from_iso8601("2012-01-01")

    :param str value: The ISO8601-complying string to transform
    :return: A date
    :rtype: date
    :raises ValueError: if value is an invalid date literal

    """
    moment = datetime_from_iso8601(value)
    return moment.date()


date_from_iso8601.__schema__ = {"type": "string", "format": "date"}