Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/http/request.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

406 statements  

1import codecs 

2import copy 

3import operator 

4from io import BytesIO 

5from itertools import chain 

6from urllib.parse import parse_qsl, quote, urlencode, urljoin, urlsplit 

7 

8from django.conf import settings 

9from django.core import signing 

10from django.core.exceptions import ( 

11 BadRequest, 

12 DisallowedHost, 

13 ImproperlyConfigured, 

14 RequestDataTooBig, 

15 TooManyFieldsSent, 

16) 

17from django.core.files import uploadhandler 

18from django.http.multipartparser import ( 

19 MultiPartParser, 

20 MultiPartParserError, 

21 TooManyFilesSent, 

22) 

23from django.utils.datastructures import ( 

24 CaseInsensitiveMapping, 

25 ImmutableList, 

26 MultiValueDict, 

27) 

28from django.utils.encoding import escape_uri_path, iri_to_uri 

29from django.utils.functional import cached_property 

30from django.utils.http import is_same_domain, parse_header_parameters 

31from django.utils.regex_helper import _lazy_re_compile 

32 

# Unique sentinel used as the default of get_signed_cookie()'s ``default``
# argument, so that "no default supplied" can be told apart from any value a
# caller might pass.
RAISE_ERROR = object()

# Matches a (lowercased) host with an optional ":port" suffix. Group 1 is
# either a name made of letters, digits, dots and hyphens, or a bracketed
# IPv6-style literal; group 2 is the port digits when a port is present.
host_validation_re = _lazy_re_compile(
    r"^([a-z0-9.-]+|\[[a-f0-9]*:[a-f0-9.:]+\])(?::([0-9]+))?$"
)

37 

38 

class UnreadablePostError(OSError):
    """OSError subclass raised when reading the request's data stream fails."""

41 

42 

class RawPostDataException(Exception):
    """
    Raised when the raw request body is requested after the request's data
    stream has already been read from (for example, multipart/* POST data
    that was consumed through POST, FILES, etc.).
    """

51 

52 

53class HttpRequest: 

54 """A basic HTTP request.""" 

55 

56 # The encoding used in GET/POST dicts. None means use default setting. 

57 _encoding = None 

58 _upload_handlers = [] 

59 

60 def __init__(self): 

61 # WARNING: The `WSGIRequest` subclass doesn't call `super`. 

62 # Any variable assignment made here should also happen in 

63 # `WSGIRequest.__init__()`. 

64 

65 self.GET = QueryDict(mutable=True) 

66 self.POST = QueryDict(mutable=True) 

67 self.COOKIES = {} 

68 self.META = {} 

69 self.FILES = MultiValueDict() 

70 

71 self.path = "" 

72 self.path_info = "" 

73 self.method = None 

74 self.resolver_match = None 

75 self.content_type = None 

76 self.content_params = None 

77 

78 def __repr__(self): 

79 if self.method is None or not self.get_full_path(): 

80 return "<%s>" % self.__class__.__name__ 

81 return "<%s: %s %r>" % ( 

82 self.__class__.__name__, 

83 self.method, 

84 self.get_full_path(), 

85 ) 

86 

87 @cached_property 

88 def headers(self): 

89 return HttpHeaders(self.META) 

90 

91 @cached_property 

92 def accepted_types(self): 

93 """Return a list of MediaType instances, in order of preference.""" 

94 header_value = self.headers.get("Accept", "*/*") 

95 return sorted( 

96 (MediaType(token) for token in header_value.split(",") if token.strip()), 

97 key=operator.attrgetter("quality", "specificity"), 

98 reverse=True, 

99 ) 

100 

101 def accepted_type(self, media_type): 

102 """ 

103 Return the preferred MediaType instance which matches the given media type. 

104 """ 

105 return next( 

106 ( 

107 accepted_type 

108 for accepted_type in self.accepted_types 

109 if accepted_type.match(media_type) 

110 ), 

111 None, 

112 ) 

113 

114 def get_preferred_type(self, media_types): 

115 """Select the preferred media type from the provided options.""" 

116 if not media_types or not self.accepted_types: 

117 return None 

