Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/flask_restx/inputs.py: 26%

274 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:03 +0000

1""" 

2This module provides some helpers for advanced types parsing.

3 

4You can define your own parser using the same pattern:

5 

6.. code-block:: python 

7 

8 def my_type(value): 

9 if not condition: 

10 raise ValueError('This is not my type') 

11 return parse(value) 

12 

13 # Swagger documentation 

14 my_type.__schema__ = {'type': 'string', 'format': 'my-custom-format'} 

15 

16The last line allows you to document properly the type in the Swagger documentation. 

17""" 

18 

19import re 

20import socket 

21 

22from datetime import datetime, time, timedelta 

23from email.utils import parsedate_tz, mktime_tz 

24from urllib.parse import urlparse 

25 

26import aniso8601 

27import pytz 

28 

# Day-boundary times (tz-aware, UTC) used to upgrade date-based
# intervals to full datetimes.
START_OF_DAY = time(hour=0, minute=0, second=0, tzinfo=pytz.UTC)
END_OF_DAY = time(
    hour=23,
    minute=59,
    second=59,
    microsecond=999999,
    tzinfo=pytz.UTC,
)

32 

33 

# Matches the network-location part of a URL: optional credentials,
# exactly one host form, then an optional port. Matching is
# case-insensitive and anchored at the end of the string.
netloc_regex = re.compile(
    "".join(
        (
            r"(?:(?P<auth>[^:@]+?(?::[^:@]*?)?)@)?",  # "user[:password]@"
            r"(?:",
            r"(?P<localhost>localhost)|",  # the literal host name localhost
            r"(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|",  # dotted quad
            r"(?:\[?(?P<ipv6>[A-F0-9]*:[A-F0-9:]+)\]?)|",  # optionally bracketed ipv6
            r"(?P<domain>(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?))",  # dotted domain name
            r")",
            r"(?::(?P<port>\d+))?",  # ":port"
            r"$",
        )
    ),
    flags=re.IGNORECASE,
)

46 

47 

# Splits an address into a "local" part (which may not end with a dot)
# and a "server" part made of dot-separated, non-empty labels.
email_regex = re.compile(
    r"^"
    "(?P<local>[^@]*[^@.])"
    r"@"
    r"(?P<server>[^@\.]+(?:\.[^@\.]+)*)"
    r"$",
    flags=re.IGNORECASE,
)

# Finds an "HH:MM"-looking fragment anywhere in a string.
time_regex = re.compile(r"\d{2}:\d{2}")

54 

55 

def ipv4(value):
    """
    Validate an IPv4 address.

    The value must be accepted by ``socket.inet_aton`` and contain
    exactly three dots.

    :param str value: The candidate address
    :return: ``value`` unchanged when it is valid
    :raises ValueError: if ``value`` is not a valid IPv4 address
    """
    try:
        socket.inet_aton(value)
    except socket.error:
        well_formed = False
    else:
        # inet_aton also accepts shortened forms such as "127.1";
        # insist on the full four-part dotted notation.
        well_formed = value.count(".") == 3
    if not well_formed:
        raise ValueError("{0} is not a valid ipv4 address".format(value))
    return value


ipv4.__schema__ = {"type": "string", "format": "ipv4"}

68 

69 

def ipv6(value):
    """
    Validate an IPv6 address.

    :param str value: The candidate address
    :return: ``value`` unchanged when ``socket.inet_pton`` accepts it
        as an ``AF_INET6`` address
    :raises ValueError: if ``value`` is not a valid IPv6 address
    """
    try:
        socket.inet_pton(socket.AF_INET6, value)
        return value
    except socket.error:
        # The message previously said "ipv4", which was misleading.
        raise ValueError("{0} is not a valid ipv6 address".format(value))


ipv6.__schema__ = {"type": "string", "format": "ipv6"}

80 

81 

def ip(value):
    """
    Validate an IP address (both IPv4 and IPv6).

    :param str value: The candidate address
    :return: ``value`` as returned by the first validator accepting it
    :raises ValueError: if neither ``ipv4`` nor ``ipv6`` accepts ``value``
    """
    # IPv4 is tried first, then IPv6.
    for validator in (ipv4, ipv6):
        try:
            return validator(value)
        except ValueError:
            continue
    raise ValueError("{0} is not a valid ip".format(value))


ip.__schema__ = {"type": "string", "format": "ip"}

95 

96 

