Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/http/request.py: 30%

399 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1import codecs 

2import copy 

3from io import BytesIO 

4from itertools import chain 

5from urllib.parse import parse_qsl, quote, urlencode, urljoin, urlsplit 

6 

7from django.conf import settings 

8from django.core import signing 

9from django.core.exceptions import ( 

10 DisallowedHost, 

11 ImproperlyConfigured, 

12 RequestDataTooBig, 

13 TooManyFieldsSent, 

14) 

15from django.core.files import uploadhandler 

16from django.http.multipartparser import MultiPartParser, MultiPartParserError 

17from django.utils.datastructures import ( 

18 CaseInsensitiveMapping, 

19 ImmutableList, 

20 MultiValueDict, 

21) 

22from django.utils.encoding import escape_uri_path, iri_to_uri 

23from django.utils.functional import cached_property 

24from django.utils.http import is_same_domain, parse_header_parameters 

25from django.utils.regex_helper import _lazy_re_compile 

26 

# Sentinel default for HttpRequest.get_signed_cookie(): when `default` is left
# at this value, lookup and signature errors are re-raised instead of being
# replaced by a fallback.
RAISE_ERROR = object()

# Pattern used by split_domain_port() to decide whether a lowercased host
# string is acceptable. The pieces below concatenate to a single regex.
host_validation_re = _lazy_re_compile(
    r"^("
    r"[a-z0-9.-]+"  # plain hostname or dotted address
    r"|\[[a-f0-9]*:[a-f0-9\.:]+\]"  # bracketed IPv6 literal
    r")"
    r"(:[0-9]+)?$"  # optional ":port" suffix
)

31 

32 

class UnreadablePostError(OSError):
    """Raised when reading the request's data stream fails with an OSError."""

35 

36 

class RawPostDataException(Exception):
    """
    Raised when the raw request body is requested after the request's data
    stream has already been read (for example, multipart/* POST data that
    was consumed through POST or FILES).
    """

45 

46 

