Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/starlette/responses.py: 29%

187 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import http.cookies 

2import json 

3import os 

4import stat 

5import sys 

6import typing 

7from datetime import datetime 

8from email.utils import format_datetime, formatdate 

9from functools import partial 

10from mimetypes import guess_type as mimetypes_guess_type 

11from urllib.parse import quote 

12 

13import anyio 

14 

15from starlette._compat import md5_hexdigest 

16from starlette.background import BackgroundTask 

17from starlette.concurrency import iterate_in_threadpool 

18from starlette.datastructures import URL, MutableHeaders 

19from starlette.types import Receive, Scope, Send 

20 

21if sys.version_info >= (3, 8): # pragma: no cover 

22 from typing import Literal 

23else: # pragma: no cover 

24 from typing_extensions import Literal 

25 

26# Workaround for adding samesite support to pre 3.8 python 

27http.cookies.Morsel._reserved["samesite"] = "SameSite" # type: ignore[attr-defined] 

28 

29 

30# Compatibility wrapper for `mimetypes.guess_type` to support `os.PathLike` on <py3.8 

31def guess_type( 

32 url: typing.Union[str, "os.PathLike[str]"], strict: bool = True 

33) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]: 

34 if sys.version_info < (3, 8): # pragma: no cover 

35 url = os.fspath(url) 

36 return mimetypes_guess_type(url, strict) 

37 

38 

39class Response: 

40 media_type = None 

41 charset = "utf-8" 

42 

43 def __init__( 

44 self, 

45 content: typing.Any = None, 

46 status_code: int = 200, 

47 headers: typing.Optional[typing.Mapping[str, str]] = None, 

48 media_type: typing.Optional[str] = None, 

49 background: typing.Optional[BackgroundTask] = None, 

50 ) -> None: 

51 self.status_code = status_code 

52 if media_type is not None: 

53 self.media_type = media_type 

54 self.background = background 

55 self.body = self.render(content) 

56 self.init_headers(headers) 

57 

58 def render(self, content: typing.Any) -> bytes: 

59 if content is None: 

60 return b"" 

61 if isinstance(content, bytes): 

62 return content 

63 return content.encode(self.charset) 

64 

65 def init_headers( 

66 self, headers: typing.Optional[typing.Mapping[str, str]] = None 

67 ) -> None: 

68 if headers is None: 

69 raw_headers: typing.List[typing.Tuple[bytes, bytes]] = [] 

70 populate_content_length = True 

71 populate_content_type = True 

72 else: 

73 raw_headers = [ 

74 (k.lower().encode("latin-1"), v.encode("latin-1")) 

75 for k, v in headers.items() 

76 ] 

77 keys = [h[0] for h in raw_headers] 

78 populate_content_length = b"content-length" not in keys 

79 populate_content_type = b"content-type" not in keys 

80 

81 body = getattr(self, "body", None) 

82 if ( 

83 body is not None 

84 and populate_content_length 

85 and not (self.status_code < 200 or self.status_code in (204, 304)) 

86 ): 

87 content_length = str(len(body)) 

88 raw_headers.append((b"content-length", content_length.encode("latin-1"))) 

89 

90 content_type = self.media_type 

91 if content_type is not None and populate_content_type: 

92 if content_type.startswith("text/"): 

93 content_type += "; charset=" + self.charset 

94 raw_headers.append((b"content-type", content_type.encode("latin-1"))) 

95 

96 self.raw_headers = raw_headers 

97 

98 @property 

99 def headers(self) -> MutableHeaders: 

100 if not hasattr(self, "_headers"): 

101 self._headers = MutableHeaders(raw=self.raw_headers) 

102 return self._headers 

103 

104 def set_cookie( 

105 self, 

106 key: str, 

107 value: str = "", 

108 max_age: typing.Optional[int] = None, 

109 expires: typing.Optional[typing.Union[datetime, str, int]] = None, 

110 path: str = "/", 

111 domain: typing.Optional[str] = None, 

112 secure: bool = False, 

113 httponly: bool = False, 

114 samesite: typing.Optional[Literal["lax", "strict", "none"]] = "lax", 

115 ) -> None: 

116 cookie: "http.cookies.BaseCookie[str]" = http.cookies.SimpleCookie() 

117 cookie[key] = value 

118 if max_age is not None: 

119 cookie[key]["max-age"] = max_age 

120 if expires is not None: 