118 

119 desired_types = [ 

120 (accepted_type, media_type) 

121 for media_type in media_types 

122 if (accepted_type := self.accepted_type(media_type)) is not None 

123 ] 

124 

125 if not desired_types: 

126 return None 

127 

128 # Of the desired media types, select the one which is most desirable. 

129 return min(desired_types, key=lambda t: self.accepted_types.index(t[0]))[1] 

130 

131 def accepts(self, media_type): 

132 """Does the client accept a response in the given media type?""" 

133 return self.accepted_type(media_type) is not None 

134 

135 def _set_content_type_params(self, meta): 

136 """Set content_type, content_params, and encoding.""" 

137 self.content_type, self.content_params = parse_header_parameters( 

138 meta.get("CONTENT_TYPE", "") 

139 ) 

140 if "charset" in self.content_params: 

141 try: 

142 codecs.lookup(self.content_params["charset"]) 

143 except LookupError: 

144 pass 

145 else: 

146 self.encoding = self.content_params["charset"] 

147 

148 def _get_raw_host(self): 

149 """ 

150 Return the HTTP host using the environment or request headers. Skip 

151 allowed hosts protection, so may return an insecure host. 

152 """ 

153 # We try three options, in order of decreasing preference. 

154 if settings.USE_X_FORWARDED_HOST and ("HTTP_X_FORWARDED_HOST" in self.META): 

155 host = self.META["HTTP_X_FORWARDED_HOST"] 

156 elif "HTTP_HOST" in self.META: 

157 host = self.META["HTTP_HOST"] 

158 else: 

159 # Reconstruct the host using the algorithm from PEP 333. 

160 host = self.META["SERVER_NAME"] 

161 server_port = self.get_port() 

162 if server_port != ("443" if self.is_secure() else "80"): 

163 host = "%s:%s" % (host, server_port) 

164 return host 

165 

166 def get_host(self): 

167 """Return the HTTP host using the environment or request headers.""" 

168 host = self._get_raw_host() 

169 

170 # Allow variants of localhost if ALLOWED_HOSTS is empty and DEBUG=True. 

171 allowed_hosts = settings.ALLOWED_HOSTS 

172 if settings.DEBUG and not allowed_hosts: 

173 allowed_hosts = [".localhost", "127.0.0.1", "[::1]"] 

174 

175 domain, port = split_domain_port(host) 

176 if domain and validate_host(domain, allowed_hosts): 

177 return host 

178 else: 

179 msg = "Invalid HTTP_HOST header: %r." % host 

180 if domain: 

181 msg += " You may need to add %r to ALLOWED_HOSTS." % domain 

182 else: 

183 msg += ( 

184 " The domain name provided is not valid according to RFC 1034/1035." 

185 ) 

186 raise DisallowedHost(msg) 

187 

188 def get_port(self): 

189 """Return the port number for the request as a string.""" 

190 if settings.USE_X_FORWARDED_PORT and "HTTP_X_FORWARDED_PORT" in self.META: 

191 port = self.META["HTTP_X_FORWARDED_PORT"] 

192 else: 

193 port = self.META["SERVER_PORT"] 

194 return str(port) 

195 

196 def get_full_path(self, force_append_slash=False): 

197 return self._get_full_path(self.path, force_append_slash) 

198 

199 def get_full_path_info(self, force_append_slash=False): 

200 return self._get_full_path(self.path_info, force_append_slash) 

201 

202 def _get_full_path(self, path, force_append_slash): 

203 # RFC 3986 requires query string arguments to be in the ASCII range. 

204 # Rather than crash if this doesn't happen, we encode defensively. 

205 return "%s%s%s" % ( 

206 escape_uri_path(path), 

207 "/" if force_append_slash and not path.endswith("/") else "", 

208 ( 

209 ("?" + iri_to_uri(self.META.get("QUERY_STRING", ""))) 

210 if self.META.get("QUERY_STRING", "") 

211 else "" 

212 ), 

213 ) 

214 

215 def get_signed_cookie(self, key, default=RAISE_ERROR, salt="", max_age=None): 