class HttpRequest:
    """A basic HTTP request."""

    # Encoding used for the GET/POST dicts. None means use the default setting.
    _encoding = None
    # Class-level fallback; the `upload_handlers` property replaces it on the
    # instance the first time it is read while still empty.
    _upload_handlers = []

    # Attributes removed by __getstate__() and deep-copied one by one in
    # __deepcopy__().
    non_picklable_attrs = frozenset(["resolver_match", "_stream"])

    def __init__(self):
        """Create an empty request with no data, path, or method."""
        # WARNING: The `WSGIRequest` subclass doesn't call `super`.
        # Any variable assignment made here should also happen in
        # `WSGIRequest.__init__()`.

        self.GET = QueryDict(mutable=True)
        self.POST = QueryDict(mutable=True)
        self.COOKIES = {}
        self.META = {}
        self.FILES = MultiValueDict()

        self.path = self.path_info = ""
        self.method = None
        self.resolver_match = None
        self.content_type = None
        self.content_params = None

    def __repr__(self):
        """Return "<Class: METHOD 'path'>", or "<Class>" if either is missing."""
        cls_name = self.__class__.__name__
        if self.method is None or not self.get_full_path():
            return "<%s>" % cls_name
        return "<%s: %s %r>" % (cls_name, self.method, self.get_full_path())

    def __getstate__(self):
        """Return a copy of __dict__ without the non-picklable attributes."""
        state = self.__dict__.copy()
        for name in self.non_picklable_attrs:
            state.pop(name, None)
        return state

    def __deepcopy__(self, memo):
        """
        Return a shallow copy of the request in which only the attributes
        listed in `non_picklable_attrs` are deep-copied.
        """
        clone = copy.copy(self)
        for name in self.non_picklable_attrs:
            if hasattr(self, name):
                setattr(clone, name, copy.deepcopy(getattr(self, name), memo))
        memo[id(self)] = clone
        return clone

    @cached_property
    def headers(self):
        """Return an HttpHeaders mapping built from self.META."""
        return HttpHeaders(self.META)

    @cached_property
    def accepted_types(self):
        """Return a list of MediaType instances."""
        return parse_accept_header(self.headers.get("Accept", "*/*"))

    def accepts(self, media_type):
        """Return True if any accepted type matches `media_type`."""
        for accepted_type in self.accepted_types:
            if accepted_type.match(media_type):
                return True
        return False

    def _set_content_type_params(self, meta):
        """Set content_type, content_params, and encoding."""
        self.content_type, self.content_params = parse_header_parameters(
            meta.get("CONTENT_TYPE", "")
        )
        if "charset" not in self.content_params:
            return
        charset = self.content_params["charset"]
        try:
            codecs.lookup(charset)
        except LookupError:
            # Unknown charset: leave the current encoding untouched.
            return
        self.encoding = charset

    def _get_raw_host(self):
        """
        Return the HTTP host using the environment or request headers. Skip
        allowed hosts protection, so may return an insecure host.
        """
        meta = self.META
        # Three options are tried, in order of decreasing preference.
        if settings.USE_X_FORWARDED_HOST and ("HTTP_X_FORWARDED_HOST" in meta):
            return meta["HTTP_X_FORWARDED_HOST"]
        if "HTTP_HOST" in meta:
            return meta["HTTP_HOST"]
        # Reconstruct the host using the algorithm from PEP 333.
        host = meta["SERVER_NAME"]
        server_port = self.get_port()
        default_port = "443" if self.is_secure() else "80"
        if server_port != default_port:
            host = "%s:%s" % (host, server_port)
        return host

    def get_host(self):
        """Return the HTTP host using the environment or request headers."""
        host = self._get_raw_host()

        # Allow variants of localhost if ALLOWED_HOSTS is empty and DEBUG=True.
        allowed_hosts = settings.ALLOWED_HOSTS
        if settings.DEBUG and not allowed_hosts:
            allowed_hosts = [".localhost", "127.0.0.1", "[::1]"]

        domain, _port = split_domain_port(host)
        if domain and validate_host(domain, allowed_hosts):
            return host

        msg = "Invalid HTTP_HOST header: %r." % host
        if domain:
            msg += " You may need to add %r to ALLOWED_HOSTS." % domain
        else:
            msg += (
                " The domain name provided is not valid according to RFC 1034/1035."
            )
        raise DisallowedHost(msg)

    def get_port(self):
        """Return the port number for the request as a string."""
        if settings.USE_X_FORWARDED_PORT and "HTTP_X_FORWARDED_PORT" in self.META:
            return str(self.META["HTTP_X_FORWARDED_PORT"])
        return str(self.META["SERVER_PORT"])

    def get_full_path(self, force_append_slash=False):
        """Return self.path plus the query string, if any."""
        return self._get_full_path(self.path, force_append_slash)

    def get_full_path_info(self, force_append_slash=False):
        """Return self.path_info plus the query string, if any."""
        return self._get_full_path(self.path_info, force_append_slash)

    def _get_full_path(self, path, force_append_slash):
        """
        Return the escaped `path`, an optional trailing slash, and
        "?<query string>" when META holds a non-empty QUERY_STRING.
        """
        # RFC 3986 requires query string arguments to be in the ASCII range.
        # Rather than crash if this doesn't happen, we encode defensively.
        escaped_path = escape_uri_path(path)
        slash = "/" if force_append_slash and not path.endswith("/") else ""
        query_string = self.META.get("QUERY_STRING", "")
        query = ("?" + iri_to_uri(query_string)) if query_string else ""
        return "%s%s%s" % (escaped_path, slash, query)

    def get_signed_cookie(self, key, default=RAISE_ERROR, salt="", max_age=None):
        """
        Attempt to return a signed cookie. If the signature fails or the
        cookie has expired, raise an exception, unless the `default` argument
        is provided, in which case return that value.
        """
        must_raise = default is RAISE_ERROR
        try:
            cookie_value = self.COOKIES[key]
        except KeyError:
            if must_raise:
                raise
            return default
        try:
            signer = signing.get_cookie_signer(salt=key + salt)
            return signer.unsign(cookie_value, max_age=max_age)
        except signing.BadSignature:
            if must_raise:
                raise
            return default

    def build_absolute_uri(self, location=None):
        """
        Build an absolute URI from the location and the variables available in
        this request. If no ``location`` is specified, build the absolute URI
        using request.get_full_path(). If the location is absolute, convert it
        to an RFC 3987 compliant URI and return it. If location is relative or
        is scheme-relative (i.e., ``//example.com/``), urljoin() it to a base
        URL constructed from the request variables.
        """
        if location is None:
            # Make it an absolute url (but schemeless and domainless) for the
            # edge case that the path starts with '//'.
            location = "//%s" % self.get_full_path()
        else:
            # Coerce lazy locations.
            location = str(location)

        bits = urlsplit(location)
        if bits.scheme and bits.netloc:
            # Already absolute: nothing to join.
            return iri_to_uri(location)

        # Handle the simple, most common case. If the location is absolute
        # and a scheme or host (netloc) isn't provided, skip an expensive
        # urljoin() as long as no path segments are '.' or '..'.
        path = bits.path
        is_simple = (
            path.startswith("/")
            and not bits.scheme
            and not bits.netloc
            and "/./" not in path
            and "/../" not in path
        )
        if is_simple:
            # If location starts with '//' but has no netloc, reuse the
            # schema and netloc from the current request. Strip the double
            # slashes and continue as if it wasn't specified.
            if location.startswith("//"):
                location = location[2:]
            location = self._current_scheme_host + location
        else:
            # Join the constructed URL with the provided location, which
            # allows the provided location to apply query strings to the
            # base path.
            location = urljoin(self._current_scheme_host + self.path, location)
        return iri_to_uri(location)

    @cached_property
    def _current_scheme_host(self):
        """Return "<scheme>://<host>" for this request."""
        return f"{self.scheme}://{self.get_host()}"

    def _get_scheme(self):
        """
        Hook for subclasses like WSGIRequest to implement. Return 'http' by
        default.
        """
        return "http"

    @property
    def scheme(self):
        """
        Return "https" or "http".

        If settings.SECURE_PROXY_SSL_HEADER is set and the header it names is
        present in META, compare the first comma-separated value of that
        header with the configured secure value. Otherwise defer to
        _get_scheme().
        """
        proxy_ssl_header = settings.SECURE_PROXY_SSL_HEADER
        if proxy_ssl_header:
            try:
                header, secure_value = proxy_ssl_header
            except ValueError:
                raise ImproperlyConfigured(
                    "The SECURE_PROXY_SSL_HEADER setting must be a tuple containing "
                    "two values."
                )
            header_value = self.META.get(header)
            if header_value is not None:
                first_value = header_value.split(",", 1)[0]
                return "https" if first_value.strip() == secure_value else "http"
        return self._get_scheme()

    def is_secure(self):
        """Return True if the request scheme is "https"."""
        return self.scheme == "https"

    @property
    def encoding(self):
        """Return the encoding used for GET/POST accesses (may be None)."""
        return self._encoding

    @encoding.setter
    def encoding(self, val):
        """
        Set the encoding used for GET/POST accesses. If the GET or POST
        dictionary has already been created, remove and recreate it on the
        next access (so that it is decoded correctly).
        """
        self._encoding = val
        for cached_name in ("GET", "_post"):
            if hasattr(self, cached_name):
                delattr(self, cached_name)

    def _initialize_handlers(self):
        """Load one upload handler per entry of settings.FILE_UPLOAD_HANDLERS."""
        handlers = []
        for handler_path in settings.FILE_UPLOAD_HANDLERS:
            handlers.append(uploadhandler.load_handler(handler_path, self))
        self._upload_handlers = handlers

    @property
    def upload_handlers(self):
        """Return the upload handlers, loading them from settings if empty."""
        if not self._upload_handlers:
            # If there are no upload handlers defined, initialize them from settings.
            self._initialize_handlers()
        return self._upload_handlers

    @upload_handlers.setter
    def upload_handlers(self, upload_handlers):
        """Replace the upload handlers; refuse once `_files` has been set."""
        if hasattr(self, "_files"):
            raise AttributeError(
                "You cannot set the upload handlers after the upload has been "
                "processed."
            )
        self._upload_handlers = upload_handlers

    def parse_file_upload(self, META, post_data):
        """Return a tuple of (POST QueryDict, FILES MultiValueDict)."""
        # Wrap the handlers in an ImmutableList carrying the warning text
        # below before handing them to the parser.
        self.upload_handlers = ImmutableList(
            self.upload_handlers,
            warning=(
                "You cannot alter upload handlers after the upload has been "
                "processed."
            ),
        )
        parser = MultiPartParser(META, post_data, self.upload_handlers, self.encoding)
        return parser.parse()

    @property
    def body(self):
        """
        Return the raw request body, reading and caching it on first access.

        Raise RawPostDataException if the stream has already been read from,
        RequestDataTooBig if CONTENT_LENGTH exceeds
        settings.DATA_UPLOAD_MAX_MEMORY_SIZE, and UnreadablePostError if the
        read fails with an OSError.
        """
        if hasattr(self, "_body"):
            return self._body

        if self._read_started:
            raise RawPostDataException(
                "You cannot access body after reading from request's data stream"
            )

        # Limit the maximum request data size that will be handled in-memory.
        max_size = settings.DATA_UPLOAD_MAX_MEMORY_SIZE
        if (
            max_size is not None
            and int(self.META.get("CONTENT_LENGTH") or 0) > max_size
        ):
            raise RequestDataTooBig(
                "Request body exceeded settings.DATA_UPLOAD_MAX_MEMORY_SIZE."
            )

        try:
            self._body = self.read()
        except OSError as e:
            raise UnreadablePostError(*e.args) from e
        finally:
            self._stream.close()
        # Later reads are served from an in-memory copy of the body.
        self._stream = BytesIO(self._body)
        return self._body

    def _mark_post_parse_error(self):
        """Set empty `_post` and `_files` so POST data isn't parsed again."""
        self._post = QueryDict()
        self._files = MultiValueDict()

    def _load_post_and_files(self):
        """Populate self._post and self._files if the content-type is a form type."""

        def empty_post_and_files():
            return QueryDict(encoding=self._encoding), MultiValueDict()

        if self.method != "POST":
            self._post, self._files = empty_post_and_files()
            return
        if self._read_started and not hasattr(self, "_body"):
            self._mark_post_parse_error()
            return

        content_type = self.content_type
        if content_type == "multipart/form-data":
            # Use already read data when the body has been cached.
            data = BytesIO(self._body) if hasattr(self, "_body") else self
            try:
                self._post, self._files = self.parse_file_upload(self.META, data)
            except MultiPartParserError:
                # An error occurred while parsing POST data. Since when
                # formatting the error the request handler might access
                # self.POST, set self._post and self._file to prevent
                # attempts to parse POST data again.
                self._mark_post_parse_error()
                raise
        elif content_type == "application/x-www-form-urlencoded":
            self._post, self._files = (
                QueryDict(self.body, encoding=self._encoding),
                MultiValueDict(),
            )
        else:
            self._post, self._files = empty_post_and_files()

    def close(self):
        """Close every file held in self._files, if it has been set."""
        if not hasattr(self, "_files"):
            return
        for _name, uploads in self._files.lists():
            for upload in uploads:
                upload.close()

    # File-like and iterator interface.
    #
    # Expects self._stream to be set to an appropriate source of bytes by
    # a corresponding request subclass (e.g. WSGIRequest).
    # Also when request data has already been read by request.POST or
    # request.body, self._stream points to a BytesIO instance
    # containing that data.

    def read(self, *args, **kwargs):
        """Read from the stream, converting OSError to UnreadablePostError."""
        self._read_started = True
        try:
            return self._stream.read(*args, **kwargs)
        except OSError as exc:
            raise UnreadablePostError(*exc.args) from exc

    def readline(self, *args, **kwargs):
        """Read a line from the stream, converting OSError likewise."""
        self._read_started = True
        try:
            return self._stream.readline(*args, **kwargs)
        except OSError as exc:
            raise UnreadablePostError(*exc.args) from exc

    def __iter__(self):
        """Iterate over lines until readline() returns b""."""
        return iter(self.readline, b"")

    def readlines(self):
        """Return all remaining lines as a list."""
        return list(self)