121 if isinstance(expires, datetime): 

122 cookie[key]["expires"] = format_datetime(expires, usegmt=True) 

123 else: 

124 cookie[key]["expires"] = expires 

125 if path is not None: 

126 cookie[key]["path"] = path 

127 if domain is not None: 

128 cookie[key]["domain"] = domain 

129 if secure: 

130 cookie[key]["secure"] = True 

131 if httponly: 

132 cookie[key]["httponly"] = True 

133 if samesite is not None: 

134 assert samesite.lower() in [ 

135 "strict", 

136 "lax", 

137 "none", 

138 ], "samesite must be either 'strict', 'lax' or 'none'" 

139 cookie[key]["samesite"] = samesite 

140 cookie_val = cookie.output(header="").strip() 

141 self.raw_headers.append((b"set-cookie", cookie_val.encode("latin-1"))) 

142 

143 def delete_cookie( 

144 self, 

145 key: str, 

146 path: str = "/", 

147 domain: typing.Optional[str] = None, 

148 secure: bool = False, 

149 httponly: bool = False, 

150 samesite: typing.Optional[Literal["lax", "strict", "none"]] = "lax", 

151 ) -> None: 

152 self.set_cookie( 

153 key, 

154 max_age=0, 

155 expires=0, 

156 path=path, 

157 domain=domain, 

158 secure=secure, 

159 httponly=httponly, 

160 samesite=samesite, 

161 ) 

162 

163 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: 

164 await send( 

165 { 

166 "type": "http.response.start", 

167 "status": self.status_code, 

168 "headers": self.raw_headers, 

169 } 

170 ) 

171 await send({"type": "http.response.body", "body": self.body}) 

172 

173 if self.background is not None: 

174 await self.background() 

175 

176 

177class HTMLResponse(Response): 

178 media_type = "text/html" 

179 

180 

181class PlainTextResponse(Response): 

182 media_type = "text/plain" 

183 

184 

185class JSONResponse(Response): 

186 media_type = "application/json" 

187 

188 def __init__( 

189 self, 

190 content: typing.Any, 

191 status_code: int = 200, 

192 headers: typing.Optional[typing.Dict[str, str]] = None, 

193 media_type: typing.Optional[str] = None, 

194 background: typing.Optional[BackgroundTask] = None, 

195 ) -> None: 

196 super().__init__(content, status_code, headers, media_type, background) 

197 

198 def render(self, content: typing.Any) -> bytes: 

199 return json.dumps( 

200 content, 

201 ensure_ascii=False, 

202 allow_nan=False, 

203 indent=None, 

204 separators=(",", ":"), 

205 ).encode("utf-8") 

206 

207 

208class RedirectResponse(Response): 

209 def __init__( 

210 self, 

211 url: typing.Union[str, URL], 

212 status_code: int = 307, 

213 headers: typing.Optional[typing.Mapping[str, str]] = None, 

214 background: typing.Optional[BackgroundTask] = None, 

215 ) -> None: 

216 super().__init__( 

217 content=b"", status_code=status_code, headers=headers, background=background 

218 ) 

219 self.headers["location"] = quote(str(url), safe=":/%#?=@[]!$&'()*+,;") 

220 

221 

222Content = typing.Union[str, bytes] 

223SyncContentStream = typing.Iterator[Content] 

224AsyncContentStream = typing.AsyncIterable[Content] 

225ContentStream = typing.Union[AsyncContentStream, SyncContentStream] 

226 

227 

228class StreamingResponse(Response): 

229 body_iterator: AsyncContentStream 

230 

231 def __init__( 

232 self, 

233 content: ContentStream, 

234 status_code: int = 200, 

235 headers: typing.Optional[typing.Mapping[str, str]] = None, 

236 media_type: typing.Optional[str] = None, 

237 background: typing.Optional[BackgroundTask] = None, 

238 ) -> None: 

239 if isinstance(content, typing.AsyncIterable): 

240 self.body_iterator = content 

241 else: 

242 self.body_iterator = iterate_in_threadpool(content) 

243 self.status_code = status_code 

244 self.media_type = self.media_type if media_type is None else media_type 

245 self.background = background 

246 self.init_headers(headers) 

247 

248 async def listen_for_disconnect(self, receive: Receive) -> None: 

249 while True: 

250 message = await receive() 

251 if message["type"] == "http.disconnect": 

