Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/flask_restx/inputs.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

275 statements  

1""" 

2This module provides some helpers for advanced types parsing.

3 

4You can define your own parser using the same pattern:

5 

6.. code-block:: python 

7 

8 def my_type(value): 

9 if not condition: 

10 raise ValueError('This is not my type') 

11 return parse(value) 

12 

13 # Swagger documentation 

14 my_type.__schema__ = {'type': 'string', 'format': 'my-custom-format'} 

15 

16The last line allows you to properly document the type in the Swagger documentation.

17""" 

18 

19import re 

20import socket 

21 

22from datetime import datetime, time, timedelta, timezone 

23from email.utils import parsedate_tz, mktime_tz 

24from urllib.parse import urlparse 

25 

26import aniso8601 

27 

# Day-boundary times (both tagged UTC) used when upgrading date-based
# intervals to full datetimes.
START_OF_DAY = time(hour=0, minute=0, second=0, tzinfo=timezone.utc)
END_OF_DAY = time(
    hour=23, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc
)

31 

32 

# Matches the network-location part of a URL: optional credentials, then a
# host, then an optional port. Each host flavour is captured in its own named
# group (``localhost``, ``ipv4``, ``ipv6`` or ``domain``) so callers can tell
# which alternative matched. Matching is case-insensitive.
netloc_regex = re.compile(
    # optional "user[:password]@" basic-auth prefix
    r"(?:(?P<auth>[^:@]+?(?::[^:@]*?)?)@)?"
    r"(?:"
    # the literal host name "localhost"...
    r"(?P<localhost>localhost)|"
    # ...or four dot-separated groups of 1-3 digits (no range check here)...
    r"(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|"
    # ...or a colon-separated hexadecimal address, optionally in brackets...
    r"(?:\[?(?P<ipv6>[A-F0-9]*:[A-F0-9:]+)\]?)|"
    # ...or a dotted domain name
    r"(?P<domain>(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?))"
    r")"
    # optional ":port" suffix
    r"(?::(?P<port>\d+))?"
    r"$",
    flags=re.IGNORECASE,
)

45 

46 

# Splits an e-mail address into a ``local`` part (anything without "@" whose
# last character is not a dot) and a ``server`` part (dot-separated labels
# containing neither "@" nor ".").
email_regex = re.compile(
    r"^(?P<local>[^@]*[^@.])@(?P<server>[^@\.]+(?:\.[^@\.]+)*)$",
    flags=re.IGNORECASE,
)

# Detects an "HH:MM"-looking fragment: two digits, a colon, two digits.
time_regex = re.compile(r"\d{2}:\d{2}")

53 

54 

def ipv4(value):
    """
    Validate an IPv4 address.

    :param str value: The candidate address
    :return: The unchanged value when it is accepted
    :raises ValueError: if the value is not a valid IPv4 address
    """
    try:
        socket.inet_aton(value)
    except socket.error:
        accepted = False
    else:
        # inet_aton also tolerates shortened notations with fewer dots,
        # so additionally insist on exactly three dots.
        accepted = value.count(".") == 3
    if accepted:
        return value
    raise ValueError("{0} is not a valid ipv4 address".format(value))


ipv4.__schema__ = {"type": "string", "format": "ipv4"}

67 

68 

def ipv6(value):
    """
    Validate an IPv6 address.

    :param str value: The candidate address
    :return: The unchanged value when it is accepted
    :raises ValueError: if the value is not a valid IPv6 address
    """
    try:
        socket.inet_pton(socket.AF_INET6, value)
    except socket.error:
        # The message used to say "ipv4" (copy/paste slip); report the
        # address family that was actually checked.
        raise ValueError("{0} is not a valid ipv6 address".format(value))
    return value


ipv6.__schema__ = {"type": "string", "format": "ipv6"}

79 

80 

def ip(value):
    """
    Validate an IP address, accepting both IPv4 and IPv6.

    :param str value: The candidate address
    :return: The unchanged value when either validator accepts it
    :raises ValueError: if the value is neither a valid IPv4 nor IPv6 address
    """
    try:
        return ipv4(value)
    except ValueError:
        # Not IPv4: give the IPv6 validator a chance below.
        pass
    try:
        checked = ipv6(value)
    except ValueError:
        raise ValueError("{0} is not a valid ip".format(value))
    return checked