216 """ 

217 Attempt to return a signed cookie. If the signature fails or the 

218 cookie has expired, raise an exception, unless the `default` argument 

219 is provided, in which case return that value. 

220 """ 

221 try: 

222 cookie_value = self.COOKIES[key] 

223 except KeyError: 

224 if default is not RAISE_ERROR: 

225 return default 

226 else: 

227 raise 

228 try: 

229 value = signing.get_cookie_signer(salt=key + salt).unsign( 

230 cookie_value, max_age=max_age 

231 ) 

232 except signing.BadSignature: 

233 if default is not RAISE_ERROR: 

234 return default 

235 else: 

236 raise 

237 return value 

238 

239 def build_absolute_uri(self, location=None): 

240 """ 

241 Build an absolute URI from the location and the variables available in 

242 this request. If no ``location`` is specified, build the absolute URI 

243 using request.get_full_path(). If the location is absolute, convert it 

244 to an RFC 3987 compliant URI and return it. If location is relative or 

245 is scheme-relative (i.e., ``//example.com/``), urljoin() it to a base 

246 URL constructed from the request variables. 

247 """ 

248 if location is None: 

249 # Make it an absolute url (but schemeless and domainless) for the 

250 # edge case that the path starts with '//'. 

251 location = "//%s" % self.get_full_path() 

252 else: 

253 # Coerce lazy locations. 

254 location = str(location) 

255 bits = urlsplit(location) 

256 if not (bits.scheme and bits.netloc): 

257 # Handle the simple, most common case. If the location is absolute 

258 # and a scheme or host (netloc) isn't provided, skip an expensive 

259 # urljoin() as long as no path segments are '.' or '..'. 

260 if ( 

261 bits.path.startswith("/") 

262 and not bits.scheme 

263 and not bits.netloc 

264 and "/./" not in bits.path 

265 and "/../" not in bits.path 

266 ): 

267 # If location starts with '//' but has no netloc, reuse the 

268 # schema and netloc from the current request. Strip the double 

269 # slashes and continue as if it wasn't specified. 

270 location = self._current_scheme_host + location.removeprefix("//") 

271 else: 

272 # Join the constructed URL with the provided location, which 

273 # allows the provided location to apply query strings to the 

274 # base path. 

275 location = urljoin(self._current_scheme_host + self.path, location) 

276 return iri_to_uri(location) 

277 

278 @cached_property 

279 def _current_scheme_host(self): 

280 return "{}://{}".format(self.scheme, self.get_host()) 

281 

282 def _get_scheme(self): 

283 """ 

284 Hook for subclasses like WSGIRequest to implement. Return 'http' by 

285 default. 

286 """ 

287 return "http" 

288 

289 @property 

290 def scheme(self): 

291 if settings.SECURE_PROXY_SSL_HEADER: 

292 try: 

293 header, secure_value = settings.SECURE_PROXY_SSL_HEADER 

294 except ValueError: 

295 raise ImproperlyConfigured( 

296 "The SECURE_PROXY_SSL_HEADER setting must be a tuple containing " 

297 "two values." 

298 ) 

299 header_value = self.META.get(header) 

300 if header_value is not None: 

301 header_value, *_ = header_value.split(",", 1) 

302 return "https" if header_value.strip() == secure_value else "http" 

303 return self._get_scheme() 

304 

305 def is_secure(self): 

306 return self.scheme == "https" 

307 

308 @property 

309 def encoding(self): 

310 return self._encoding 

311 

312 @encoding.setter 

313 def encoding(self, val): 

314 """ 

315 Set the encoding used for GET/POST accesses. If the GET or POST 

316 dictionary has already been created, remove and recreate it on the 

317 next access (so that it is decoded correctly). 

318 """ 

319 self._encoding = val 

320 if hasattr(self, "GET"): 

321 del self.GET 

322 if hasattr(self, "_post"): 

323 del self._post 

324 

325 def _initialize_handlers(self): 

326 self._upload_handlers = [ 

327 uploadhandler.load_handler(handler, self) 

328 for handler in settings.FILE_UPLOAD_HANDLERS 

329 ] 

330 

331 @property 

