Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/starlette/staticfiles.py: 20%

121 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import importlib.util 

2import os 

3import stat 

4import typing 

5from email.utils import parsedate 

6 

7import anyio 

8 

9from starlette.datastructures import URL, Headers 

10from starlette.exceptions import HTTPException 

11from starlette.responses import FileResponse, RedirectResponse, Response 

12from starlette.types import Receive, Scope, Send 

13 

14PathLike = typing.Union[str, "os.PathLike[str]"] 

15 

16 

17class NotModifiedResponse(Response): 

18 NOT_MODIFIED_HEADERS = ( 

19 "cache-control", 

20 "content-location", 

21 "date", 

22 "etag", 

23 "expires", 

24 "vary", 

25 ) 

26 

27 def __init__(self, headers: Headers): 

28 super().__init__( 

29 status_code=304, 

30 headers={ 

31 name: value 

32 for name, value in headers.items() 

33 if name in self.NOT_MODIFIED_HEADERS 

34 }, 

35 ) 

36 

37 

38class StaticFiles: 

39 def __init__( 

40 self, 

41 *, 

42 directory: typing.Optional[PathLike] = None, 

43 packages: typing.Optional[ 

44 typing.List[typing.Union[str, typing.Tuple[str, str]]] 

45 ] = None, 

46 html: bool = False, 

47 check_dir: bool = True, 

48 follow_symlink: bool = False, 

49 ) -> None: 

50 self.directory = directory 

51 self.packages = packages 

52 self.all_directories = self.get_directories(directory, packages) 

53 self.html = html 

54 self.config_checked = False 

55 self.follow_symlink = follow_symlink 

56 if check_dir and directory is not None and not os.path.isdir(directory): 

57 raise RuntimeError(f"Directory '{directory}' does not exist") 

58 

59 def get_directories( 

60 self, 

61 directory: typing.Optional[PathLike] = None, 

62 packages: typing.Optional[ 

63 typing.List[typing.Union[str, typing.Tuple[str, str]]] 

64 ] = None, 

65 ) -> typing.List[PathLike]: 

66 """ 

67 Given `directory` and `packages` arguments, return a list of all the 

68 directories that should be used for serving static files from. 

69 """ 

70 directories = [] 

71 if directory is not None: 

72 directories.append(directory) 

73 

74 for package in packages or []: 

75 if isinstance(package, tuple): 

76 package, statics_dir = package 

77 else: 

78 statics_dir = "statics" 

79 spec = importlib.util.find_spec(package) 

80 assert spec is not None, f"Package {package!r} could not be found." 

81 assert spec.origin is not None, f"Package {package!r} could not be found." 

82 package_directory = os.path.normpath( 

83 os.path.join(spec.origin, "..", statics_dir) 

84 ) 

85 assert os.path.isdir( 

86 package_directory 

87 ), f"Directory '{statics_dir!r}' in package {package!r} could not be found." 

88 directories.append(package_directory) 

89 

90 return directories 

91 

92 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: 

93 """ 

94 The ASGI entry point. 

95 """ 

96 assert scope["type"] == "http" 

97 

98 if not self.config_checked: 

99 await self.check_config() 

100 self.config_checked = True 

101 

102 path = self.get_path(scope) 

103 response = await self.get_response(path, scope) 

104 await response(scope, receive, send) 

105 

106 def get_path(self, scope: Scope) -> str: 

107 """ 

108 Given the ASGI scope, return the `path` string to serve up, 

109 with OS specific path separators, and any '..', '.' components removed. 

110 """ 

111 return os.path.normpath(os.path.join(*scope["path"].split("/"))) 

112 

113 async def get_response(self, path: str, scope: Scope) -> Response: 

114 """ 

115 Returns an HTTP response, given the incoming path, method and request headers. 

116 """ 

117 if scope["method"] not in ("GET", "HEAD"): 

118 raise HTTPException(status_code=405) 

119 

120 try: 

121 full_path, stat_result = await anyio.to_thread.run_sync( 

122 self.lookup_path, path 

123 ) 

124 except PermissionError: 

125 raise HTTPException(status_code=401) 

126 except OSError: 

127 raise 

128 

129 if stat_result and stat.S_ISREG(stat_result.st_mode): 