437 

438 

class HttpHeaders(CaseInsensitiveMapping):
    """
    Mapping of HTTP header names to values, built from an environ-style
    dict whose keys look like "HTTP_ACCEPT" or "CONTENT_TYPE".
    """

    HTTP_PREFIX = "HTTP_"
    # PEP 333 gives two headers which aren't prepended with HTTP_.
    UNPREFIXED_HEADERS = {"CONTENT_TYPE", "CONTENT_LENGTH"}

    def __init__(self, environ):
        """Keep only the environ entries that parse_header_name() accepts."""
        parsed = (
            (self.parse_header_name(key), value) for key, value in environ.items()
        )
        super().__init__({name: value for name, value in parsed if name})

    def __getitem__(self, key):
        """Allow header lookup using underscores in place of hyphens."""
        return super().__getitem__(key.replace("_", "-"))

    @classmethod
    def parse_header_name(cls, header):
        """
        Convert an environ key to a header name ("HTTP_USER_AGENT" becomes
        "User-Agent"). Return None for keys that are neither HTTP_-prefixed
        nor listed in UNPREFIXED_HEADERS.
        """
        prefix = cls.HTTP_PREFIX
        if header.startswith(prefix):
            bare = header[len(prefix) :]
        elif header in cls.UNPREFIXED_HEADERS:
            bare = header
        else:
            return None
        return bare.replace("_", "-").title()

    @classmethod
    def to_wsgi_name(cls, header):
        """Return the uppercase, underscored name, HTTP_-prefixed if needed."""
        wsgi_name = header.replace("-", "_").upper()
        if wsgi_name in cls.UNPREFIXED_HEADERS:
            return wsgi_name
        return f"{cls.HTTP_PREFIX}{wsgi_name}"

    @classmethod
    def to_asgi_name(cls, header):
        """Return the header name uppercased with hyphens as underscores."""
        return header.replace("-", "_").upper()

    @classmethod
    def to_wsgi_names(cls, headers):
        """Return a dict of `headers` with keys passed through to_wsgi_name()."""
        return {cls.to_wsgi_name(name): value for name, value in headers.items()}

    @classmethod
    def to_asgi_names(cls, headers):
        """Return a dict of `headers` with keys passed through to_asgi_name()."""
        return {cls.to_asgi_name(name): value for name, value in headers.items()}