332 def upload_handlers(self): 

333 if not self._upload_handlers: 

334 # If there are no upload handlers defined, initialize them from settings. 

335 self._initialize_handlers() 

336 return self._upload_handlers 

337 

338 @upload_handlers.setter 

339 def upload_handlers(self, upload_handlers): 

340 if hasattr(self, "_files"): 

341 raise AttributeError( 

342 "You cannot set the upload handlers after the upload has been " 

343 "processed." 

344 ) 

345 self._upload_handlers = upload_handlers 

346 

347 def parse_file_upload(self, META, post_data): 

348 """Return a tuple of (POST QueryDict, FILES MultiValueDict).""" 

349 self.upload_handlers = ImmutableList( 

350 self.upload_handlers, 

351 warning=( 

352 "You cannot alter upload handlers after the upload has been " 

353 "processed." 

354 ), 

355 ) 

356 parser = MultiPartParser(META, post_data, self.upload_handlers, self.encoding) 

357 return parser.parse() 

358 

359 @property 

360 def body(self): 

361 if not hasattr(self, "_body"): 

362 if self._read_started: 

363 raise RawPostDataException( 

364 "You cannot access body after reading from request's data stream" 

365 ) 

366 

367 # Limit the maximum request data size that will be handled in-memory. 

368 if ( 

369 settings.DATA_UPLOAD_MAX_MEMORY_SIZE is not None 

370 and int(self.META.get("CONTENT_LENGTH") or 0) 

371 > settings.DATA_UPLOAD_MAX_MEMORY_SIZE 

372 ): 

373 raise RequestDataTooBig( 

374 "Request body exceeded settings.DATA_UPLOAD_MAX_MEMORY_SIZE." 

375 ) 

376 

377 try: 

378 self._body = self.read() 

379 except OSError as e: 

380 raise UnreadablePostError(*e.args) from e 

381 finally: 

382 self._stream.close() 

383 self._stream = BytesIO(self._body) 

384 return self._body 

385 

386 def _mark_post_parse_error(self): 

387 self._post = QueryDict() 

388 self._files = MultiValueDict() 

389 

390 def _load_post_and_files(self): 

391 """Populate self._post and self._files if the content-type is a form type""" 

392 if self.method != "POST": 

393 self._post, self._files = ( 

394 QueryDict(encoding=self._encoding), 

395 MultiValueDict(), 

396 ) 

397 return 

398 if self._read_started and not hasattr(self, "_body"): 

399 self._mark_post_parse_error() 

400 return 

401 

402 if self.content_type == "multipart/form-data": 

403 if hasattr(self, "_body"): 

404 # Use already read data 

405 data = BytesIO(self._body) 

406 else: 

407 data = self 

408 try: 

409 self._post, self._files = self.parse_file_upload(self.META, data) 

410 except (MultiPartParserError, TooManyFilesSent): 

411 # An error occurred while parsing POST data. Since when 

412 # formatting the error the request handler might access 

413 # self.POST, set self._post and self._file to prevent 

414 # attempts to parse POST data again. 

415 self._mark_post_parse_error() 

416 raise 

417 elif self.content_type == "application/x-www-form-urlencoded": 

418 # According to RFC 1866, the "application/x-www-form-urlencoded" 

419 # content type does not have a charset and should be always treated 

420 # as UTF-8. 

421 if self._encoding is not None and self._encoding.lower() != "utf-8": 

422 raise BadRequest( 

423 "HTTP requests with the 'application/x-www-form-urlencoded' " 

424 "content type must be UTF-8 encoded." 

425 ) 

426 self._post = QueryDict(self.body, encoding="utf-8") 

427 self._files = MultiValueDict() 

428 else: 

429 self._post, self._files = ( 

430 QueryDict(encoding=self._encoding), 

431 MultiValueDict(), 

432 ) 

433 

434 def close(self): 

435 if hasattr(self, "_files"): 

436 for f in chain.from_iterable(list_[1] for list_ in self._files.lists()): 

437 f.close() 

438 

439 # File-like and iterator interface. 

440 # 

441 # Expects self._stream to be set to an appropriate source of bytes by 