130 # We have a static file to serve. 

131 return self.file_response(full_path, stat_result, scope) 

132 

133 elif stat_result and stat.S_ISDIR(stat_result.st_mode) and self.html: 

134 # We're in HTML mode, and have got a directory URL. 

135 # Check if we have 'index.html' file to serve. 

136 index_path = os.path.join(path, "index.html") 

137 full_path, stat_result = await anyio.to_thread.run_sync( 

138 self.lookup_path, index_path 

139 ) 

140 if stat_result is not None and stat.S_ISREG(stat_result.st_mode): 

141 if not scope["path"].endswith("/"): 

142 # Directory URLs should redirect to always end in "/". 

143 url = URL(scope=scope) 

144 url = url.replace(path=url.path + "/") 

145 return RedirectResponse(url=url) 

146 return self.file_response(full_path, stat_result, scope) 

147 

148 if self.html: 

149 # Check for '404.html' if we're in HTML mode. 

150 full_path, stat_result = await anyio.to_thread.run_sync( 

151 self.lookup_path, "404.html" 

152 ) 

153 if stat_result and stat.S_ISREG(stat_result.st_mode): 

154 return FileResponse( 

155 full_path, 

156 stat_result=stat_result, 

157 method=scope["method"], 

158 status_code=404, 

159 ) 

160 raise HTTPException(status_code=404) 

161 

162 def lookup_path( 

163 self, path: str 

164 ) -> typing.Tuple[str, typing.Optional[os.stat_result]]: 

165 for directory in self.all_directories: 

166 joined_path = os.path.join(directory, path) 

167 if self.follow_symlink: 

168 full_path = os.path.abspath(joined_path) 

169 else: 

170 full_path = os.path.realpath(joined_path) 

171 directory = os.path.realpath(directory) 

172 if os.path.commonprefix([full_path, directory]) != directory: 

173 # Don't allow misbehaving clients to break out of the static files 

174 # directory. 

175 continue 

176 try: 

177 return full_path, os.stat(full_path) 

178 except (FileNotFoundError, NotADirectoryError): 

179 continue 

180 return "", None 

181 

182 def file_response( 

183 self, 

184 full_path: PathLike, 

185 stat_result: os.stat_result, 

186 scope: Scope, 

187 status_code: int = 200, 

188 ) -> Response: 

189 method = scope["method"] 

190 request_headers = Headers(scope=scope) 

191 

192 response = FileResponse( 

193 full_path, status_code=status_code, stat_result=stat_result, method=method 

194 ) 

195 if self.is_not_modified(response.headers, request_headers): 

196 return NotModifiedResponse(response.headers) 

197 return response 

198 

199 async def check_config(self) -> None: 

200 """ 

201 Perform a one-off configuration check that StaticFiles is actually 

202 pointed at a directory, so that we can raise loud errors rather than 

203 just returning 404 responses. 

204 """ 

205 if self.directory is None: 

206 return 

207 

208 try: 

209 stat_result = await anyio.to_thread.run_sync(os.stat, self.directory) 

210 except FileNotFoundError: 

211 raise RuntimeError( 

212 f"StaticFiles directory '{self.directory}' does not exist." 

213 ) 

214 if not (stat.S_ISDIR(stat_result.st_mode) or stat.S_ISLNK(stat_result.st_mode)): 

215 raise RuntimeError( 

216 f"StaticFiles path '{self.directory}' is not a directory." 

217 ) 

218 

219 def is_not_modified( 

220 self, response_headers: Headers, request_headers: Headers 

221 ) -> bool: 

222 """ 

223 Given the request and response headers, return `True` if an HTTP 

224 "Not Modified" response could be returned instead. 

225 """ 

226 try: 

227 if_none_match = request_headers["if-none-match"] 

228 etag = response_headers["etag"] 

229 if if_none_match == etag: 

230 return True 

231 except KeyError: 

232 pass 

233 

234 try: 

235 if_modified_since = parsedate(request_headers["if-modified-since"]) 

236 last_modified = parsedate(response_headers["last-modified"]) 

237 if ( 

238 if_modified_since is not None 

239 and last_modified is not None 

240 and if_modified_since >= last_modified 

241 ): 

242 return True 

243 except KeyError: 

244 pass 

245 

246 return False