488 

489 

class QueryDict(MultiValueDict):
    """
    A specialized MultiValueDict which represents a query string.

    A QueryDict can be used to represent GET or POST data. It subclasses
    MultiValueDict since keys in such data can be repeated, for instance
    in the data from a form with a <select multiple> field.

    By default QueryDicts are immutable, though the copy() method
    will always return a mutable copy.

    Both keys and values set on this class are converted from the given encoding
    (DEFAULT_CHARSET by default) to str.
    """

    # Both of these are reset in __init__(), but they are also specified here
    # at the class level so that unpickling will have valid values.
    _mutable = True
    _encoding = None

    def __init__(self, query_string=None, mutable=False, encoding=None):
        """
        Parse `query_string` (str or bytes) into this dict.

        Raise TooManyFieldsSent if parsing raises ValueError, which is
        taken to mean settings.DATA_UPLOAD_MAX_NUMBER_FIELDS was exceeded.
        """
        super().__init__()
        self.encoding = encoding or settings.DEFAULT_CHARSET
        raw = query_string or ""
        charset = self.encoding
        max_fields = settings.DATA_UPLOAD_MAX_NUMBER_FIELDS
        if isinstance(raw, bytes):
            # query_string normally contains URL-encoded data, a subset of ASCII.
            try:
                raw = raw.decode(charset)
            except UnicodeDecodeError:
                # ... but some user agents are misbehaving :-(
                raw = raw.decode("iso-8859-1")
        try:
            pairs = parse_qsl(
                raw,
                keep_blank_values=True,
                encoding=charset,
                max_num_fields=max_fields,
            )
            for key, value in pairs:
                self.appendlist(key, value)
        except ValueError as e:
            # ValueError can also be raised if the strict_parsing argument to
            # parse_qsl() is True. As that is not used by Django, assume that
            # the exception was raised by exceeding the value of max_num_fields
            # instead of fragile checks of exception message strings.
            raise TooManyFieldsSent(
                "The number of GET/POST parameters exceeded "
                "settings.DATA_UPLOAD_MAX_NUMBER_FIELDS."
            ) from e
        self._mutable = mutable

    @classmethod
    def fromkeys(cls, iterable, value="", mutable=False, encoding=None):
        """
        Return a new QueryDict with keys (may be repeated) from an iterable and
        values from value.
        """
        result = cls("", mutable=True, encoding=encoding)
        for key in iterable:
            result.appendlist(key, value)
        if not mutable:
            result._mutable = False
        return result

    @property
    def encoding(self):
        """Return the encoding, falling back to settings.DEFAULT_CHARSET."""
        if self._encoding is None:
            self._encoding = settings.DEFAULT_CHARSET
        return self._encoding

    @encoding.setter
    def encoding(self, value):
        self._encoding = value

    def _assert_mutable(self):
        """Raise AttributeError if this instance is immutable."""
        if not self._mutable:
            raise AttributeError("This QueryDict instance is immutable")

    def __setitem__(self, key, value):
        self._assert_mutable()
        charset = self.encoding
        super().__setitem__(bytes_to_text(key, charset), bytes_to_text(value, charset))

    def __delitem__(self, key):
        self._assert_mutable()
        super().__delitem__(key)

    def __copy__(self):
        """Return a mutable copy that holds the same keys and values."""
        duplicate = self.__class__("", mutable=True, encoding=self.encoding)
        for key, values in self.lists():
            duplicate.setlist(key, values)
        return duplicate

    def __deepcopy__(self, memo):
        """Return a mutable copy whose keys and value lists are deep-copied."""
        duplicate = self.__class__("", mutable=True, encoding=self.encoding)
        memo[id(self)] = duplicate
        for key, values in self.lists():
            duplicate.setlist(copy.deepcopy(key, memo), copy.deepcopy(values, memo))
        return duplicate

    def setlist(self, key, list_):
        self._assert_mutable()
        charset = self.encoding
        text_key = bytes_to_text(key, charset)
        text_values = [bytes_to_text(item, charset) for item in list_]
        super().setlist(text_key, text_values)

    def setlistdefault(self, key, default_list=None):
        self._assert_mutable()
        return super().setlistdefault(key, default_list)

    def appendlist(self, key, value):
        self._assert_mutable()
        charset = self.encoding
        super().appendlist(bytes_to_text(key, charset), bytes_to_text(value, charset))

    def pop(self, key, *args):
        self._assert_mutable()
        return super().pop(key, *args)

    def popitem(self):
        self._assert_mutable()
        return super().popitem()

    def clear(self):
        self._assert_mutable()
        super().clear()

    def setdefault(self, key, default=None):
        self._assert_mutable()
        charset = self.encoding
        return super().setdefault(
            bytes_to_text(key, charset), bytes_to_text(default, charset)
        )

    def copy(self):
        """Return a mutable copy of this object."""
        return self.__deepcopy__({})

    def urlencode(self, safe=None):
        """
        Return an encoded string of all query string arguments.

        `safe` specifies characters which don't require quoting, for example::

            >>> q = QueryDict(mutable=True)
            >>> q['next'] = '/a&b/'
            >>> q.urlencode()
            'next=%2Fa%26b%2F'
            >>> q.urlencode(safe='/')
            'next=/a%26b/'
        """
        charset = self.encoding
        if safe:
            safe_bytes = safe.encode(charset)

            def encode_pair(k, v):
                return "%s=%s" % (quote(k, safe_bytes), quote(v, safe_bytes))

        else:

            def encode_pair(k, v):
                return urlencode({k: v})

        return "&".join(
            encode_pair(key.encode(charset), str(value).encode(charset))
            for key, values in self.lists()
            for value in values
        )