252 break 

253 

254 async def stream_response(self, send: Send) -> None: 

255 await send( 

256 { 

257 "type": "http.response.start", 

258 "status": self.status_code, 

259 "headers": self.raw_headers, 

260 } 

261 ) 

262 async for chunk in self.body_iterator: 

263 if not isinstance(chunk, bytes): 

264 chunk = chunk.encode(self.charset) 

265 await send({"type": "http.response.body", "body": chunk, "more_body": True}) 

266 

267 await send({"type": "http.response.body", "body": b"", "more_body": False}) 

268 

269 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: 

270 async with anyio.create_task_group() as task_group: 

271 

272 async def wrap(func: "typing.Callable[[], typing.Awaitable[None]]") -> None: 

273 await func() 

274 task_group.cancel_scope.cancel() 

275 

276 task_group.start_soon(wrap, partial(self.stream_response, send)) 

277 await wrap(partial(self.listen_for_disconnect, receive)) 

278 

279 if self.background is not None: 

280 await self.background() 

281 

282 

283class FileResponse(Response): 

284 chunk_size = 64 * 1024 

285 

286 def __init__( 

287 self, 

288 path: typing.Union[str, "os.PathLike[str]"], 

289 status_code: int = 200, 

290 headers: typing.Optional[typing.Mapping[str, str]] = None, 

291 media_type: typing.Optional[str] = None, 

292 background: typing.Optional[BackgroundTask] = None, 

293 filename: typing.Optional[str] = None, 

294 stat_result: typing.Optional[os.stat_result] = None, 

295 method: typing.Optional[str] = None, 

296 content_disposition_type: str = "attachment", 

297 ) -> None: 

298 self.path = path 

299 self.status_code = status_code 

300 self.filename = filename 

301 self.send_header_only = method is not None and method.upper() == "HEAD" 

302 if media_type is None: 

303 media_type = guess_type(filename or path)[0] or "text/plain" 

304 self.media_type = media_type 

305 self.background = background 

306 self.init_headers(headers) 

307 if self.filename is not None: 

308 content_disposition_filename = quote(self.filename) 

309 if content_disposition_filename != self.filename: 

310 content_disposition = "{}; filename*=utf-8''{}".format( 

311 content_disposition_type, content_disposition_filename 

312 ) 

313 else: 

314 content_disposition = '{}; filename="{}"'.format( 

315 content_disposition_type, self.filename 

316 ) 

317 self.headers.setdefault("content-disposition", content_disposition) 

318 self.stat_result = stat_result 

319 if stat_result is not None: 

320 self.set_stat_headers(stat_result) 

321 

322 def set_stat_headers(self, stat_result: os.stat_result) -> None: 

323 content_length = str(stat_result.st_size) 

324 last_modified = formatdate(stat_result.st_mtime, usegmt=True) 

325 etag_base = str(stat_result.st_mtime) + "-" + str(stat_result.st_size) 

326 etag = md5_hexdigest(etag_base.encode(), usedforsecurity=False) 

327 

328 self.headers.setdefault("content-length", content_length) 

329 self.headers.setdefault("last-modified", last_modified) 

330 self.headers.setdefault("etag", etag) 

331 

332 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: 

333 if self.stat_result is None: 

334 try: 

335 stat_result = await anyio.to_thread.run_sync(os.stat, self.path) 

336 self.set_stat_headers(stat_result) 

337 except FileNotFoundError: 

338 raise RuntimeError(f"File at path {self.path} does not exist.") 

339 else: 

340 mode = stat_result.st_mode 

341 if not stat.S_ISREG(mode): 

342 raise RuntimeError(f"File at path {self.path} is not a file.") 

343 await send( 

344 { 

345 "type": "http.response.start", 

346 "status": self.status_code, 

347 "headers": self.raw_headers, 

348 } 

349 ) 

350 if self.send_header_only: 

351 await send({"type": "http.response.body", "body": b"", "more_body": False}) 

352 else: 

353 async with await anyio.open_file(self.path, mode="rb") as file: 

354 more_body = True 

355 while more_body: 

356 chunk = await file.read(self.chunk_size) 

357 more_body = len(chunk) == self.chunk_size 

358 await send( 

359 { 

360 "type": "http.response.body", 

361 "body": chunk, 

362 "more_body": more_body, 

363 } 

364 ) 

365 if self.background is not None: 

366 await self.background()