ip.__schema__ = {"type": "string", "format": "ip"}

94 

95 

class URL(object):
    """
    Validate an URL.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('url', type=inputs.URL(schemes=['http', 'https']))

    Input to the ``URL`` argument will be rejected
    if it does not match an URL with specified constraints.
    If ``check`` is True it will also be rejected if the domain does not exist.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param bool port: Allow a port to be present
    :param bool auth: Allow authentication to be present
    :param list|tuple schemes: Restrict valid schemes to this list
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(
        self,
        check=False,
        ip=False,
        local=False,
        port=False,
        auth=False,
        schemes=None,
        domains=None,
        exclude=None,
    ):
        self.check = check
        self.ip = ip
        self.local = local
        self.port = port
        self.auth = auth
        self.schemes = schemes
        self.domains = domains
        self.exclude = exclude

    def error(self, value, details=None):
        """
        Raise the validation error for ``value``.

        :param str value: The rejected input
        :param str details: Optional extra sentence appended to the message;
            it may contain ``{0}``, which is replaced by ``value``
        :raises ValueError: always
        """
        msg = "{0} is not a valid URL"
        if details:
            msg = ". ".join((msg, details))
        raise ValueError(msg.format(value))

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The URL to validate
        :return: The unchanged value when it is accepted
        :raises ValueError: if the URL is malformed or breaks a constraint
        """
        parsed = urlparse(value)
        netloc_match = netloc_regex.match(parsed.netloc)
        if not all((parsed.scheme, parsed.netloc)):
            # Scheme or host is missing. If the beginning of the input looks
            # like a host on its own, suggest the probable intended URL.
            if netloc_regex.match(
                parsed.netloc or parsed.path.split("/", 1)[0].split("?", 1)[0]
            ):
                self.error(value, "Did you mean: http://{0}")
            self.error(value)
        if parsed.scheme and self.schemes and parsed.scheme not in self.schemes:
            self.error(value, "Protocol is not allowed")
        if not netloc_match:
            self.error(value)
        data = netloc_match.groupdict()
        if data["ipv4"] or data["ipv6"]:
            if not self.ip:
                self.error(value, "IP is not allowed")
            else:
                # The regex only checks the shape; delegate the real check.
                try:
                    ip(data["ipv4"] or data["ipv6"])
                except ValueError as e:
                    self.error(value, str(e))
            if not self.local:
                if data["ipv4"] and data["ipv4"].startswith("127."):
                    self.error(value, "Localhost is not allowed")
                elif data["ipv6"] == "::1":
                    self.error(value, "Localhost is not allowed")
        if data["auth"] and not self.auth:
            self.error(value, "Authentication is not allowed")
        if data["localhost"] and not self.local:
            self.error(value, "Localhost is not allowed")
        if data["port"]:
            if not self.port:
                self.error(value, "Custom port is not allowed")
            else:
                port = int(data["port"])
                # Valid ports are 1..65535 inclusive; the upper bound used to
                # be exclusive, which wrongly rejected port 65535.
                if not 0 < port <= 65535:
                    self.error(value, "Port is out of range")
        if data["domain"]:
            if self.domains and data["domain"] not in self.domains:
                self.error(value, "Domain is not allowed")
            elif self.exclude and data["domain"] in self.exclude:
                self.error(value, "Domain is not allowed")
            if self.check:
                # Optional DNS resolution of the domain name.
                try:
                    socket.getaddrinfo(data["domain"], None)
                except socket.error:
                    self.error(value, "Domain does not exists")
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "format": "url",
        }

203 

204 

#: Validate an URL
#:
#: Legacy validator: allows auth, port, ip and local
#: Only allows schemes 'http', 'https', 'ftp' and 'ftps'
url = URL(
    ip=True,
    auth=True,
    port=True,
    local=True,
    schemes=("http", "https", "ftp", "ftps"),
)

212 

213 