442 # a corresponding request subclass (e.g. WSGIRequest). 

443 # Also when request data has already been read by request.POST or 

444 # request.body, self._stream points to a BytesIO instance 

445 # containing that data. 

446 

447 def read(self, *args, **kwargs): 

448 self._read_started = True 

449 try: 

450 return self._stream.read(*args, **kwargs) 

451 except OSError as e: 

452 raise UnreadablePostError(*e.args) from e 

453 

454 def readline(self, *args, **kwargs): 

455 self._read_started = True 

456 try: 

457 return self._stream.readline(*args, **kwargs) 

458 except OSError as e: 

459 raise UnreadablePostError(*e.args) from e 

460 

461 def __iter__(self): 

462 return iter(self.readline, b"") 

463 

464 def readlines(self): 

465 return list(self) 

466 

467 

class HttpHeaders(CaseInsensitiveMapping):
    """
    Case-insensitive mapping of HTTP header names to values, built from a
    WSGI-style environ dictionary.
    """

    HTTP_PREFIX = "HTTP_"
    # PEP 333 gives two headers which aren't prepended with HTTP_.
    UNPREFIXED_HEADERS = {"CONTENT_TYPE", "CONTENT_LENGTH"}

    def __init__(self, environ):
        # Keep only the environ entries that parse to a non-empty header name.
        parsed = {
            name: value
            for key, value in environ.items()
            if (name := self.parse_header_name(key))
        }
        super().__init__(parsed)

    def __getitem__(self, key):
        """Allow header lookup using underscores in place of hyphens."""
        hyphenated = key.replace("_", "-")
        return super().__getitem__(hyphenated)

    @classmethod
    def parse_header_name(cls, header):
        """
        Convert an environ key to a header name (e.g. ``HTTP_X_FOO`` becomes
        ``X-Foo``). Return None for keys that are neither HTTP_-prefixed nor
        listed in UNPREFIXED_HEADERS.
        """
        prefix = cls.HTTP_PREFIX
        if header.startswith(prefix):
            bare = header[len(prefix):]
        elif header in cls.UNPREFIXED_HEADERS:
            bare = header
        else:
            return None
        return bare.replace("_", "-").title()

    @classmethod
    def to_wsgi_name(cls, header):
        """Convert a header name to its WSGI environ key."""
        upper = header.replace("-", "_").upper()
        return upper if upper in cls.UNPREFIXED_HEADERS else f"{cls.HTTP_PREFIX}{upper}"

    @classmethod
    def to_asgi_name(cls, header):
        """Convert a header name to upper case with underscores for hyphens."""
        return header.replace("-", "_").upper()

    @classmethod
    def to_wsgi_names(cls, headers):
        """Return a dict of ``headers`` with every key passed through to_wsgi_name()."""
        converted = {}
        for header_name, value in headers.items():
            converted[cls.to_wsgi_name(header_name)] = value
        return converted

    @classmethod
    def to_asgi_names(cls, headers):
        """Return a dict of ``headers`` with every key passed through to_asgi_name()."""
        converted = {}
        for header_name, value in headers.items():
            converted[cls.to_asgi_name(header_name)] = value
        return converted

517 

518 