659 

660 

class MediaType:
    """A media type split into main type, sub type, and parameters."""

    def __init__(self, media_type_raw_line):
        """Parse a raw media type line; a falsy value is treated as ""."""
        full_type, self.params = parse_header_parameters(media_type_raw_line or "")
        self.main_type, _, self.sub_type = full_type.partition("/")

    def __str__(self):
        """Return "main/sub" followed by "; key=value" for each parameter."""
        pieces = [self.main_type]
        if self.sub_type:
            pieces.append("/%s" % self.sub_type)
        pieces.extend("; %s=%s" % (k, v) for k, v in self.params.items())
        return "".join(pieces)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__qualname__, self)

    @property
    def is_all_types(self):
        """Return True for the "*/*" wildcard."""
        return self.main_type == "*" and self.sub_type == "*"

    def match(self, other):
        """
        Return True if this media type accepts `other` (a raw media type
        string): either this is "*/*", or the main types are equal and this
        sub type is "*" or equal to the other's sub type.
        """
        if self.is_all_types:
            return True
        candidate = MediaType(other)
        return self.main_type == candidate.main_type and self.sub_type in {
            "*",
            candidate.sub_type,
        }

690 

691 

692# It's neither necessary nor appropriate to use 

693# django.utils.encoding.force_str() for parsing URLs and form inputs. Thus, 