class email(object):
    """
    Validate an email.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('email', type=inputs.email(check=True))

    Input to the ``email`` argument will be rejected if it does not match
    an email or if it breaks one of the configured constraints.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(self, check=False, ip=False, local=False, domains=None, exclude=None):
        self.check = check
        self.ip = ip
        self.local = local
        self.domains = domains
        self.exclude = exclude

    def error(self, value, msg=None):
        """
        Raise the validation error for ``value``.

        :param str value: The rejected input
        :param str msg: Optional message template; ``{0}`` is replaced
            by ``value``
        :raises ValueError: always
        """
        template = msg or "{0} is not a valid email"
        raise ValueError(template.format(value))

    def is_ip(self, value):
        """Tell whether ``value`` passes the module-level ``ip`` validator."""
        try:
            ip(value)
        except ValueError:
            return False
        else:
            return True

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The email address to validate
        :return: The unchanged value when it is accepted
        :raises ValueError: if the address is malformed or breaks a constraint
        """
        parsed = email_regex.match(value)
        if not parsed or ".." in value:
            self.error(value)
        host = parsed.group("server")
        if self.check:
            # Optional DNS resolution of the server part.
            try:
                socket.getaddrinfo(host, None)
            except socket.error:
                self.error(value)
        if self.domains and host not in self.domains:
            self.error(value, "{0} does not belong to the authorized domains")
        if self.exclude and host in self.exclude:
            self.error(value, "{0} belongs to a forbidden domain")
        if not self.local:
            looks_local = host in ("localhost", "::1") or host.startswith("127.")
            if looks_local:
                self.error(value)
        if self.is_ip(host) and not self.ip:
            self.error(value)
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return dict(type="string", format="email")

279 

280 

class regex(object):
    """
    Validate a string based on a regular expression.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('example', type=inputs.regex('^[0-9]+$'))

    Input to the ``example`` argument will be rejected if it contains anything
    but numbers.

    :param str pattern: The regular expression the input must match
    """

    def __init__(self, pattern):
        self.pattern = pattern
        self.re = re.compile(pattern)

    def __call__(self, value):
        """
        Return ``value`` if the pattern is found anywhere in it.

        :raises ValueError: if the pattern is not found
        """
        if self.re.search(value):
            return value
        raise ValueError(
            'Value does not match pattern: "{0}"'.format(self.pattern)
        )

    def __deepcopy__(self, memo):
        # Rebuild from the pattern string instead of copying the compiled object.
        return regex(self.pattern)

    @property
    def __schema__(self):
        """Swagger schema fragment exposing the pattern."""
        return dict(type="string", pattern=self.pattern)

315 

316 

def _normalize_interval(start, end, value):
    """
    Normalize datetime intervals.

    Takes a pair of date or datetime objects and returns a 2-tuple of
    tz-aware UTC datetimes covering the same interval.

    - Plain dates become midnight (``START_OF_DAY``) of that date, for both
      the start and the end.
    - Naive datetimes are labelled as UTC.
    - Timezone-aware datetimes are converted to UTC.

    The kind of ``start`` (date vs datetime, naive vs aware) decides the
    treatment applied to both endpoints.

    Params:
        - start: A date or datetime
        - end: A date or datetime
        - value: The original input string (not used by this function)
    """
    if not isinstance(start, datetime):
        start, end = (
            datetime.combine(day, START_OF_DAY) for day in (start, end)
        )

    if start.tzinfo is None:
        start, end = (
            moment.replace(tzinfo=timezone.utc) for moment in (start, end)
        )
    else:
        start, end = (
            moment.astimezone(timezone.utc) for moment in (start, end)
        )

    return start, end

347 

348 

def _expand_datetime(start, value):
    """
    Compute the end of the interval implied by a single date or datetime.

    :param start: The parsed date or datetime
    :param str value: The original input string, used to find out how
        precise the time part was
    :return: ``start`` plus one day for a plain date, otherwise ``start``
        plus one hour, minute or second depending on the number of ``:``
        separators in the time part
    """
    if not isinstance(start, datetime):
        # A plain date stands for that entire day.
        return start + timedelta(days=1)

    # Keep what follows the "T" and strip any "+hh:mm" / "-hh:mm" offset,
    # so that only separators of the time itself are counted.
    clock = value.split("T")[1]
    clock = re.sub("[+-].+", "", clock)
    separators = clock.count(":")

    if separators == 0:
        step = timedelta(hours=1)  # hour resolution
    elif separators == 1:
        step = timedelta(minutes=1)  # minute resolution
    else:
        step = timedelta(seconds=1)  # second resolution
    return start + step