class URL(object):
    """
    Validate an URL.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('url', type=inputs.URL(schemes=['http', 'https']))

    Input to the ``URL`` argument will be rejected
    if it does not match an URL with specified constraints.
    If ``check`` is True it will also be rejected if the domain does not exist.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param bool port: Allow a port to be present
    :param bool auth: Allow authentication to be present
    :param list|tuple schemes: Restrict valid schemes to this list
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(
        self,
        check=False,
        ip=False,
        local=False,
        port=False,
        auth=False,
        schemes=None,
        domains=None,
        exclude=None,
    ):
        self.check = check
        self.ip = ip
        self.local = local
        self.port = port
        self.auth = auth
        self.schemes = schemes
        self.domains = domains
        self.exclude = exclude

    def error(self, value, details=None):
        """
        Raise a ``ValueError`` explaining that ``value`` was rejected.

        :param value: The rejected input; substituted for ``{0}`` in the
            final message (including inside ``details``)
        :param str details: Optional extra sentence appended to the base message
        :raises ValueError: always
        """
        msg = "{0} is not a valid URL"
        if details:
            msg = ". ".join((msg, details))
        raise ValueError(msg.format(value))

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The URL to validate
        :return: ``value`` unchanged when every check passes
        :raises ValueError: on the first constraint that fails
        """
        parsed = urlparse(value)
        netloc_match = netloc_regex.match(parsed.netloc)
        if not all((parsed.scheme, parsed.netloc)):
            # Scheme or host missing: if the leading part of the input looks
            # like a host, suggest the scheme-prefixed form in the message.
            if netloc_regex.match(
                parsed.netloc or parsed.path.split("/", 1)[0].split("?", 1)[0]
            ):
                self.error(value, "Did you mean: http://{0}")
            self.error(value)
        if parsed.scheme and self.schemes and parsed.scheme not in self.schemes:
            self.error(value, "Protocol is not allowed")
        if not netloc_match:
            self.error(value)
        data = netloc_match.groupdict()
        if data["ipv4"] or data["ipv6"]:
            if not self.ip:
                self.error(value, "IP is not allowed")
            else:
                # The regex only checks the shape; confirm it is a real address.
                try:
                    ip(data["ipv4"] or data["ipv6"])
                except ValueError as e:
                    self.error(value, str(e))
            if not self.local:
                if data["ipv4"] and data["ipv4"].startswith("127."):
                    self.error(value, "Localhost is not allowed")
                elif data["ipv6"] == "::1":
                    self.error(value, "Localhost is not allowed")
        if data["auth"] and not self.auth:
            self.error(value, "Authentication is not allowed")
        if data["localhost"] and not self.local:
            self.error(value, "Localhost is not allowed")
        if data["port"]:
            if not self.port:
                self.error(value, "Custom port is not allowed")
            else:
                port = int(data["port"])
                # 65535 is the highest valid port number and must be accepted.
                if not 0 < port <= 65535:
                    self.error(value, "Port is out of range")
        if data["domain"]:
            if self.domains and data["domain"] not in self.domains:
                self.error(value, "Domain is not allowed")
            elif self.exclude and data["domain"] in self.exclude:
                self.error(value, "Domain is not allowed")
            if self.check:
                # Resolution is only attempted for domain names.
                try:
                    socket.getaddrinfo(data["domain"], None)
                except socket.error:
                    self.error(value, "Domain does not exists")
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {
            "type": "string",
            "format": "url",
        }

204 

205 

#: Validate an URL
#:
#: Legacy validator: authentication, custom port, IP hosts and
#: localhost are all allowed.
#: Only the 'http', 'https', 'ftp' and 'ftps' schemes are accepted.
url = URL(
    schemes=("http", "https", "ftp", "ftps"),
    auth=True,
    port=True,
    ip=True,
    local=True,
)

213 

214 