class QueryDict(MultiValueDict):
    """
    A specialized MultiValueDict which represents a query string.

    A QueryDict can hold GET or POST data. It builds on MultiValueDict because
    keys in such data may repeat, for instance in the data from a form with a
    <select multiple> field.

    QueryDicts are immutable by default, though copy() always returns a
    mutable copy.

    Both keys and values set on this class are converted from the given
    encoding (DEFAULT_CHARSET by default) to str.
    """

    # Both are reset in __init__; they are also defined at class level so that
    # unpickling will have valid values.
    _mutable = True
    _encoding = None

    def __init__(self, query_string=None, mutable=False, encoding=None):
        super().__init__()
        self.encoding = encoding or settings.DEFAULT_CHARSET
        raw = query_string or ""
        charset = self.encoding
        max_fields = settings.DATA_UPLOAD_MAX_NUMBER_FIELDS
        if isinstance(raw, bytes):
            # query_string normally contains URL-encoded data, a subset of
            # ASCII...
            try:
                raw = raw.decode(charset)
            except UnicodeDecodeError:
                # ... but some user agents are misbehaving :-(
                raw = raw.decode("iso-8859-1")
        try:
            pairs = parse_qsl(
                raw,
                keep_blank_values=True,
                encoding=charset,
                max_num_fields=max_fields,
            )
            for key, value in pairs:
                self.appendlist(key, value)
        except ValueError as exc:
            # ValueError can also be raised if the strict_parsing argument to
            # parse_qsl() is True. As that is not used by Django, assume that
            # the exception was raised by exceeding the value of max_num_fields
            # instead of fragile checks of exception message strings.
            raise TooManyFieldsSent(
                "The number of GET/POST parameters exceeded "
                "settings.DATA_UPLOAD_MAX_NUMBER_FIELDS."
            ) from exc
        # Only now apply the requested mutability; the appends above relied on
        # the mutable class-level default.
        self._mutable = mutable

    @classmethod
    def fromkeys(cls, iterable, value="", mutable=False, encoding=None):
        """
        Return a new QueryDict with keys (may be repeated) from an iterable and
        values from value.
        """
        result = cls("", mutable=True, encoding=encoding)
        for key in iterable:
            result.appendlist(key, value)
        if not mutable:
            result._mutable = False
        return result

    @property
    def encoding(self):
        # Fall back to the default charset lazily.
        if self._encoding is None:
            self._encoding = settings.DEFAULT_CHARSET
        return self._encoding

    @encoding.setter
    def encoding(self, value):
        self._encoding = value

    def _assert_mutable(self):
        """Raise AttributeError if this instance is immutable."""
        if not self._mutable:
            raise AttributeError("This QueryDict instance is immutable")

    def __setitem__(self, key, value):
        self._assert_mutable()
        super().__setitem__(
            bytes_to_text(key, self.encoding),
            bytes_to_text(value, self.encoding),
        )

    def __delitem__(self, key):
        self._assert_mutable()
        super().__delitem__(key)

    def __copy__(self):
        clone = self.__class__("", mutable=True, encoding=self.encoding)
        for key, values in self.lists():
            clone.setlist(key, values)
        return clone

    def __deepcopy__(self, memo):
        clone = self.__class__("", mutable=True, encoding=self.encoding)
        # Register the copy before descending so cycles resolve to it.
        memo[id(self)] = clone
        for key, values in self.lists():
            clone.setlist(copy.deepcopy(key, memo), copy.deepcopy(values, memo))
        return clone

    def setlist(self, key, list_):
        self._assert_mutable()
        text_key = bytes_to_text(key, self.encoding)
        text_values = [bytes_to_text(item, self.encoding) for item in list_]
        super().setlist(text_key, text_values)

    def setlistdefault(self, key, default_list=None):
        self._assert_mutable()
        return super().setlistdefault(key, default_list)

    def appendlist(self, key, value):
        self._assert_mutable()
        super().appendlist(
            bytes_to_text(key, self.encoding),
            bytes_to_text(value, self.encoding),
        )

    def pop(self, key, *args):
        self._assert_mutable()
        return super().pop(key, *args)

    def popitem(self):
        self._assert_mutable()
        return super().popitem()

    def clear(self):
        self._assert_mutable()
        super().clear()

    def setdefault(self, key, default=None):
        self._assert_mutable()
        return super().setdefault(
            bytes_to_text(key, self.encoding),
            bytes_to_text(default, self.encoding),
        )

    def copy(self):
        """Return a mutable copy of this object."""
        return self.__deepcopy__({})

    def urlencode(self, safe=None):
        """
        Return an encoded string of all query string arguments.

        `safe` specifies characters which don't require quoting, for example::

            >>> q = QueryDict(mutable=True)
            >>> q['next'] = '/a&b/'
            >>> q.urlencode()
            'next=%2Fa%26b%2F'
            >>> q.urlencode(safe='/')
            'next=/a%26b/'
        """
        if safe:
            safe = safe.encode(self.encoding)

            def encode_pair(name, item):
                return "%s=%s" % (quote(name, safe), quote(item, safe))

        else:

            def encode_pair(name, item):
                return urlencode({name: item})

        return "&".join(
            encode_pair(key.encode(self.encoding), str(item).encode(self.encoding))
            for key, values in self.lists()
            for item in values
        )