371 

372 

def _parse_interval(value):
    """
    Turn the string into date/datetime object(s), trying formats in turn.

    The input is first tried as a full interval (endpoints returned sorted),
    then as a single datetime, and finally as a single date. For the two
    single-value cases the second element of the result is ``None``.
    """
    try:
        return sorted(aniso8601.parse_interval(value))
    except ValueError:
        # Not an interval: fall back to a single date(time) below.
        pass
    try:
        single = aniso8601.parse_datetime(value)
    except ValueError:
        single = aniso8601.parse_date(value)
    return single, None

385 

386 

def iso8601interval(value, argument="argument"):
    """
    Parses ISO 8601-formatted datetime intervals into tuples of datetimes.

    Accepts both a single date(time) or a full interval using either start/end
    or start/duration notation, with the following behavior:

    - Intervals are defined as inclusive start, exclusive end
    - Single datetimes are translated into the interval spanning the
      largest resolution not specified in the input value, up to the day.
    - The smallest accepted resolution is 1 second.
    - All timezones are accepted as values; returned datetimes are
      localized to UTC. Naive inputs and date inputs are assumed UTC.

    Examples::

        "2013-01-01" -> datetime(2013, 1, 1), datetime(2013, 1, 2)
        "2013-01-01T12" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 13)
        "2013-01-01/2013-02-28" -> datetime(2013, 1, 1), datetime(2013, 2, 28)
        "2013-01-01/P3D" -> datetime(2013, 1, 1), datetime(2013, 1, 4)
        "2013-01-01T12:00/PT30M" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 12, 30)
        "2013-01-01T06:00/2013-01-01T12:00" -> datetime(2013, 1, 1, 6), datetime(2013, 1, 1, 12)

    :param str value: The ISO8601 date time as a string
    :param str argument: Name of the argument, used in the error message
    :return: Two UTC datetimes, the start and the end of the specified interval
    :rtype: A tuple (datetime, datetime)
    :raises ValueError: if the interval is invalid.
    """
    if not value:
        raise ValueError("Expected a valid ISO8601 date/time interval.")

    try:
        start, end = _parse_interval(value)

        if end is None:
            end = _expand_datetime(start, value)

        start, end = _normalize_interval(start, end, value)

    except (ValueError, TypeError) as e:
        # TypeError is caught as well: ordering endpoints of incompatible
        # kinds (naive vs timezone-aware, or date vs datetime) raises it, and
        # callers are promised a ValueError for any invalid interval.
        msg = (
            "Invalid {arg}: {value}. {arg} must be a valid ISO8601 date/time interval."
        )
        raise ValueError(msg.format(arg=argument, value=value)) from e

    return start, end


iso8601interval.__schema__ = {"type": "string", "format": "iso8601-interval"}

436 

437 

def date(value):
    """
    Parse a valid looking date in the format YYYY-mm-dd.

    :param str value: The date string
    :return: The ``datetime`` produced by ``datetime.strptime``
    :raises ValueError: if the string does not match the format
    """
    return datetime.strptime(value, "%Y-%m-%d")


date.__schema__ = {"type": "string", "format": "date"}

445 

446 

def _get_integer(value):
    """
    Convert ``value`` with ``int()``.

    :raises ValueError: if the conversion fails (a ``TypeError`` from
        ``int()`` is reported as ``ValueError`` too)
    """
    try:
        number = int(value)
    except (TypeError, ValueError):
        raise ValueError("{0} is not a valid integer".format(value))
    return number

452 

453 

def natural(value, argument="argument"):
    """
    Restrict input type to the natural numbers (0, 1, 2, 3...).

    :param value: The value to convert to an integer
    :param str argument: Name of the argument, used in the error message
    :return: The converted integer
    :raises ValueError: if the value is not an integer or is negative
    """
    number = _get_integer(value)
    if number >= 0:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a non-negative integer"
    raise ValueError(template.format(arg=argument, value=number))


natural.__schema__ = {"type": "integer", "minimum": 0}

464 

465 