class email(object):
    """
    Validate an email.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('email', type=inputs.email(check=True))

    Input to the ``email`` argument will be rejected if it is not a
    well-formed email address or breaks one of the configured constraints.
    If ``check`` is True it is also rejected when the domain does not resolve.

    :param bool check: Check the domain exists (perform a DNS resolution)
    :param bool ip: Allow IP (both ipv4/ipv6) as domain
    :param bool local: Allow localhost (both string or ip) as domain
    :param list|tuple domains: Restrict valid domains to this list
    :param list|tuple exclude: Exclude some domains
    """

    def __init__(self, check=False, ip=False, local=False, domains=None, exclude=None):
        self.check, self.ip, self.local = check, ip, local
        self.domains, self.exclude = domains, exclude

    def error(self, value, msg=None):
        """
        Raise a ``ValueError`` for a rejected address.

        :param value: The rejected input, substituted for ``{0}`` in the message
        :param str msg: Optional message template replacing the default one
        :raises ValueError: always
        """
        template = msg if msg else "{0} is not a valid email"
        raise ValueError(template.format(value))

    def is_ip(self, value):
        """Return True when the module-level ``ip`` validator accepts ``value``."""
        try:
            ip(value)
        except ValueError:
            return False
        return True

    def __call__(self, value):
        """
        Validate ``value`` against the configured constraints.

        :param str value: The email address to validate
        :return: ``value`` unchanged when every check passes
        :raises ValueError: on the first constraint that fails
        """
        found = email_regex.match(value)
        if found is None or ".." in value:
            self.error(value)
        host = found.group("server")

        if self.check:
            try:
                socket.getaddrinfo(host, None)
            except socket.error:
                self.error(value)

        if self.domains and host not in self.domains:
            self.error(value, "{0} does not belong to the authorized domains")
        if self.exclude and host in self.exclude:
            self.error(value, "{0} belongs to a forbidden domain")

        if not self.local:
            looks_local = host in ("localhost", "::1") or host.startswith("127.")
            if looks_local:
                self.error(value)

        if self.is_ip(host) and not self.ip:
            self.error(value)
        return value

    @property
    def __schema__(self):
        """Swagger schema fragment describing this type."""
        return {"type": "string", "format": "email"}

280 

281 

class regex(object):
    """
    Validate a string based on a regular expression.

    Example::

        parser = reqparse.RequestParser()
        parser.add_argument('example', type=inputs.regex('^[0-9]+$'))

    Input to the ``example`` argument will be rejected if it contains anything
    but numbers.

    :param str pattern: The regular expression the input must match
    """

    def __init__(self, pattern):
        # Keep the raw pattern for error messages, the schema and copying.
        self.pattern = pattern
        self.re = re.compile(pattern)

    def __call__(self, value):
        """
        Return ``value`` when the pattern is found in it.

        :raises ValueError: if the pattern does not match
        """
        if self.re.search(value):
            return value
        raise ValueError(
            'Value does not match pattern: "{0}"'.format(self.pattern)
        )

    def __deepcopy__(self, memo):
        """Build a fresh validator from the stored pattern."""
        return regex(self.pattern)

    @property
    def __schema__(self):
        """Swagger schema fragment exposing the pattern."""
        return {"type": "string", "pattern": self.pattern}

316 

317 

def _normalize_interval(start, end, value):
    """
    Normalize datetime intervals.

    Given a pair of datetime.date or datetime.datetime objects,
    returns a 2-tuple of tz-aware UTC datetimes spanning the same interval.

    Plain dates are first combined with ``START_OF_DAY``, so the interval
    runs from 00:00:00.0 on the first date to 00:00:00.0 on the second.

    Naive datetimes are localized to UTC; timezone-aware datetimes are
    converted to UTC. Both decisions are made by inspecting ``start`` only.

    Params:
        - start: A date or datetime
        - end: A date or datetime
        - value: Not used by this function
    """
    utc = pytz.UTC

    if not isinstance(start, datetime):
        start, end = (datetime.combine(day, START_OF_DAY) for day in (start, end))

    if start.tzinfo is None:
        return utc.localize(start), utc.localize(end)
    return start.astimezone(utc), end.astimezone(utc)

348 

349 

def _expand_datetime(start, value):
    """
    Compute the end of the interval implied by a single date(time).

    A plain date spans one day. A datetime spans one hour, minute or
    second, depending on how many ``:`` separators the time part of the
    original string contains once any ``+``/``-`` offset is removed.

    :param start: The parsed date or datetime
    :param str value: The original input string
    :return: ``start`` plus the implied resolution
    """
    if not isinstance(start, datetime):
        # A bare date covers that entire day.
        return start + timedelta(days=1)

    clock = value.split("T")[1]
    clock_without_offset = re.sub("[+-].+", "", clock)
    colons = clock_without_offset.count(":")

    # 0 separators -> hour, 1 -> minute, anything more -> second.
    step_by_colons = {0: timedelta(hours=1), 1: timedelta(minutes=1)}
    return start + step_by_colons.get(colons, timedelta(seconds=1))

372 

373 

