Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/starlette/responses.py: 29%
187 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import http.cookies
2import json
3import os
4import stat
5import sys
6import typing
7from datetime import datetime
8from email.utils import format_datetime, formatdate
9from functools import partial
10from mimetypes import guess_type as mimetypes_guess_type
11from urllib.parse import quote
13import anyio
15from starlette._compat import md5_hexdigest
16from starlette.background import BackgroundTask
17from starlette.concurrency import iterate_in_threadpool
18from starlette.datastructures import URL, MutableHeaders
19from starlette.types import Receive, Scope, Send
21if sys.version_info >= (3, 8): # pragma: no cover
22 from typing import Literal
23else: # pragma: no cover
24 from typing_extensions import Literal
# Python releases before 3.8 do not list "samesite" among the reserved cookie
# attributes; register it on Morsel so cookies can carry a SameSite attribute.
http.cookies.Morsel._reserved.update(samesite="SameSite")  # type: ignore[attr-defined]
# `mimetypes.guess_type` only accepts `os.PathLike` arguments from Python 3.8
# onwards; this wrapper gives older interpreters the same calling convention.
def guess_type(
    url: typing.Union[str, "os.PathLike[str]"], strict: bool = True
) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Guess the MIME type and content encoding for a URL or filesystem path.

    Args:
        url: A string or path-like object naming the resource.
        strict: Forwarded unchanged to ``mimetypes.guess_type``.

    Returns:
        The ``(type, encoding)`` tuple produced by ``mimetypes.guess_type``;
        either element may be ``None``.
    """
    if sys.version_info >= (3, 8):
        return mimetypes_guess_type(url, strict)
    # Older interpreters need the path converted to a plain string first.
    return mimetypes_guess_type(os.fspath(url), strict)  # pragma: no cover
class Response:
    """ASGI HTTP response whose body is fully rendered in memory.

    The body is rendered once in ``__init__`` and the raw header list is
    built from it. Calling the instance sends ``http.response.start`` and a
    single ``http.response.body`` message, then runs the optional background
    task.

    Class attributes:
        media_type: Default content type; ``None`` means no ``content-type``
            header is generated.
        charset: Encoding used for ``str`` content and appended to ``text/*``
            content types.
    """

    media_type = None
    charset = "utf-8"

    def __init__(
        self,
        content: typing.Any = None,
        status_code: int = 200,
        headers: typing.Optional[typing.Mapping[str, str]] = None,
        media_type: typing.Optional[str] = None,
        background: typing.Optional[BackgroundTask] = None,
    ) -> None:
        """Render ``content`` and build the raw response headers.

        Args:
            content: Body payload, passed to ``render``.
            status_code: HTTP status code sent with the response.
            headers: Optional extra headers; names are lower-cased.
            media_type: Overrides the class-level ``media_type`` when given.
            background: Optional task awaited after the body has been sent.
        """
        self.status_code = status_code
        if media_type is not None:
            self.media_type = media_type
        self.background = background
        # Render before building headers: content-length is taken from body.
        self.body = self.render(content)
        self.init_headers(headers)

    def render(self, content: typing.Any) -> bytes:
        """Convert ``content`` into the bytes that will be sent.

        ``None`` becomes an empty body, ``bytes`` is returned unchanged,
        ``bytearray``/``memoryview`` are copied into ``bytes``, and anything
        else is encoded with ``self.charset`` via its ``encode`` method.
        """
        if content is None:
            return b""
        if isinstance(content, bytes):
            return content
        if isinstance(content, (bytearray, memoryview)):
            # These bytes-like types have no .encode(); previously they
            # raised AttributeError instead of being sent as-is.
            return bytes(content)
        return content.encode(self.charset)

    def init_headers(
        self, headers: typing.Optional[typing.Mapping[str, str]] = None
    ) -> None:
        """Build ``self.raw_headers`` from ``headers`` plus generated defaults.

        ``content-length`` and ``content-type`` are only generated when the
        caller did not supply them.
        """
        if headers is None:
            raw_headers: typing.List[typing.Tuple[bytes, bytes]] = []
            populate_content_length = True
            populate_content_type = True
        else:
            raw_headers = [
                (k.lower().encode("latin-1"), v.encode("latin-1"))
                for k, v in headers.items()
            ]
            keys = {name for name, _ in raw_headers}
            populate_content_length = b"content-length" not in keys
            populate_content_type = b"content-type" not in keys

        # Subclasses may call init_headers without ever setting a body.
        body = getattr(self, "body", None)
        if (
            body is not None
            and populate_content_length
            # No content-length for status codes below 200, or 204 and 304.
            and not (self.status_code < 200 or self.status_code in (204, 304))
        ):
            content_length = str(len(body))
            raw_headers.append((b"content-length", content_length.encode("latin-1")))

        content_type = self.media_type
        if content_type is not None and populate_content_type:
            if content_type.startswith("text/"):
                content_type += "; charset=" + self.charset
            raw_headers.append((b"content-type", content_type.encode("latin-1")))

        self.raw_headers = raw_headers

    @property
    def headers(self) -> MutableHeaders:
        """Mutable header view over ``self.raw_headers``, created on first use."""
        if not hasattr(self, "_headers"):
            self._headers = MutableHeaders(raw=self.raw_headers)
        return self._headers

    def set_cookie(
        self,
        key: str,
        value: str = "",
        max_age: typing.Optional[int] = None,
        expires: typing.Optional[typing.Union[datetime, str, int]] = None,
        path: str = "/",
        domain: typing.Optional[str] = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: typing.Optional[Literal["lax", "strict", "none"]] = "lax",
    ) -> None:
        """Append a ``set-cookie`` header describing the given cookie.

        ``expires`` given as a ``datetime`` is formatted as an HTTP date;
        any other value is handed to the cookie morsel unchanged.

        Raises:
            AssertionError: If ``samesite`` is not one of ``strict``, ``lax``
                or ``none`` (compared case-insensitively).
        """
        cookie: "http.cookies.BaseCookie[str]" = http.cookies.SimpleCookie()
        cookie[key] = value
        if max_age is not None:
            cookie[key]["max-age"] = max_age
        if expires is not None:
            if isinstance(expires, datetime):
                cookie[key]["expires"] = format_datetime(expires, usegmt=True)
            else:
                cookie[key]["expires"] = expires
        if path is not None:
            cookie[key]["path"] = path
        if domain is not None:
            cookie[key]["domain"] = domain
        if secure:
            cookie[key]["secure"] = True
        if httponly:
            cookie[key]["httponly"] = True
        if samesite is not None:
            assert samesite.lower() in [
                "strict",
                "lax",
                "none",
            ], "samesite must be either 'strict', 'lax' or 'none'"
            cookie[key]["samesite"] = samesite
        # header="" drops the "Set-Cookie:" prefix; strip the leftover space.
        cookie_val = cookie.output(header="").strip()
        self.raw_headers.append((b"set-cookie", cookie_val.encode("latin-1")))

    def delete_cookie(
        self,
        key: str,
        path: str = "/",
        domain: typing.Optional[str] = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: typing.Optional[Literal["lax", "strict", "none"]] = "lax",
    ) -> None:
        """Append a ``set-cookie`` header for ``key`` with ``max_age=0`` and ``expires=0``."""
        self.set_cookie(
            key,
            max_age=0,
            expires=0,
            path=path,
            domain=domain,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
        )

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Send the start message and the body, then run the background task."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        await send({"type": "http.response.body", "body": self.body})

        if self.background is not None:
            await self.background()
class HTMLResponse(Response):
    """Response whose default media type is ``text/html``."""

    media_type = "text/html"
class PlainTextResponse(Response):
    """Response whose default media type is ``text/plain``."""

    media_type = "text/plain"
class JSONResponse(Response):
    """Response that serializes its content to compact UTF-8 JSON.

    Unlike the base class, ``content`` is a required argument.
    """

    media_type = "application/json"

    def __init__(
        self,
        content: typing.Any,
        status_code: int = 200,
        # Mapping (not Dict) so the signature matches the sibling responses;
        # any dict callers already pass is still accepted.
        headers: typing.Optional[typing.Mapping[str, str]] = None,
        media_type: typing.Optional[str] = None,
        background: typing.Optional[BackgroundTask] = None,
    ) -> None:
        """Forward all arguments positionally to ``Response.__init__``."""
        super().__init__(content, status_code, headers, media_type, background)

    def render(self, content: typing.Any) -> bytes:
        """Serialize ``content`` with ``json.dumps`` and encode it as UTF-8.

        The output has no indentation and no spaces after separators.
        Non-ASCII characters are written as-is (``ensure_ascii=False``), and
        NaN/Infinity values are rejected by ``json.dumps``
        (``allow_nan=False``).
        """
        return json.dumps(
            content,
            ensure_ascii=False,
            allow_nan=False,
            indent=None,
            separators=(",", ":"),
        ).encode("utf-8")
class RedirectResponse(Response):
    """Empty-bodied response that carries a ``location`` header."""

    def __init__(
        self,
        url: typing.Union[str, URL],
        status_code: int = 307,
        headers: typing.Optional[typing.Mapping[str, str]] = None,
        background: typing.Optional[BackgroundTask] = None,
    ) -> None:
        """Create the redirect.

        Args:
            url: Redirect target; converted with ``str()`` and percent-quoted.
            status_code: HTTP status code, 307 by default.
            headers: Optional extra headers.
            background: Optional task awaited after the response is sent.
        """
        super().__init__(
            content=b"", status_code=status_code, headers=headers, background=background
        )
        # Characters listed in `safe` are left unquoted in the target URL.
        location = quote(str(url), safe=":/%#?=@[]!$&'()*+,;")
        self.headers["location"] = location
# One streamed chunk: either text or raw bytes.
Content = typing.Union[str, bytes]
# Asynchronous source of chunks.
AsyncContentStream = typing.AsyncIterable[Content]
# Synchronous source of chunks.
SyncContentStream = typing.Iterator[Content]
# Either kind of chunk source.
ContentStream = typing.Union[AsyncContentStream, SyncContentStream]
class StreamingResponse(Response):
    """Response whose body is sent chunk by chunk from an iterator.

    No body is rendered up front, so ``init_headers`` generates no
    ``content-length`` header for it.
    """

    body_iterator: AsyncContentStream

    def __init__(
        self,
        content: ContentStream,
        status_code: int = 200,
        headers: typing.Optional[typing.Mapping[str, str]] = None,
        media_type: typing.Optional[str] = None,
        background: typing.Optional[BackgroundTask] = None,
    ) -> None:
        """Store the chunk source and build the raw headers.

        Async iterables are used directly; any other content is wrapped with
        ``iterate_in_threadpool``.
        """
        self.body_iterator = (
            content
            if isinstance(content, typing.AsyncIterable)
            else iterate_in_threadpool(content)
        )
        self.status_code = status_code
        if media_type is None:
            media_type = self.media_type
        self.media_type = media_type
        self.background = background
        self.init_headers(headers)

    async def listen_for_disconnect(self, receive: Receive) -> None:
        """Consume incoming messages until an ``http.disconnect`` arrives."""
        message = await receive()
        while message["type"] != "http.disconnect":
            message = await receive()

    async def stream_response(self, send: Send) -> None:
        """Send the start message, each chunk, then a final empty body message."""
        start_message = {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        }
        await send(start_message)

        async for chunk in self.body_iterator:
            # Non-bytes chunks are encoded with the response charset.
            payload = chunk if isinstance(chunk, bytes) else chunk.encode(self.charset)
            await send({"type": "http.response.body", "body": payload, "more_body": True})

        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Stream the body while watching for a client disconnect.

        Both coroutines run in one task group; whichever finishes first
        cancels the group's scope. The background task runs afterwards.
        """
        async with anyio.create_task_group() as task_group:

            async def wrap(func: "typing.Callable[[], typing.Awaitable[None]]") -> None:
                await func()
                task_group.cancel_scope.cancel()

            stream = partial(self.stream_response, send)
            watch_disconnect = partial(self.listen_for_disconnect, receive)
            task_group.start_soon(wrap, stream)
            await wrap(watch_disconnect)

        if self.background is not None:
            await self.background()
class FileResponse(Response):
    """Response that sends a file from disk in ``chunk_size`` pieces.

    ``content-length``, ``last-modified`` and ``etag`` headers are derived
    from an ``os.stat_result``, either supplied to the constructor or
    gathered when the response is called.
    """

    chunk_size = 64 * 1024

    def __init__(
        self,
        path: typing.Union[str, "os.PathLike[str]"],
        status_code: int = 200,
        headers: typing.Optional[typing.Mapping[str, str]] = None,
        media_type: typing.Optional[str] = None,
        background: typing.Optional[BackgroundTask] = None,
        filename: typing.Optional[str] = None,
        stat_result: typing.Optional[os.stat_result] = None,
        method: typing.Optional[str] = None,
        content_disposition_type: str = "attachment",
    ) -> None:
        """Record the file location and prepare headers.

        Args:
            path: Location of the file to send.
            status_code: HTTP status code sent with the response.
            headers: Optional extra headers.
            media_type: Content type; guessed from ``filename`` or ``path``
                when omitted, falling back to ``text/plain``.
            background: Optional task awaited after the response is sent.
            filename: When given, a ``content-disposition`` header is added
                unless one was already supplied.
            stat_result: Pre-computed stat information for the file.
            method: Request method; ``HEAD`` (any case) suppresses the body.
            content_disposition_type: Leading token of the
                ``content-disposition`` header value.
        """
        self.path = path
        self.status_code = status_code
        self.filename = filename
        self.send_header_only = method is not None and method.upper() == "HEAD"
        if media_type is None:
            guessed = guess_type(filename or path)[0]
            media_type = guessed or "text/plain"
        self.media_type = media_type
        self.background = background
        self.init_headers(headers)
        if self.filename is not None:
            quoted_filename = quote(self.filename)
            if quoted_filename == self.filename:
                # Quoting changed nothing, so the plain form is used.
                content_disposition = '{}; filename="{}"'.format(
                    content_disposition_type, self.filename
                )
            else:
                # Otherwise send the percent-quoted name via filename*.
                content_disposition = "{}; filename*=utf-8''{}".format(
                    content_disposition_type, quoted_filename
                )
            self.headers.setdefault("content-disposition", content_disposition)
        self.stat_result = stat_result
        if stat_result is not None:
            self.set_stat_headers(stat_result)

    def set_stat_headers(self, stat_result: os.stat_result) -> None:
        """Set size, modification-time and etag headers unless already present."""
        size = stat_result.st_size
        mtime = stat_result.st_mtime
        content_length = str(size)
        last_modified = formatdate(mtime, usegmt=True)
        # The etag is an MD5 digest of "<mtime>-<size>".
        etag_base = "-".join((str(mtime), str(size)))
        etag = md5_hexdigest(etag_base.encode(), usedforsecurity=False)

        for header_name, header_value in (
            ("content-length", content_length),
            ("last-modified", last_modified),
            ("etag", etag),
        ):
            self.headers.setdefault(header_name, header_value)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Send the headers and, unless header-only, the file contents.

        Raises:
            RuntimeError: If no stat result was supplied and the path does
                not exist or is not a regular file.
        """
        if self.stat_result is None:
            try:
                # os.stat blocks, so it is run in a worker thread.
                stat_result = await anyio.to_thread.run_sync(os.stat, self.path)
                self.set_stat_headers(stat_result)
            except FileNotFoundError:
                raise RuntimeError(f"File at path {self.path} does not exist.")
            else:
                if not stat.S_ISREG(stat_result.st_mode):
                    raise RuntimeError(f"File at path {self.path} is not a file.")
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        if self.send_header_only:
            await send({"type": "http.response.body", "body": b"", "more_body": False})
        else:
            async with await anyio.open_file(self.path, mode="rb") as file:
                while True:
                    chunk = await file.read(self.chunk_size)
                    # A short read marks the final chunk.
                    more_body = len(chunk) == self.chunk_size
                    await send(
                        {
                            "type": "http.response.body",
                            "body": chunk,
                            "more_body": more_body,
                        }
                    )
                    if not more_body:
                        break
        if self.background is not None:
            await self.background()