688 

689 

class MediaType:
    """
    A single media type with optional parameters, parsed from a header-style
    string such as ``text/html; q=0.8``.

    ``main_type`` and ``sub_type`` hold the text on either side of the first
    "/" and ``params`` holds the parsed parameters.
    """

    def __init__(self, media_type_raw_line):
        # A falsy raw line (None, "") is parsed as the empty string.
        full_type, self.params = parse_header_parameters(
            media_type_raw_line if media_type_raw_line else ""
        )
        self.main_type, _, self.sub_type = full_type.partition("/")

    def __str__(self):
        params_str = "".join("; %s=%s" % (k, v) for k, v in self.params.items())
        return "%s%s%s" % (
            self.main_type,
            ("/%s" % self.sub_type) if self.sub_type else "",
            params_str,
        )

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__qualname__, self)

    @property
    def is_all_types(self):
        """Return True for the ``*/*`` wildcard."""
        return self.main_type == "*" and self.sub_type == "*"

    def match(self, other):
        """
        Return True if this media type covers ``other`` (a raw media type
        string): ``*/*`` matches anything, otherwise the main types must be
        equal and this sub type must be ``*`` or equal to the other sub type.
        Parameters are not compared.
        """
        if self.is_all_types:
            return True
        other = MediaType(other)
        return self.main_type == other.main_type and self.sub_type in {
            "*",
            other.sub_type,
        }

    @cached_property
    def quality(self):
        """
        Return the ``q`` parameter as a number rounded to 3 decimal places.

        A missing, unparsable, out-of-range or NaN value yields 1.
        """
        try:
            quality = float(self.params.get("q", 1))
        except ValueError:
            # Discard invalid values.
            return 1

        # Valid quality values must be between 0 and 1. Written as a chained
        # comparison so that NaN (which float() accepts, e.g. "q=nan") is
        # rejected as well: every comparison against NaN is False.
        if not 0 <= quality <= 1:
            return 1

        return round(quality, 3)

    @property
    def specificity(self):
        """
        Return a value from 0-3 for how specific the media type is.
        """
        if self.main_type == "*":
            return 0
        elif self.sub_type == "*":
            return 1
        elif self.quality == 1:
            return 2
        return 3

747 

748 

# django.utils.encoding.force_str() is neither necessary nor appropriate for
# parsing URLs and form inputs, hence this slightly more restricted function,
# used by QueryDict.
def bytes_to_text(s, encoding):
    """
    Decode a bytes object to str using the given encoding, substituting the
    Unicode replacement character (U+FFFD) for illegally encoded input.

    Any object that is not bytes is returned unchanged.
    """
    return str(s, encoding, "replace") if isinstance(s, bytes) else s

764 

765 

def split_domain_port(host):
    """
    Split a host into a (domain, port) tuple.

    The domain is returned lowercased and without a trailing dot; the port is
    an empty string when the host has none. An invalid host yields an empty
    domain (and an empty port).
    """
    match = host_validation_re.fullmatch(host.lower())
    if not match:
        return "", ""
    domain, port = match.groups(default="")
    # Remove a trailing dot (if present) from the domain.
    return domain.removesuffix("."), port

778 

779 

def validate_host(host, allowed_hosts):
    """
    Validate the given host for this site.

    The host is accepted when it matches at least one entry of
    ``allowed_hosts``. A pattern beginning with a period matches a domain and
    all its subdomains (e.g. ``.example.com`` matches ``example.com`` and any
    subdomain), ``*`` matches anything, and any other pattern must match
    exactly.

    Note: This function assumes that the given host is lowercased and has
    already had the port, if any, stripped off.

    Return ``True`` for a valid host, ``False`` otherwise.
    """
    for pattern in allowed_hosts:
        if pattern == "*" or is_same_domain(host, pattern):
            return True
    return False

797 )