def _parse_interval(value):
    """
    Get date/datetime object(s) out of the string, trying each
    ``aniso8601`` parser in turn.

    :param str value: The raw input string
    :return: The sorted pair from ``aniso8601.parse_interval`` when the
        string parses as an interval; otherwise ``(parsed, None)`` where
        ``parsed`` comes from ``parse_datetime`` or, failing that,
        ``parse_date``
    """
    try:
        return sorted(aniso8601.parse_interval(value))
    except ValueError:
        pass  # Not an interval: fall back to a single date(time).

    try:
        moment = aniso8601.parse_datetime(value)
    except ValueError:
        moment = aniso8601.parse_date(value)
    return moment, None

386 

387 

def iso8601interval(value, argument="argument"):
    """
    Parses ISO 8601-formatted datetime intervals into tuples of datetimes.

    Accepts both a single date(time) or a full interval using either start/end
    or start/duration notation, with the following behavior:

    - Intervals are defined as inclusive start, exclusive end
    - Single datetimes are translated into the interval spanning the
      largest resolution not specified in the input value, up to the day.
    - The smallest accepted resolution is 1 second.
    - All timezones are accepted as values; returned datetimes are
      localized to UTC. Naive inputs and date inputs are assumed UTC.

    Examples::

        "2013-01-01" -> datetime(2013, 1, 1), datetime(2013, 1, 2)
        "2013-01-01T12" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 13)
        "2013-01-01/2013-02-28" -> datetime(2013, 1, 1), datetime(2013, 2, 28)
        "2013-01-01/P3D" -> datetime(2013, 1, 1), datetime(2013, 1, 4)
        "2013-01-01T12:00/PT30M" -> datetime(2013, 1, 1, 12), datetime(2013, 1, 1, 12, 30)
        "2013-01-01T06:00/2013-01-01T12:00" -> datetime(2013, 1, 1, 6), datetime(2013, 1, 1, 12)

    :param str value: The ISO8601 date time as a string
    :param str argument: Name used for the value in the error message
    :return: Two UTC datetimes, the start and the end of the specified interval
    :rtype: A tuple (datetime, datetime)
    :raises ValueError: if the interval is invalid.
    """
    if not value:
        raise ValueError("Expected a valid ISO8601 date/time interval.")

    try:
        begin, finish = _parse_interval(value)
        # A single date(time) has no explicit end: derive it from the
        # resolution of the input string.
        finish = _expand_datetime(begin, value) if finish is None else finish
        begin, finish = _normalize_interval(begin, finish, value)
    except ValueError:
        template = (
            "Invalid {arg}: {value}. {arg} must be a valid ISO8601 date/time interval."
        )
        raise ValueError(template.format(arg=argument, value=value))

    return begin, finish


iso8601interval.__schema__ = {"type": "string", "format": "iso8601-interval"}

437 

438 

def date(value):
    """
    Parse a valid looking date in the format YYYY-mm-dd.

    :param str value: The date string
    :return: The ``datetime`` produced by ``datetime.strptime``
    :raises ValueError: if ``value`` does not match the format
    """
    return datetime.strptime(value, "%Y-%m-%d")


date.__schema__ = {"type": "string", "format": "date"}

446 

447 

def _get_integer(value):
    """
    Convert ``value`` with ``int()``.

    :raises ValueError: if the conversion fails with ``TypeError`` or
        ``ValueError``
    """
    try:
        number = int(value)
    except (TypeError, ValueError):
        raise ValueError("{0} is not a valid integer".format(value))
    return number

453 

454 

def natural(value, argument="argument"):
    """
    Restrict input type to the natural numbers (0, 1, 2, 3...).

    :param value: The raw input, converted with ``_get_integer``
    :param str argument: Name used for the value in the error message
    :return: The converted integer
    :raises ValueError: if the value is not an integer or is negative
    """
    number = _get_integer(value)
    if number >= 0:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a non-negative integer"
    raise ValueError(template.format(arg=argument, value=number))


natural.__schema__ = {"type": "integer", "minimum": 0}

465 

466 

def positive(value, argument="argument"):
    """
    Restrict input type to the positive integers (1, 2, 3...).

    :param value: The raw input, converted with ``_get_integer``
    :param str argument: Name used for the value in the error message
    :return: The converted integer
    :raises ValueError: if the value is not an integer or is below 1
    """
    number = _get_integer(value)
    if number >= 1:
        return number
    template = "Invalid {arg}: {value}. {arg} must be a positive integer"
    raise ValueError(template.format(arg=argument, value=number))


