Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/http/request.py: 30%
399 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1import codecs
2import copy
3from io import BytesIO
4from itertools import chain
5from urllib.parse import parse_qsl, quote, urlencode, urljoin, urlsplit
7from django.conf import settings
8from django.core import signing
9from django.core.exceptions import (
10 DisallowedHost,
11 ImproperlyConfigured,
12 RequestDataTooBig,
13 TooManyFieldsSent,
14)
15from django.core.files import uploadhandler
16from django.http.multipartparser import MultiPartParser, MultiPartParserError
17from django.utils.datastructures import (
18 CaseInsensitiveMapping,
19 ImmutableList,
20 MultiValueDict,
21)
22from django.utils.encoding import escape_uri_path, iri_to_uri
23from django.utils.functional import cached_property
24from django.utils.http import is_same_domain, parse_header_parameters
25from django.utils.regex_helper import _lazy_re_compile
27RAISE_ERROR = object()
28host_validation_re = _lazy_re_compile(
29 r"^([a-z0-9.-]+|\[[a-f0-9]*:[a-f0-9\.:]+\])(:[0-9]+)?$"
30)
33class UnreadablePostError(OSError):
34 pass
37class RawPostDataException(Exception):
38 """
39 You cannot access raw_post_data from a request that has
40 multipart/* POST data if it has been accessed via POST,
41 FILES, etc..
42 """
44 pass
47class HttpRequest:
48 """A basic HTTP request."""
50 # The encoding used in GET/POST dicts. None means use default setting.
51 _encoding = None
52 _upload_handlers = []
54 non_picklable_attrs = frozenset(["resolver_match", "_stream"])
56 def __init__(self):
57 # WARNING: The `WSGIRequest` subclass doesn't call `super`.
58 # Any variable assignment made here should also happen in
59 # `WSGIRequest.__init__()`.
61 self.GET = QueryDict(mutable=True)
62 self.POST = QueryDict(mutable=True)
63 self.COOKIES = {}
64 self.META = {}
65 self.FILES = MultiValueDict()
67 self.path = ""
68 self.path_info = ""
69 self.method = None
70 self.resolver_match = None
71 self.content_type = None
72 self.content_params = None
74 def __repr__(self):
75 if self.method is None or not self.get_full_path():
76 return "<%s>" % self.__class__.__name__
77 return "<%s: %s %r>" % (
78 self.__class__.__name__,
79 self.method,
80 self.get_full_path(),
81 )
83 def __getstate__(self):
84 obj_dict = self.__dict__.copy()
85 for attr in self.non_picklable_attrs:
86 if attr in obj_dict:
87 del obj_dict[attr]
88 return obj_dict
90 def __deepcopy__(self, memo):
91 obj = copy.copy(self)
92 for attr in self.non_picklable_attrs:
93 if hasattr(self, attr):
94 setattr(obj, attr, copy.deepcopy(getattr(self, attr), memo))
95 memo[id(self)] = obj
96 return obj
98 @cached_property
99 def headers(self):
100 return HttpHeaders(self.META)
102 @cached_property
103 def accepted_types(self):
104 """Return a list of MediaType instances."""
105 return parse_accept_header(self.headers.get("Accept", "*/*"))
107 def accepts(self, media_type):
108 return any(
109 accepted_type.match(media_type) for accepted_type in self.accepted_types
110 )
112 def _set_content_type_params(self, meta):
113 """Set content_type, content_params, and encoding."""
114 self.content_type, self.content_params = parse_header_parameters(
115 meta.get("CONTENT_TYPE", "")
116 )
117 if "charset" in self.content_params:
118 try:
119 codecs.lookup(self.content_params["charset"])
120 except LookupError:
121 pass
122 else:
123 self.encoding = self.content_params["charset"]
125 def _get_raw_host(self):
126 """
127 Return the HTTP host using the environment or request headers. Skip
128 allowed hosts protection, so may return an insecure host.
129 """
130 # We try three options, in order of decreasing preference.
131 if settings.USE_X_FORWARDED_HOST and ("HTTP_X_FORWARDED_HOST" in self.META):
132 host = self.META["HTTP_X_FORWARDED_HOST"]
133 elif "HTTP_HOST" in self.META:
134 host = self.META["HTTP_HOST"]
135 else:
136 # Reconstruct the host using the algorithm from PEP 333.
137 host = self.META["SERVER_NAME"]
138 server_port = self.get_port()
139 if server_port != ("443" if self.is_secure() else "80"):
140 host = "%s:%s" % (host, server_port)
141 return host
143 def get_host(self):
144 """Return the HTTP host using the environment or request headers."""
145 host = self._get_raw_host()
147 # Allow variants of localhost if ALLOWED_HOSTS is empty and DEBUG=True.
148 allowed_hosts = settings.ALLOWED_HOSTS
149 if settings.DEBUG and not allowed_hosts:
150 allowed_hosts = [".localhost", "127.0.0.1", "[::1]"]
152 domain, port = split_domain_port(host)
153 if domain and validate_host(domain, allowed_hosts):
154 return host
155 else:
156 msg = "Invalid HTTP_HOST header: %r." % host
157 if domain:
158 msg += " You may need to add %r to ALLOWED_HOSTS." % domain
159 else:
160 msg += (
161 " The domain name provided is not valid according to RFC 1034/1035."
162 )
163 raise DisallowedHost(msg)
165 def get_port(self):
166 """Return the port number for the request as a string."""
167 if settings.USE_X_FORWARDED_PORT and "HTTP_X_FORWARDED_PORT" in self.META:
168 port = self.META["HTTP_X_FORWARDED_PORT"]
169 else:
170 port = self.META["SERVER_PORT"]
171 return str(port)
173 def get_full_path(self, force_append_slash=False):
174 return self._get_full_path(self.path, force_append_slash)
176 def get_full_path_info(self, force_append_slash=False):
177 return self._get_full_path(self.path_info, force_append_slash)
179 def _get_full_path(self, path, force_append_slash):
180 # RFC 3986 requires query string arguments to be in the ASCII range.
181 # Rather than crash if this doesn't happen, we encode defensively.
182 return "%s%s%s" % (
183 escape_uri_path(path),
184 "/" if force_append_slash and not path.endswith("/") else "",
185 ("?" + iri_to_uri(self.META.get("QUERY_STRING", "")))
186 if self.META.get("QUERY_STRING", "")
187 else "",
188 )
190 def get_signed_cookie(self, key, default=RAISE_ERROR, salt="", max_age=None):
191 """
192 Attempt to return a signed cookie. If the signature fails or the
193 cookie has expired, raise an exception, unless the `default` argument
194 is provided, in which case return that value.
195 """
196 try:
197 cookie_value = self.COOKIES[key]
198 except KeyError:
199 if default is not RAISE_ERROR:
200 return default
201 else:
202 raise
203 try:
204 value = signing.get_cookie_signer(salt=key + salt).unsign(
205 cookie_value, max_age=max_age
206 )
207 except signing.BadSignature:
208 if default is not RAISE_ERROR:
209 return default
210 else:
211 raise
212 return value
214 def build_absolute_uri(self, location=None):
215 """
216 Build an absolute URI from the location and the variables available in
217 this request. If no ``location`` is specified, build the absolute URI
218 using request.get_full_path(). If the location is absolute, convert it
219 to an RFC 3987 compliant URI and return it. If location is relative or
220 is scheme-relative (i.e., ``//example.com/``), urljoin() it to a base
221 URL constructed from the request variables.
222 """
223 if location is None:
224 # Make it an absolute url (but schemeless and domainless) for the
225 # edge case that the path starts with '//'.
226 location = "//%s" % self.get_full_path()
227 else:
228 # Coerce lazy locations.
229 location = str(location)
230 bits = urlsplit(location)
231 if not (bits.scheme and bits.netloc):
232 # Handle the simple, most common case. If the location is absolute
233 # and a scheme or host (netloc) isn't provided, skip an expensive
234 # urljoin() as long as no path segments are '.' or '..'.
235 if (
236 bits.path.startswith("/")
237 and not bits.scheme
238 and not bits.netloc
239 and "/./" not in bits.path
240 and "/../" not in bits.path
241 ):
242 # If location starts with '//' but has no netloc, reuse the
243 # schema and netloc from the current request. Strip the double
244 # slashes and continue as if it wasn't specified.
245 if location.startswith("//"):
246 location = location[2:]
247 location = self._current_scheme_host + location
248 else:
249 # Join the constructed URL with the provided location, which
250 # allows the provided location to apply query strings to the
251 # base path.
252 location = urljoin(self._current_scheme_host + self.path, location)
253 return iri_to_uri(location)
255 @cached_property
256 def _current_scheme_host(self):
257 return "{}://{}".format(self.scheme, self.get_host())
259 def _get_scheme(self):
260 """
261 Hook for subclasses like WSGIRequest to implement. Return 'http' by
262 default.
263 """
264 return "http"
266 @property
267 def scheme(self):
268 if settings.SECURE_PROXY_SSL_HEADER:
269 try:
270 header, secure_value = settings.SECURE_PROXY_SSL_HEADER
271 except ValueError:
272 raise ImproperlyConfigured(
273 "The SECURE_PROXY_SSL_HEADER setting must be a tuple containing "
274 "two values."
275 )
276 header_value = self.META.get(header)
277 if header_value is not None:
278 header_value, *_ = header_value.split(",", 1)
279 return "https" if header_value.strip() == secure_value else "http"
280 return self._get_scheme()
282 def is_secure(self):
283 return self.scheme == "https"
285 @property
286 def encoding(self):
287 return self._encoding
289 @encoding.setter
290 def encoding(self, val):
291 """
292 Set the encoding used for GET/POST accesses. If the GET or POST
293 dictionary has already been created, remove and recreate it on the
294 next access (so that it is decoded correctly).
295 """
296 self._encoding = val
297 if hasattr(self, "GET"):
298 del self.GET
299 if hasattr(self, "_post"):
300 del self._post
302 def _initialize_handlers(self):
303 self._upload_handlers = [
304 uploadhandler.load_handler(handler, self)
305 for handler in settings.FILE_UPLOAD_HANDLERS
306 ]
308 @property
309 def upload_handlers(self):
310 if not self._upload_handlers:
311 # If there are no upload handlers defined, initialize them from settings.
312 self._initialize_handlers()
313 return self._upload_handlers
315 @upload_handlers.setter
316 def upload_handlers(self, upload_handlers):
317 if hasattr(self, "_files"):
318 raise AttributeError(
319 "You cannot set the upload handlers after the upload has been "
320 "processed."
321 )
322 self._upload_handlers = upload_handlers
324 def parse_file_upload(self, META, post_data):
325 """Return a tuple of (POST QueryDict, FILES MultiValueDict)."""
326 self.upload_handlers = ImmutableList(
327 self.upload_handlers,
328 warning=(
329 "You cannot alter upload handlers after the upload has been "
330 "processed."
331 ),
332 )
333 parser = MultiPartParser(META, post_data, self.upload_handlers, self.encoding)
334 return parser.parse()
336 @property
337 def body(self):
338 if not hasattr(self, "_body"):
339 if self._read_started:
340 raise RawPostDataException(
341 "You cannot access body after reading from request's data stream"
342 )
344 # Limit the maximum request data size that will be handled in-memory.
345 if (
346 settings.DATA_UPLOAD_MAX_MEMORY_SIZE is not None
347 and int(self.META.get("CONTENT_LENGTH") or 0)
348 > settings.DATA_UPLOAD_MAX_MEMORY_SIZE
349 ):
350 raise RequestDataTooBig(
351 "Request body exceeded settings.DATA_UPLOAD_MAX_MEMORY_SIZE."
352 )
354 try:
355 self._body = self.read()
356 except OSError as e:
357 raise UnreadablePostError(*e.args) from e
358 finally:
359 self._stream.close()
360 self._stream = BytesIO(self._body)
361 return self._body
363 def _mark_post_parse_error(self):
364 self._post = QueryDict()
365 self._files = MultiValueDict()
367 def _load_post_and_files(self):
368 """Populate self._post and self._files if the content-type is a form type"""
369 if self.method != "POST":
370 self._post, self._files = (
371 QueryDict(encoding=self._encoding),
372 MultiValueDict(),
373 )
374 return
375 if self._read_started and not hasattr(self, "_body"):
376 self._mark_post_parse_error()
377 return
379 if self.content_type == "multipart/form-data":
380 if hasattr(self, "_body"):
381 # Use already read data
382 data = BytesIO(self._body)
383 else:
384 data = self
385 try:
386 self._post, self._files = self.parse_file_upload(self.META, data)
387 except MultiPartParserError:
388 # An error occurred while parsing POST data. Since when
389 # formatting the error the request handler might access
390 # self.POST, set self._post and self._file to prevent
391 # attempts to parse POST data again.
392 self._mark_post_parse_error()
393 raise
394 elif self.content_type == "application/x-www-form-urlencoded":
395 self._post, self._files = (
396 QueryDict(self.body, encoding=self._encoding),
397 MultiValueDict(),
398 )
399 else:
400 self._post, self._files = (
401 QueryDict(encoding=self._encoding),
402 MultiValueDict(),
403 )
405 def close(self):
406 if hasattr(self, "_files"):
407 for f in chain.from_iterable(list_[1] for list_ in self._files.lists()):
408 f.close()
410 # File-like and iterator interface.
411 #
412 # Expects self._stream to be set to an appropriate source of bytes by
413 # a corresponding request subclass (e.g. WSGIRequest).
414 # Also when request data has already been read by request.POST or
415 # request.body, self._stream points to a BytesIO instance
416 # containing that data.
418 def read(self, *args, **kwargs):
419 self._read_started = True
420 try:
421 return self._stream.read(*args, **kwargs)
422 except OSError as e:
423 raise UnreadablePostError(*e.args) from e
425 def readline(self, *args, **kwargs):
426 self._read_started = True
427 try:
428 return self._stream.readline(*args, **kwargs)
429 except OSError as e:
430 raise UnreadablePostError(*e.args) from e
432 def __iter__(self):
433 return iter(self.readline, b"")
435 def readlines(self):
436 return list(self)
439class HttpHeaders(CaseInsensitiveMapping):
440 HTTP_PREFIX = "HTTP_"
441 # PEP 333 gives two headers which aren't prepended with HTTP_.
442 UNPREFIXED_HEADERS = {"CONTENT_TYPE", "CONTENT_LENGTH"}
444 def __init__(self, environ):
445 headers = {}
446 for header, value in environ.items():
447 name = self.parse_header_name(header)
448 if name:
449 headers[name] = value
450 super().__init__(headers)
452 def __getitem__(self, key):
453 """Allow header lookup using underscores in place of hyphens."""
454 return super().__getitem__(key.replace("_", "-"))
456 @classmethod
457 def parse_header_name(cls, header):
458 if header.startswith(cls.HTTP_PREFIX):
459 header = header[len(cls.HTTP_PREFIX) :]
460 elif header not in cls.UNPREFIXED_HEADERS:
461 return None
462 return header.replace("_", "-").title()
464 @classmethod
465 def to_wsgi_name(cls, header):
466 header = header.replace("-", "_").upper()
467 if header in cls.UNPREFIXED_HEADERS:
468 return header
469 return f"{cls.HTTP_PREFIX}{header}"
471 @classmethod
472 def to_asgi_name(cls, header):
473 return header.replace("-", "_").upper()
475 @classmethod
476 def to_wsgi_names(cls, headers):
477 return {
478 cls.to_wsgi_name(header_name): value
479 for header_name, value in headers.items()
480 }
482 @classmethod
483 def to_asgi_names(cls, headers):
484 return {
485 cls.to_asgi_name(header_name): value
486 for header_name, value in headers.items()
487 }
490class QueryDict(MultiValueDict):
491 """
492 A specialized MultiValueDict which represents a query string.
494 A QueryDict can be used to represent GET or POST data. It subclasses
495 MultiValueDict since keys in such data can be repeated, for instance
496 in the data from a form with a <select multiple> field.
498 By default QueryDicts are immutable, though the copy() method
499 will always return a mutable copy.
501 Both keys and values set on this class are converted from the given encoding
502 (DEFAULT_CHARSET by default) to str.
503 """
505 # These are both reset in __init__, but is specified here at the class
506 # level so that unpickling will have valid values
507 _mutable = True
508 _encoding = None
510 def __init__(self, query_string=None, mutable=False, encoding=None):
511 super().__init__()
512 self.encoding = encoding or settings.DEFAULT_CHARSET
513 query_string = query_string or ""
514 parse_qsl_kwargs = {
515 "keep_blank_values": True,
516 "encoding": self.encoding,
517 "max_num_fields": settings.DATA_UPLOAD_MAX_NUMBER_FIELDS,
518 }
519 if isinstance(query_string, bytes):
520 # query_string normally contains URL-encoded data, a subset of ASCII.
521 try:
522 query_string = query_string.decode(self.encoding)
523 except UnicodeDecodeError:
524 # ... but some user agents are misbehaving :-(
525 query_string = query_string.decode("iso-8859-1")
526 try:
527 for key, value in parse_qsl(query_string, **parse_qsl_kwargs):
528 self.appendlist(key, value)
529 except ValueError as e:
530 # ValueError can also be raised if the strict_parsing argument to
531 # parse_qsl() is True. As that is not used by Django, assume that
532 # the exception was raised by exceeding the value of max_num_fields
533 # instead of fragile checks of exception message strings.
534 raise TooManyFieldsSent(
535 "The number of GET/POST parameters exceeded "
536 "settings.DATA_UPLOAD_MAX_NUMBER_FIELDS."
537 ) from e
538 self._mutable = mutable
540 @classmethod
541 def fromkeys(cls, iterable, value="", mutable=False, encoding=None):
542 """
543 Return a new QueryDict with keys (may be repeated) from an iterable and
544 values from value.
545 """
546 q = cls("", mutable=True, encoding=encoding)
547 for key in iterable:
548 q.appendlist(key, value)
549 if not mutable:
550 q._mutable = False
551 return q
553 @property
554 def encoding(self):
555 if self._encoding is None:
556 self._encoding = settings.DEFAULT_CHARSET
557 return self._encoding
559 @encoding.setter
560 def encoding(self, value):
561 self._encoding = value
563 def _assert_mutable(self):
564 if not self._mutable:
565 raise AttributeError("This QueryDict instance is immutable")
567 def __setitem__(self, key, value):
568 self._assert_mutable()
569 key = bytes_to_text(key, self.encoding)
570 value = bytes_to_text(value, self.encoding)
571 super().__setitem__(key, value)
573 def __delitem__(self, key):
574 self._assert_mutable()
575 super().__delitem__(key)
577 def __copy__(self):
578 result = self.__class__("", mutable=True, encoding=self.encoding)
579 for key, value in self.lists():
580 result.setlist(key, value)
581 return result
583 def __deepcopy__(self, memo):
584 result = self.__class__("", mutable=True, encoding=self.encoding)
585 memo[id(self)] = result
586 for key, value in self.lists():
587 result.setlist(copy.deepcopy(key, memo), copy.deepcopy(value, memo))
588 return result
590 def setlist(self, key, list_):
591 self._assert_mutable()
592 key = bytes_to_text(key, self.encoding)
593 list_ = [bytes_to_text(elt, self.encoding) for elt in list_]
594 super().setlist(key, list_)
596 def setlistdefault(self, key, default_list=None):
597 self._assert_mutable()
598 return super().setlistdefault(key, default_list)
600 def appendlist(self, key, value):
601 self._assert_mutable()
602 key = bytes_to_text(key, self.encoding)
603 value = bytes_to_text(value, self.encoding)
604 super().appendlist(key, value)
606 def pop(self, key, *args):
607 self._assert_mutable()
608 return super().pop(key, *args)
610 def popitem(self):
611 self._assert_mutable()
612 return super().popitem()
614 def clear(self):
615 self._assert_mutable()
616 super().clear()
618 def setdefault(self, key, default=None):
619 self._assert_mutable()
620 key = bytes_to_text(key, self.encoding)
621 default = bytes_to_text(default, self.encoding)
622 return super().setdefault(key, default)
624 def copy(self):
625 """Return a mutable copy of this object."""
626 return self.__deepcopy__({})
628 def urlencode(self, safe=None):
629 """
630 Return an encoded string of all query string arguments.
632 `safe` specifies characters which don't require quoting, for example::
634 >>> q = QueryDict(mutable=True)
635 >>> q['next'] = '/a&b/'
636 >>> q.urlencode()
637 'next=%2Fa%26b%2F'
638 >>> q.urlencode(safe='/')
639 'next=/a%26b/'
640 """
641 output = []
642 if safe:
643 safe = safe.encode(self.encoding)
645 def encode(k, v):
646 return "%s=%s" % ((quote(k, safe), quote(v, safe)))
648 else:
650 def encode(k, v):
651 return urlencode({k: v})
653 for k, list_ in self.lists():
654 output.extend(
655 encode(k.encode(self.encoding), str(v).encode(self.encoding))
656 for v in list_
657 )
658 return "&".join(output)
661class MediaType:
662 def __init__(self, media_type_raw_line):
663 full_type, self.params = parse_header_parameters(
664 media_type_raw_line if media_type_raw_line else ""
665 )
666 self.main_type, _, self.sub_type = full_type.partition("/")
668 def __str__(self):
669 params_str = "".join("; %s=%s" % (k, v) for k, v in self.params.items())
670 return "%s%s%s" % (
671 self.main_type,
672 ("/%s" % self.sub_type) if self.sub_type else "",
673 params_str,
674 )
676 def __repr__(self):
677 return "<%s: %s>" % (self.__class__.__qualname__, self)
679 @property
680 def is_all_types(self):
681 return self.main_type == "*" and self.sub_type == "*"
683 def match(self, other):
684 if self.is_all_types:
685 return True
686 other = MediaType(other)
687 if self.main_type == other.main_type and self.sub_type in {"*", other.sub_type}:
688 return True
689 return False
692# It's neither necessary nor appropriate to use
693# django.utils.encoding.force_str() for parsing URLs and form inputs. Thus,
694# this slightly more restricted function, used by QueryDict.
695def bytes_to_text(s, encoding):
696 """
697 Convert bytes objects to strings, using the given encoding. Illegally
698 encoded input characters are replaced with Unicode "unknown" codepoint
699 (\ufffd).
701 Return any non-bytes objects without change.
702 """
703 if isinstance(s, bytes):
704 return str(s, encoding, "replace")
705 else:
706 return s
709def split_domain_port(host):
710 """
711 Return a (domain, port) tuple from a given host.
713 Returned domain is lowercased. If the host is invalid, the domain will be
714 empty.
715 """
716 host = host.lower()
718 if not host_validation_re.match(host):
719 return "", ""
721 if host[-1] == "]":
722 # It's an IPv6 address without a port.
723 return host, ""
724 bits = host.rsplit(":", 1)
725 domain, port = bits if len(bits) == 2 else (bits[0], "")
726 # Remove a trailing dot (if present) from the domain.
727 domain = domain[:-1] if domain.endswith(".") else domain
728 return domain, port
731def validate_host(host, allowed_hosts):
732 """
733 Validate the given host for this site.
735 Check that the host looks valid and matches a host or host pattern in the
736 given list of ``allowed_hosts``. Any pattern beginning with a period
737 matches a domain and all its subdomains (e.g. ``.example.com`` matches
738 ``example.com`` and any subdomain), ``*`` matches anything, and anything
739 else must match exactly.
741 Note: This function assumes that the given host is lowercased and has
742 already had the port, if any, stripped off.
744 Return ``True`` for a valid host, ``False`` otherwise.
745 """
746 return any(
747 pattern == "*" or is_same_domain(host, pattern) for pattern in allowed_hosts
748 )
751def parse_accept_header(header):
752 return [MediaType(token) for token in header.split(",") if token.strip()]