694# this slightly more restricted function, used by QueryDict. 

def bytes_to_text(s, encoding):
    """
    Decode a bytes object to str with the given encoding, substituting the
    Unicode replacement character (U+FFFD) for illegally encoded input.

    Any object that is not a bytes instance is returned unchanged.
    """
    return str(s, encoding, "replace") if isinstance(s, bytes) else s

707 

708 

def split_domain_port(host):
    """
    Split a host string into a (domain, port) tuple.

    The domain is returned lowercased and without a trailing dot. A host
    rejected by host_validation_re yields ("", "").
    """
    host = host.lower()

    if not host_validation_re.match(host):
        return "", ""

    if host.endswith("]"):
        # It's an IPv6 address without a port.
        return host, ""

    head, colon, tail = host.rpartition(":")
    domain, port = (head, tail) if colon else (host, "")
    # Remove a trailing dot (if present) from the domain.
    if domain.endswith("."):
        domain = domain[:-1]
    return domain, port

729 

730 

def validate_host(host, allowed_hosts):
    """
    Validate the given host for this site.

    Return ``True`` as soon as one entry of ``allowed_hosts`` accepts the
    host, ``False`` if none does. An entry accepts the host when it is the
    wildcard ``*`` or when ``is_same_domain(host, pattern)`` is true; per the
    original documentation, a pattern beginning with a period (e.g.
    ``.example.com``) matches that domain and all its subdomains, and
    anything else must match exactly.

    Note: This function assumes that the given host is lowercased and has
    already had the port, if any, stripped off.
    """
    for pattern in allowed_hosts:
        if pattern == "*" or is_same_domain(host, pattern):
            return True
    return False

749 

750 

def parse_accept_header(header):
    """
    Return a list with one MediaType per non-blank, comma-separated token of
    the given Accept header value.
    """
    media_types = []
    for token in header.split(","):
        if token.strip():
            media_types.append(MediaType(token))
    return media_types