def positive(value, argument="argument"):
    """
    Restrict input type to the positive integers (1, 2, 3...).

    :param value: The value to convert to an integer
    :param str argument: Name of the argument, used in the error message
    :return: The converted integer
    :raises ValueError: if the value is not an integer or is lower than 1
    """
    number = _get_integer(value)
    if number >= 1:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a positive integer"
    raise ValueError(template.format(arg=argument, value=number))


positive.__schema__ = {"type": "integer", "minimum": 0, "exclusiveMinimum": True}

476 

477 

class int_range(object):
    """
    Restrict input to an integer in a range (inclusive).

    :param low: Lowest accepted value
    :param high: Highest accepted value
    :param str argument: Name of the argument, used in the error message
    """

    def __init__(self, low, high, argument="argument"):
        self.low, self.high = low, high
        self.argument = argument

    def __call__(self, value):
        """
        Convert ``value`` to an integer and check it lies within the bounds.

        :return: The converted integer
        :raises ValueError: if the value is not an integer or is out of range
        """
        number = _get_integer(value)
        out_of_range = number < self.low or number > self.high
        if out_of_range:
            template = (
                "Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}"
            )
            raise ValueError(
                template.format(
                    arg=self.argument, val=number, lo=self.low, hi=self.high
                )
            )
        return number

    @property
    def __schema__(self):
        """Swagger schema fragment exposing the bounds."""
        return dict(type="integer", minimum=self.low, maximum=self.high)

502 

503 

def boolean(value):
    """
    Parse the string ``"true"`` or ``"false"`` as a boolean (case insensitive).

    Also accepts ``"1"`` and ``"on"`` as ``True`` and ``"0"`` as ``False``.
    Any other falsy, non-null value (such as an empty string) is ``False``.

    If the input is from the request JSON body, the type is already a native
    python boolean, and will be passed through without further parsing.

    :raises ValueError: if the boolean value is invalid
    """
    if isinstance(value, bool):
        return value
    if value is None:
        raise ValueError("boolean type must be non-null")
    if not value:
        return False

    text = str(value).lower()
    if text in ("true", "1", "on"):
        return True
    if text in ("false", "0"):
        return False
    raise ValueError("Invalid literal for boolean(): {0}".format(text))


boolean.__schema__ = {"type": "boolean"}

538 

539 

def datetime_from_rfc822(value):
    """
    Turns an RFC822 formatted date into a datetime object.

    Example::

        inputs.datetime_from_rfc822('Wed, 02 Oct 2002 08:00:00 EST')

    :param str value: The RFC822-complying string to transform
    :return: The parsed datetime
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal

    """
    candidate = value
    if not time_regex.search(candidate):
        # No "HH:MM" fragment found: append a midnight time before parsing.
        candidate = " ".join((candidate, "00:00:00"))
    try:
        parsed = parsedate_tz(candidate)
        epoch = mktime_tz(parsed)
        if parsed[-1] is None:
            # No offset in the parsed tuple: take the local-time conversion
            # of the timestamp and label it as UTC.
            return datetime.fromtimestamp(epoch).replace(tzinfo=timezone.utc)
        return datetime.fromtimestamp(epoch, timezone.utc)
    except Exception:
        # Any failure is reported against the caller's original string.
        raise ValueError('Invalid date literal "{0}"'.format(value))

566 

567 

def datetime_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a datetime object.

    Example::

        inputs.datetime_from_iso8601("2012-01-01T23:30:00+02:00")

    A date-only string is accepted as well and yields a datetime built
    from its year, month and day.

    :param str value: The ISO8601-complying string to transform
    :return: A datetime
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal

    """
    try:
        try:
            return aniso8601.parse_datetime(value)
        except ValueError:
            # Not a full datetime: retry as a plain date.
            day = aniso8601.parse_date(value)
            return datetime(day.year, day.month, day.day)
    except Exception:
        raise ValueError('Invalid date literal "{0}"'.format(value))


datetime_from_iso8601.__schema__ = {"type": "string", "format": "date-time"}

593 

594 

def date_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a date object.

    Example::

        inputs.date_from_iso8601("2012-01-01")

    :param str value: The ISO8601-complying string to transform
    :return: A date
    :rtype: date
    :raises ValueError: if value is an invalid date literal

    """
    parsed = datetime_from_iso8601(value)
    return parsed.date()


date_from_iso8601.__schema__ = {"type": "string", "format": "date"}