positive.__schema__ = {
    "type": "integer",
    "minimum": 0,
    "exclusiveMinimum": True,
}

477 

478 

class int_range(object):
    """
    Restrict input to an integer in a range (inclusive).

    :param low: Lowest accepted value
    :param high: Highest accepted value
    :param str argument: Name used for the value in the error message
    """

    def __init__(self, low, high, argument="argument"):
        self.low, self.high = low, high
        self.argument = argument

    def __call__(self, value):
        """
        Convert ``value`` with ``_get_integer`` and check it against the bounds.

        :return: The converted integer
        :raises ValueError: if the value is not an integer or is out of range
        """
        number = _get_integer(value)
        out_of_range = number < self.low or number > self.high
        if not out_of_range:
            return number
        template = "Invalid {arg}: {val}. {arg} must be within the range {lo} - {hi}"
        raise ValueError(
            template.format(arg=self.argument, val=number, lo=self.low, hi=self.high)
        )

    @property
    def __schema__(self):
        """Swagger schema fragment exposing both bounds."""
        return {"type": "integer", "minimum": self.low, "maximum": self.high}

503 

504 

def boolean(value):
    """
    Parse the string ``"true"`` or ``"false"`` as a boolean (case insensitive).

    Also accepts ``"1"`` and ``"0"`` as ``True``/``False`` (respectively),
    and ``"on"`` as ``True``. Any other falsy, non-``None`` value yields ``False``.

    If the input is from the request JSON body, the type is already a native python boolean,
    and will be passed through without further parsing.

    :raises ValueError: if the boolean value is invalid
    """
    if isinstance(value, bool):
        return value
    if value is None:
        raise ValueError("boolean type must be non-null")
    if not value:
        return False

    # Compare on the lower-cased text form of the input.
    text = str(value).lower()
    known = {"true": True, "1": True, "on": True, "false": False, "0": False}
    if text in known:
        return known[text]
    raise ValueError("Invalid literal for boolean(): {0}".format(text))


boolean.__schema__ = {"type": "boolean"}

539 

540 

def datetime_from_rfc822(value):
    """
    Turns an RFC822 formatted date into a datetime object.

    Example::

        inputs.datetime_from_rfc822('Wed, 02 Oct 2002 08:00:00 EST')

    :param str value: The RFC822-complying string to transform
    :return: The parsed datetime, with ``pytz.utc`` as its tzinfo
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal

    """
    # When no HH:MM fragment is present, append a midnight time before parsing.
    has_time = time_regex.search(value)
    text = value if has_time else " ".join((value, "00:00:00"))

    try:
        fields = parsedate_tz(text)
        seconds = mktime_tz(fields)
        if fields[-1] is not None:
            # The input carried a timezone offset: convert to UTC.
            return datetime.fromtimestamp(seconds, pytz.utc)
        # No offset in the input: tag the resulting datetime as UTC.
        return datetime.fromtimestamp(seconds).replace(tzinfo=pytz.utc)
    except Exception:
        # Always report the caller's original string.
        raise ValueError('Invalid date literal "{0}"'.format(value))

567 

568 

def datetime_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a datetime object.

    Example::

        inputs.datetime_from_iso8601("2012-01-01T23:30:00+02:00")

    :param str value: The ISO8601-complying string to transform
    :return: A datetime; a date-only input becomes midnight of that day
    :rtype: datetime
    :raises ValueError: if value is an invalid date literal

    """
    try:
        try:
            parsed = aniso8601.parse_datetime(value)
        except ValueError:
            # Not a full datetime: retry as a bare date.
            day = aniso8601.parse_date(value)
            parsed = datetime(day.year, day.month, day.day)
    except Exception:
        raise ValueError('Invalid date literal "{0}"'.format(value))
    return parsed


datetime_from_iso8601.__schema__ = {"type": "string", "format": "date-time"}

594 

595 

def date_from_iso8601(value):
    """
    Turns an ISO8601 formatted date into a date object.

    Example::

        inputs.date_from_iso8601("2012-01-01")

    :param str value: The ISO8601-complying string to transform
    :return: A date
    :rtype: date
    :raises ValueError: if value is an invalid date literal

    """
    # Parse as a datetime, then keep only the calendar date.
    moment = datetime_from_iso8601(value)
    return moment.date()


date_from_iso8601.__schema__ = {"type": "string", "format": "date"}