Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%

462 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1"""A WSGI and HTTP server for use **during development only**. This 

2server is convenient to use, but is not designed to be particularly 

3stable, secure, or efficient. Use a dedicate WSGI server and HTTP 

4server when deploying to production. 

5 

6It provides features like interactive debugging and code reloading. Use 

7``run_simple`` to start the server. Put this in a ``run.py`` script: 

8 

9.. code-block:: python 

10 

11 from myapp import create_app 

12 from werkzeug import run_simple 

13""" 

14import errno 

15import io 

16import os 

17import socket 

18import socketserver 

19import sys 

20import typing as t 

21from datetime import datetime as dt 

22from datetime import timedelta 

23from datetime import timezone 

24from http.server import BaseHTTPRequestHandler 

25from http.server import HTTPServer 

26 

27from ._internal import _log 

28from ._internal import _wsgi_encoding_dance 

29from .exceptions import InternalServerError 

30from .urls import uri_to_iri 

31from .urls import url_parse 

32from .urls import url_unquote 

33 

34try: 

35 import ssl 

36except ImportError: 

37 

38 class _SslDummy: 

39 def __getattr__(self, name: str) -> t.Any: 

40 raise RuntimeError( # noqa: B904 

41 "SSL is unavailable because this Python runtime was not" 

42 " compiled with SSL/TLS support." 

43 ) 

44 

45 ssl = _SslDummy() # type: ignore 

46 

47_log_add_style = True 

48 

49if os.name == "nt": 

50 try: 

51 __import__("colorama") 

52 except ImportError: 

53 _log_add_style = False 

54 

55can_fork = hasattr(os, "fork") 

56 

57if can_fork: 

58 ForkingMixIn = socketserver.ForkingMixIn 

59else: 

60 

61 class ForkingMixIn: # type: ignore 

62 pass 

63 

64 

65try: 

66 af_unix = socket.AF_UNIX 

67except AttributeError: 

68 af_unix = None # type: ignore 

69 

70LISTEN_QUEUE = 128 

71 

72_TSSLContextArg = t.Optional[ 

73 t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], "te.Literal['adhoc']"] 

74] 

75 

76if t.TYPE_CHECKING: 

77 import typing_extensions as te # noqa: F401 

78 from _typeshed.wsgi import WSGIApplication 

79 from _typeshed.wsgi import WSGIEnvironment 

80 from cryptography.hazmat.primitives.asymmetric.rsa import ( 

81 RSAPrivateKeyWithSerialization, 

82 ) 

83 from cryptography.x509 import Certificate 

84 

85 

86class DechunkedInput(io.RawIOBase): 

87 """An input stream that handles Transfer-Encoding 'chunked'""" 

88 

89 def __init__(self, rfile: t.IO[bytes]) -> None: 

90 self._rfile = rfile 

91 self._done = False 

92 self._len = 0 

93 

94 def readable(self) -> bool: 

95 return True 

96 

97 def read_chunk_len(self) -> int: 

98 try: 

99 line = self._rfile.readline().decode("latin1") 

100 _len = int(line.strip(), 16) 

101 except ValueError as e: 

102 raise OSError("Invalid chunk header") from e 

103 if _len < 0: 

104 raise OSError("Negative chunk length not allowed") 

105 return _len 

106 

107 def readinto(self, buf: bytearray) -> int: # type: ignore 

108 read = 0 

109 while not self._done and read < len(buf): 

110 if self._len == 0: 

111 # This is the first chunk or we fully consumed the previous 

112 # one. Read the next length of the next chunk 

113 self._len = self.read_chunk_len() 

114 

115 if self._len == 0: 

116 # Found the final chunk of size 0. The stream is now exhausted, 

117 # but there is still a final newline that should be consumed 

118 self._done = True 

119 

120 if self._len > 0: 

121 # There is data (left) in this chunk, so append it to the 

122 # buffer. If this operation fully consumes the chunk, this will 

123 # reset self._len to 0. 

124 n = min(len(buf), self._len) 

125 

126 # If (read + chunk size) becomes more than len(buf), buf will 

127 # grow beyond the original size and read more data than 

128 # required. So only read as much data as can fit in buf. 

129 if read + n > len(buf): 

130 buf[read:] = self._rfile.read(len(buf) - read) 

131 self._len -= len(buf) - read 

132 read = len(buf) 

133 else: 

134 buf[read : read + n] = self._rfile.read(n) 

135 self._len -= n 

136 read += n 

137 

138 if self._len == 0: 

139 # Skip the terminating newline of a chunk that has been fully 

140 # consumed. This also applies to the 0-sized final chunk 

141 terminator = self._rfile.readline() 

142 if terminator not in (b"\n", b"\r\n", b"\r"): 

143 raise OSError("Missing chunk terminating newline") 

144 

145 return read 

146 

147 

148class WSGIRequestHandler(BaseHTTPRequestHandler): 

149 """A request handler that implements WSGI dispatching.""" 

150 

151 server: "BaseWSGIServer" 

152 

153 @property 

154 def server_version(self) -> str: # type: ignore 

155 from . import __version__ 

156 

157 return f"Werkzeug/{__version__}" 

158 

159 def make_environ(self) -> "WSGIEnvironment": 

160 request_url = url_parse(self.path) 

161 url_scheme = "http" if self.server.ssl_context is None else "https" 

162 

163 if not self.client_address: 

164 self.client_address = ("<local>", 0) 

165 elif isinstance(self.client_address, str): 

166 self.client_address = (self.client_address, 0) 

167 

168 # If there was no scheme but the path started with two slashes, 

169 # the first segment may have been incorrectly parsed as the 

170 # netloc, prepend it to the path again. 

171 if not request_url.scheme and request_url.netloc: 

172 path_info = f"/{request_url.netloc}{request_url.path}" 

173 else: 

174 path_info = request_url.path 

175 

176 path_info = url_unquote(path_info) 

177 

178 environ: "WSGIEnvironment" = { 

179 "wsgi.version": (1, 0), 

180 "wsgi.url_scheme": url_scheme, 

181 "wsgi.input": self.rfile, 

182 "wsgi.errors": sys.stderr, 

183 "wsgi.multithread": self.server.multithread, 

184 "wsgi.multiprocess": self.server.multiprocess, 

185 "wsgi.run_once": False, 

186 "werkzeug.socket": self.connection, 

187 "SERVER_SOFTWARE": self.server_version, 

188 "REQUEST_METHOD": self.command, 

189 "SCRIPT_NAME": "", 

190 "PATH_INFO": _wsgi_encoding_dance(path_info), 

191 "QUERY_STRING": _wsgi_encoding_dance(request_url.query), 

192 # Non-standard, added by mod_wsgi, uWSGI 

193 "REQUEST_URI": _wsgi_encoding_dance(self.path), 

194 # Non-standard, added by gunicorn 

195 "RAW_URI": _wsgi_encoding_dance(self.path), 

196 "REMOTE_ADDR": self.address_string(), 

197 "REMOTE_PORT": self.port_integer(), 

198 "SERVER_NAME": self.server.server_address[0], 

199 "SERVER_PORT": str(self.server.server_address[1]), 

200 "SERVER_PROTOCOL": self.request_version, 

201 } 

202 

203 for key, value in self.headers.items(): 

204 key = key.upper().replace("-", "_") 

205 value = value.replace("\r\n", "") 

206 if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"): 

207 key = f"HTTP_{key}" 

208 if key in environ: 

209 value = f"{environ[key]},{value}" 

210 environ[key] = value 

211 

212 if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked": 

213 environ["wsgi.input_terminated"] = True 

214 environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"]) 

215 

216 # Per RFC 2616, if the URL is absolute, use that as the host. 

217 # We're using "has a scheme" to indicate an absolute URL. 

218 if request_url.scheme and request_url.netloc: 

219 environ["HTTP_HOST"] = request_url.netloc 

220 

221 try: 

222 # binary_form=False gives nicer information, but wouldn't be compatible with 

223 # what Nginx or Apache could return. 

224 peer_cert = self.connection.getpeercert(binary_form=True) 

225 if peer_cert is not None: 

226 # Nginx and Apache use PEM format. 

227 environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert) 

228 except ValueError: 

229 # SSL handshake hasn't finished. 

230 self.server.log("error", "Cannot fetch SSL peer certificate info") 

231 except AttributeError: 

232 # Not using TLS, the socket will not have getpeercert(). 

233 pass 

234 

235 return environ 

236 

237 def run_wsgi(self) -> None: 

238 if self.headers.get("Expect", "").lower().strip() == "100-continue": 

239 self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n") 

240 

241 self.environ = environ = self.make_environ() 

242 status_set: t.Optional[str] = None 

243 headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None 

244 status_sent: t.Optional[str] = None 

245 headers_sent: t.Optional[t.List[t.Tuple[str, str]]] = None 

246 chunk_response: bool = False 

247 

248 def write(data: bytes) -> None: 

249 nonlocal status_sent, headers_sent, chunk_response 

250 assert status_set is not None, "write() before start_response" 

251 assert headers_set is not None, "write() before start_response" 

252 if status_sent is None: 

253 status_sent = status_set 

254 headers_sent = headers_set 

255 try: 

256 code_str, msg = status_sent.split(None, 1) 

257 except ValueError: 

258 code_str, msg = status_sent, "" 

259 code = int(code_str) 

260 self.send_response(code, msg) 

261 header_keys = set() 

262 for key, value in headers_sent: 

263 self.send_header(key, value) 

264 header_keys.add(key.lower()) 

265 

266 # Use chunked transfer encoding if there is no content 

267 # length. Do not use for 1xx and 204 responses. 304 

268 # responses and HEAD requests are also excluded, which 

269 # is the more conservative behavior and matches other 

270 # parts of the code. 

271 # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1 

272 if ( 

273 not ( 

274 "content-length" in header_keys 

275 or environ["REQUEST_METHOD"] == "HEAD" 

276 or (100 <= code < 200) 

277 or code in {204, 304} 

278 ) 

279 and self.protocol_version >= "HTTP/1.1" 

280 ): 

281 chunk_response = True 

282 self.send_header("Transfer-Encoding", "chunked") 

283 

284 # Always close the connection. This disables HTTP/1.1 

285 # keep-alive connections. They aren't handled well by 

286 # Python's http.server because it doesn't know how to 

287 # drain the stream before the next request line. 

288 self.send_header("Connection", "close") 

289 self.end_headers() 

290 

291 assert isinstance(data, bytes), "applications must write bytes" 

292 

293 if data: 

294 if chunk_response: 

295 self.wfile.write(hex(len(data))[2:].encode()) 

296 self.wfile.write(b"\r\n") 

297 

298 self.wfile.write(data) 

299 

300 if chunk_response: 

301 self.wfile.write(b"\r\n") 

302 

303 self.wfile.flush() 

304 

305 def start_response(status, headers, exc_info=None): # type: ignore 

306 nonlocal status_set, headers_set 

307 if exc_info: 

308 try: 

309 if headers_sent: 

310 raise exc_info[1].with_traceback(exc_info[2]) 

311 finally: 

312 exc_info = None 

313 elif headers_set: 

314 raise AssertionError("Headers already set") 

315 status_set = status 

316 headers_set = headers 

317 return write 

318 

319 def execute(app: "WSGIApplication") -> None: 

320 application_iter = app(environ, start_response) 

321 try: 

322 for data in application_iter: 

323 write(data) 

324 if not headers_sent: 

325 write(b"") 

326 if chunk_response: 

327 self.wfile.write(b"0\r\n\r\n") 

328 finally: 

329 if hasattr(application_iter, "close"): 

330 application_iter.close() 

331 

332 try: 

333 execute(self.server.app) 

334 except (ConnectionError, socket.timeout) as e: 

335 self.connection_dropped(e, environ) 

336 except Exception as e: 

337 if self.server.passthrough_errors: 

338 raise 

339 

340 if status_sent is not None and chunk_response: 

341 self.close_connection = True 

342 

343 try: 

344 # if we haven't yet sent the headers but they are set 

345 # we roll back to be able to set them again. 

346 if status_sent is None: 

347 status_set = None 

348 headers_set = None 

349 execute(InternalServerError()) 

350 except Exception: 

351 pass 

352 

353 from .debug.tbtools import DebugTraceback 

354 

355 msg = DebugTraceback(e).render_traceback_text() 

356 self.server.log("error", f"Error on request:\n{msg}") 

357 

358 def handle(self) -> None: 

359 """Handles a request ignoring dropped connections.""" 

360 try: 

361 super().handle() 

362 except (ConnectionError, socket.timeout) as e: 

363 self.connection_dropped(e) 

364 except Exception as e: 

365 if self.server.ssl_context is not None and is_ssl_error(e): 

366 self.log_error("SSL error occurred: %s", e) 

367 else: 

368 raise 

369 

370 def connection_dropped( 

371 self, error: BaseException, environ: t.Optional["WSGIEnvironment"] = None 

372 ) -> None: 

373 """Called if the connection was closed by the client. By default 

374 nothing happens. 

375 """ 

376 

377 def __getattr__(self, name: str) -> t.Any: 

378 # All HTTP methods are handled by run_wsgi. 

379 if name.startswith("do_"): 

380 return self.run_wsgi 

381 

382 # All other attributes are forwarded to the base class. 

383 return getattr(super(), name) 

384 

385 def address_string(self) -> str: 

386 if getattr(self, "environ", None): 

387 return self.environ["REMOTE_ADDR"] # type: ignore 

388 

389 if not self.client_address: 

390 return "<local>" 

391 

392 return self.client_address[0] 

393 

394 def port_integer(self) -> int: 

395 return self.client_address[1] 

396 

397 def log_request( 

398 self, code: t.Union[int, str] = "-", size: t.Union[int, str] = "-" 

399 ) -> None: 

400 try: 

401 path = uri_to_iri(self.path) 

402 msg = f"{self.command} {path} {self.request_version}" 

403 except AttributeError: 

404 # path isn't set if the requestline was bad 

405 msg = self.requestline 

406 

407 code = str(code) 

408 

409 if code[0] == "1": # 1xx - Informational 

410 msg = _ansi_style(msg, "bold") 

411 elif code == "200": # 2xx - Success 

412 pass 

413 elif code == "304": # 304 - Resource Not Modified 

414 msg = _ansi_style(msg, "cyan") 

415 elif code[0] == "3": # 3xx - Redirection 

416 msg = _ansi_style(msg, "green") 

417 elif code == "404": # 404 - Resource Not Found 

418 msg = _ansi_style(msg, "yellow") 

419 elif code[0] == "4": # 4xx - Client Error 

420 msg = _ansi_style(msg, "bold", "red") 

421 else: # 5xx, or any other response 

422 msg = _ansi_style(msg, "bold", "magenta") 

423 

424 self.log("info", '"%s" %s %s', msg, code, size) 

425 

426 def log_error(self, format: str, *args: t.Any) -> None: 

427 self.log("error", format, *args) 

428 

429 def log_message(self, format: str, *args: t.Any) -> None: 

430 self.log("info", format, *args) 

431 

432 def log(self, type: str, message: str, *args: t.Any) -> None: 

433 _log( 

434 type, 

435 f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n", 

436 *args, 

437 ) 

438 

439 

440def _ansi_style(value: str, *styles: str) -> str: 

441 if not _log_add_style: 

442 return value 

443 

444 codes = { 

445 "bold": 1, 

446 "red": 31, 

447 "green": 32, 

448 "yellow": 33, 

449 "magenta": 35, 

450 "cyan": 36, 

451 } 

452 

453 for style in styles: 

454 value = f"\x1b[{codes[style]}m{value}" 

455 

456 return f"{value}\x1b[0m" 

457 

458 

459def generate_adhoc_ssl_pair( 

460 cn: t.Optional[str] = None, 

461) -> t.Tuple["Certificate", "RSAPrivateKeyWithSerialization"]: 

462 try: 

463 from cryptography import x509 

464 from cryptography.x509.oid import NameOID 

465 from cryptography.hazmat.backends import default_backend 

466 from cryptography.hazmat.primitives import hashes 

467 from cryptography.hazmat.primitives.asymmetric import rsa 

468 except ImportError: 

469 raise TypeError( 

470 "Using ad-hoc certificates requires the cryptography library." 

471 ) from None 

472 

473 backend = default_backend() 

474 pkey = rsa.generate_private_key( 

475 public_exponent=65537, key_size=2048, backend=backend 

476 ) 

477 

478 # pretty damn sure that this is not actually accepted by anyone 

479 if cn is None: 

480 cn = "*" 

481 

482 subject = x509.Name( 

483 [ 

484 x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"), 

485 x509.NameAttribute(NameOID.COMMON_NAME, cn), 

486 ] 

487 ) 

488 

489 backend = default_backend() 

490 cert = ( 

491 x509.CertificateBuilder() 

492 .subject_name(subject) 

493 .issuer_name(subject) 

494 .public_key(pkey.public_key()) 

495 .serial_number(x509.random_serial_number()) 

496 .not_valid_before(dt.now(timezone.utc)) 

497 .not_valid_after(dt.now(timezone.utc) + timedelta(days=365)) 

498 .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False) 

499 .add_extension(x509.SubjectAlternativeName([x509.DNSName(cn)]), critical=False) 

500 .sign(pkey, hashes.SHA256(), backend) 

501 ) 

502 return cert, pkey 

503 

504 

505def make_ssl_devcert( 

506 base_path: str, host: t.Optional[str] = None, cn: t.Optional[str] = None 

507) -> t.Tuple[str, str]: 

508 """Creates an SSL key for development. This should be used instead of 

509 the ``'adhoc'`` key which generates a new cert on each server start. 

510 It accepts a path for where it should store the key and cert and 

511 either a host or CN. If a host is given it will use the CN 

512 ``*.host/CN=host``. 

513 

514 For more information see :func:`run_simple`. 

515 

516 .. versionadded:: 0.9 

517 

518 :param base_path: the path to the certificate and key. The extension 

519 ``.crt`` is added for the certificate, ``.key`` is 

520 added for the key. 

521 :param host: the name of the host. This can be used as an alternative 

522 for the `cn`. 

523 :param cn: the `CN` to use. 

524 """ 

525 

526 if host is not None: 

527 cn = f"*.{host}/CN={host}" 

528 cert, pkey = generate_adhoc_ssl_pair(cn=cn) 

529 

530 from cryptography.hazmat.primitives import serialization 

531 

532 cert_file = f"{base_path}.crt" 

533 pkey_file = f"{base_path}.key" 

534 

535 with open(cert_file, "wb") as f: 

536 f.write(cert.public_bytes(serialization.Encoding.PEM)) 

537 with open(pkey_file, "wb") as f: 

538 f.write( 

539 pkey.private_bytes( 

540 encoding=serialization.Encoding.PEM, 

541 format=serialization.PrivateFormat.TraditionalOpenSSL, 

542 encryption_algorithm=serialization.NoEncryption(), 

543 ) 

544 ) 

545 

546 return cert_file, pkey_file 

547 

548 

549def generate_adhoc_ssl_context() -> "ssl.SSLContext": 

550 """Generates an adhoc SSL context for the development server.""" 

551 import tempfile 

552 import atexit 

553 

554 cert, pkey = generate_adhoc_ssl_pair() 

555 

556 from cryptography.hazmat.primitives import serialization 

557 

558 cert_handle, cert_file = tempfile.mkstemp() 

559 pkey_handle, pkey_file = tempfile.mkstemp() 

560 atexit.register(os.remove, pkey_file) 

561 atexit.register(os.remove, cert_file) 

562 

563 os.write(cert_handle, cert.public_bytes(serialization.Encoding.PEM)) 

564 os.write( 

565 pkey_handle, 

566 pkey.private_bytes( 

567 encoding=serialization.Encoding.PEM, 

568 format=serialization.PrivateFormat.TraditionalOpenSSL, 

569 encryption_algorithm=serialization.NoEncryption(), 

570 ), 

571 ) 

572 

573 os.close(cert_handle) 

574 os.close(pkey_handle) 

575 ctx = load_ssl_context(cert_file, pkey_file) 

576 return ctx 

577 

578 

579def load_ssl_context( 

580 cert_file: str, pkey_file: t.Optional[str] = None, protocol: t.Optional[int] = None 

581) -> "ssl.SSLContext": 

582 """Loads SSL context from cert/private key files and optional protocol. 

583 Many parameters are directly taken from the API of 

584 :py:class:`ssl.SSLContext`. 

585 

586 :param cert_file: Path of the certificate to use. 

587 :param pkey_file: Path of the private key to use. If not given, the key 

588 will be obtained from the certificate file. 

589 :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module. 

590 Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`. 

591 """ 

592 if protocol is None: 

593 protocol = ssl.PROTOCOL_TLS_SERVER 

594 

595 ctx = ssl.SSLContext(protocol) 

596 ctx.load_cert_chain(cert_file, pkey_file) 

597 return ctx 

598 

599 

600def is_ssl_error(error: t.Optional[Exception] = None) -> bool: 

601 """Checks if the given error (or the current one) is an SSL error.""" 

602 if error is None: 

603 error = t.cast(Exception, sys.exc_info()[1]) 

604 return isinstance(error, ssl.SSLError) 

605 

606 

607def select_address_family(host: str, port: int) -> socket.AddressFamily: 

608 """Return ``AF_INET4``, ``AF_INET6``, or ``AF_UNIX`` depending on 

609 the host and port.""" 

610 if host.startswith("unix://"): 

611 return socket.AF_UNIX 

612 elif ":" in host and hasattr(socket, "AF_INET6"): 

613 return socket.AF_INET6 

614 return socket.AF_INET 

615 

616 

617def get_sockaddr( 

618 host: str, port: int, family: socket.AddressFamily 

619) -> t.Union[t.Tuple[str, int], str]: 

620 """Return a fully qualified socket address that can be passed to 

621 :func:`socket.bind`.""" 

622 if family == af_unix: 

623 return host.split("://", 1)[1] 

624 try: 

625 res = socket.getaddrinfo( 

626 host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP 

627 ) 

628 except socket.gaierror: 

629 return host, port 

630 return res[0][4] # type: ignore 

631 

632 

633def get_interface_ip(family: socket.AddressFamily) -> str: 

634 """Get the IP address of an external interface. Used when binding to 

635 0.0.0.0 or ::1 to show a more useful URL. 

636 

637 :meta private: 

638 """ 

639 # arbitrary private address 

640 host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219" 

641 

642 with socket.socket(family, socket.SOCK_DGRAM) as s: 

643 try: 

644 s.connect((host, 58162)) 

645 except OSError: 

646 return "::1" if family == socket.AF_INET6 else "127.0.0.1" 

647 

648 return s.getsockname()[0] # type: ignore 

649 

650 

651class BaseWSGIServer(HTTPServer): 

652 """A WSGI server that that handles one request at a time. 

653 

654 Use :func:`make_server` to create a server instance. 

655 """ 

656 

657 multithread = False 

658 multiprocess = False 

659 request_queue_size = LISTEN_QUEUE 

660 allow_reuse_address = True 

661 

662 def __init__( 

663 self, 

664 host: str, 

665 port: int, 

666 app: "WSGIApplication", 

667 handler: t.Optional[t.Type[WSGIRequestHandler]] = None, 

668 passthrough_errors: bool = False, 

669 ssl_context: t.Optional[_TSSLContextArg] = None, 

670 fd: t.Optional[int] = None, 

671 ) -> None: 

672 if handler is None: 

673 handler = WSGIRequestHandler 

674 

675 # If the handler doesn't directly set a protocol version and 

676 # thread or process workers are used, then allow chunked 

677 # responses and keep-alive connections by enabling HTTP/1.1. 

678 if "protocol_version" not in vars(handler) and ( 

679 self.multithread or self.multiprocess 

680 ): 

681 handler.protocol_version = "HTTP/1.1" 

682 

683 self.host = host 

684 self.port = port 

685 self.app = app 

686 self.passthrough_errors = passthrough_errors 

687 

688 self.address_family = address_family = select_address_family(host, port) 

689 server_address = get_sockaddr(host, int(port), address_family) 

690 

691 # Remove a leftover Unix socket file from a previous run. Don't 

692 # remove a file that was set up by run_simple. 

693 if address_family == af_unix and fd is None: 

694 server_address = t.cast(str, server_address) 

695 

696 if os.path.exists(server_address): 

697 os.unlink(server_address) 

698 

699 # Bind and activate will be handled manually, it should only 

700 # happen if we're not using a socket that was already set up. 

701 super().__init__( 

702 server_address, # type: ignore[arg-type] 

703 handler, 

704 bind_and_activate=False, 

705 ) 

706 

707 if fd is None: 

708 # No existing socket descriptor, do bind_and_activate=True. 

709 try: 

710 self.server_bind() 

711 self.server_activate() 

712 except OSError as e: 

713 # Catch connection issues and show them without the traceback. Show 

714 # extra instructions for address not found, and for macOS. 

715 self.server_close() 

716 print(e.strerror, file=sys.stderr) 

717 

718 if e.errno == errno.EADDRINUSE: 

719 print( 

720 f"Port {port} is in use by another program. Either identify and" 

721 " stop that program, or start the server with a different" 

722 " port.", 

723 file=sys.stderr, 

724 ) 

725 

726 if sys.platform == "darwin" and port == 5000: 

727 print( 

728 "On macOS, try disabling the 'AirPlay Receiver' service" 

729 " from System Preferences -> Sharing.", 

730 file=sys.stderr, 

731 ) 

732 

733 sys.exit(1) 

734 except BaseException: 

735 self.server_close() 

736 raise 

737 else: 

738 # TCPServer automatically opens a socket even if bind_and_activate is False. 

739 # Close it to silence a ResourceWarning. 

740 self.server_close() 

741 

742 # Use the passed in socket directly. 

743 self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM) 

744 self.server_address = self.socket.getsockname() 

745 

746 if address_family != af_unix: 

747 # If port was 0, this will record the bound port. 

748 self.port = self.server_address[1] 

749 

750 if ssl_context is not None: 

751 if isinstance(ssl_context, tuple): 

752 ssl_context = load_ssl_context(*ssl_context) 

753 elif ssl_context == "adhoc": 

754 ssl_context = generate_adhoc_ssl_context() 

755 

756 self.socket = ssl_context.wrap_socket(self.socket, server_side=True) 

757 self.ssl_context: t.Optional["ssl.SSLContext"] = ssl_context 

758 else: 

759 self.ssl_context = None 

760 

761 def log(self, type: str, message: str, *args: t.Any) -> None: 

762 _log(type, message, *args) 

763 

764 def serve_forever(self, poll_interval: float = 0.5) -> None: 

765 try: 

766 super().serve_forever(poll_interval=poll_interval) 

767 except KeyboardInterrupt: 

768 pass 

769 finally: 

770 self.server_close() 

771 

772 def handle_error( 

773 self, request: t.Any, client_address: t.Union[t.Tuple[str, int], str] 

774 ) -> None: 

775 if self.passthrough_errors: 

776 raise 

777 

778 return super().handle_error(request, client_address) 

779 

780 def log_startup(self) -> None: 

781 """Show information about the address when starting the server.""" 

782 dev_warning = ( 

783 "WARNING: This is a development server. Do not use it in a production" 

784 " deployment. Use a production WSGI server instead." 

785 ) 

786 dev_warning = _ansi_style(dev_warning, "bold", "red") 

787 messages = [dev_warning] 

788 

789 if self.address_family == af_unix: 

790 messages.append(f" * Running on {self.host}") 

791 else: 

792 scheme = "http" if self.ssl_context is None else "https" 

793 display_hostname = self.host 

794 

795 if self.host in {"0.0.0.0", "::"}: 

796 messages.append(f" * Running on all addresses ({self.host})") 

797 

798 if self.host == "0.0.0.0": 

799 localhost = "127.0.0.1" 

800 display_hostname = get_interface_ip(socket.AF_INET) 

801 else: 

802 localhost = "[::1]" 

803 display_hostname = get_interface_ip(socket.AF_INET6) 

804 

805 messages.append(f" * Running on {scheme}://{localhost}:{self.port}") 

806 

807 if ":" in display_hostname: 

808 display_hostname = f"[{display_hostname}]" 

809 

810 messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}") 

811 

812 _log("info", "\n".join(messages)) 

813 

814 

815class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer): 

816 """A WSGI server that handles concurrent requests in separate 

817 threads. 

818 

819 Use :func:`make_server` to create a server instance. 

820 """ 

821 

822 multithread = True 

823 daemon_threads = True 

824 

825 

826class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer): 

827 """A WSGI server that handles concurrent requests in separate forked 

828 processes. 

829 

830 Use :func:`make_server` to create a server instance. 

831 """ 

832 

833 multiprocess = True 

834 

835 def __init__( 

836 self, 

837 host: str, 

838 port: int, 

839 app: "WSGIApplication", 

840 processes: int = 40, 

841 handler: t.Optional[t.Type[WSGIRequestHandler]] = None, 

842 passthrough_errors: bool = False, 

843 ssl_context: t.Optional[_TSSLContextArg] = None, 

844 fd: t.Optional[int] = None, 

845 ) -> None: 

846 if not can_fork: 

847 raise ValueError("Your platform does not support forking.") 

848 

849 super().__init__(host, port, app, handler, passthrough_errors, ssl_context, fd) 

850 self.max_children = processes 

851 

852 

853def make_server( 

854 host: str, 

855 port: int, 

856 app: "WSGIApplication", 

857 threaded: bool = False, 

858 processes: int = 1, 

859 request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None, 

860 passthrough_errors: bool = False, 

861 ssl_context: t.Optional[_TSSLContextArg] = None, 

862 fd: t.Optional[int] = None, 

863) -> BaseWSGIServer: 

864 """Create an appropriate WSGI server instance based on the value of 

865 ``threaded`` and ``processes``. 

866 

867 This is called from :func:`run_simple`, but can be used separately 

868 to have access to the server object, such as to run it in a separate 

869 thread. 

870 

871 See :func:`run_simple` for parameter docs. 

872 """ 

873 if threaded and processes > 1: 

874 raise ValueError("Cannot have a multi-thread and multi-process server.") 

875 

876 if threaded: 

877 return ThreadedWSGIServer( 

878 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd 

879 ) 

880 

881 if processes > 1: 

882 return ForkingWSGIServer( 

883 host, 

884 port, 

885 app, 

886 processes, 

887 request_handler, 

888 passthrough_errors, 

889 ssl_context, 

890 fd=fd, 

891 ) 

892 

893 return BaseWSGIServer( 

894 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd 

895 ) 

896 

897 

898def is_running_from_reloader() -> bool: 

899 """Check if the server is running as a subprocess within the 

900 Werkzeug reloader. 

901 

902 .. versionadded:: 0.10 

903 """ 

904 return os.environ.get("WERKZEUG_RUN_MAIN") == "true" 

905 

906 

907def run_simple( 

908 hostname: str, 

909 port: int, 

910 application: "WSGIApplication", 

911 use_reloader: bool = False, 

912 use_debugger: bool = False, 

913 use_evalex: bool = True, 

914 extra_files: t.Optional[t.Iterable[str]] = None, 

915 exclude_patterns: t.Optional[t.Iterable[str]] = None, 

916 reloader_interval: int = 1, 

917 reloader_type: str = "auto", 

918 threaded: bool = False, 

919 processes: int = 1, 

920 request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None, 

921 static_files: t.Optional[t.Dict[str, t.Union[str, t.Tuple[str, str]]]] = None, 

922 passthrough_errors: bool = False, 

923 ssl_context: t.Optional[_TSSLContextArg] = None, 

924) -> None: 

925 """Start a development server for a WSGI application. Various 

926 optional features can be enabled. 

927 

928 .. warning:: 

929 

930 Do not use the development server when deploying to production. 

931 It is intended for use only during local development. It is not 

932 designed to be particularly efficient, stable, or secure. 

933 

934 :param hostname: The host to bind to, for example ``'localhost'``. 

935 Can be a domain, IPv4 or IPv6 address, or file path starting 

936 with ``unix://`` for a Unix socket. 

937 :param port: The port to bind to, for example ``8080``. Using ``0`` 

938 tells the OS to pick a random free port. 

939 :param application: The WSGI application to run. 

940 :param use_reloader: Use a reloader process to restart the server 

941 process when files are changed. 

942 :param use_debugger: Use Werkzeug's debugger, which will show 

943 formatted tracebacks on unhandled exceptions. 

944 :param use_evalex: Make the debugger interactive. A Python terminal 

945 can be opened for any frame in the traceback. Some protection is 

946 provided by requiring a PIN, but this should never be enabled 

947 on a publicly visible server. 

948 :param extra_files: The reloader will watch these files for changes 

949 in addition to Python modules. For example, watch a 

950 configuration file. 

951 :param exclude_patterns: The reloader will ignore changes to any 

952 files matching these :mod:`fnmatch` patterns. For example, 

953 ignore cache files. 

954 :param reloader_interval: How often the reloader tries to check for 

955 changes. 

956 :param reloader_type: The reloader to use. The ``'stat'`` reloader 

957 is built in, but may require significant CPU to watch files. The 

958 ``'watchdog'`` reloader is much more efficient but requires 

959 installing the ``watchdog`` package first. 

960 :param threaded: Handle concurrent requests using threads. Cannot be 

961 used with ``processes``. 

962 :param processes: Handle concurrent requests using up to this number 

963 of processes. Cannot be used with ``threaded``. 

964 :param request_handler: Use a different 

965 :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to 

966 handle requests. 

967 :param static_files: A dict mapping URL prefixes to directories to 

968 serve static files from using 

969 :class:`~werkzeug.middleware.SharedDataMiddleware`. 

970 :param passthrough_errors: Don't catch unhandled exceptions at the 

971 server level, let the serve crash instead. If ``use_debugger`` 

972 is enabled, the debugger will still catch such errors. 

973 :param ssl_context: Configure TLS to serve over HTTPS. Can be an 

974 :class:`ssl.SSLContext` object, a ``(cert_file, key_file)`` 

975 tuple to create a typical context, or the string ``'adhoc'`` to 

976 generate a temporary self-signed certificate. 

977 

978 .. versionchanged:: 2.1 

979 Instructions are shown for dealing with an "address already in 

980 use" error. 

981 

982 .. versionchanged:: 2.1 

983 Running on ``0.0.0.0`` or ``::`` shows the loopback IP in 

984 addition to a real IP. 

985 

986 .. versionchanged:: 2.1 

987 The command-line interface was removed. 

988 

989 .. versionchanged:: 2.0 

990 Running on ``0.0.0.0`` or ``::`` shows a real IP address that 

991 was bound as well as a warning not to run the development server 

992 in production. 

993 

994 .. versionchanged:: 2.0 

995 The ``exclude_patterns`` parameter was added. 

996 

997 .. versionchanged:: 0.15 

998 Bind to a Unix socket by passing a ``hostname`` that starts with 

999 ``unix://``. 

1000 

1001 .. versionchanged:: 0.10 

1002 Improved the reloader and added support for changing the backend 

1003 through the ``reloader_type`` parameter. 

1004 

1005 .. versionchanged:: 0.9 

1006 A command-line interface was added. 

1007 

1008 .. versionchanged:: 0.8 

1009 ``ssl_context`` can be a tuple of paths to the certificate and 

1010 private key files. 

1011 

1012 .. versionchanged:: 0.6 

1013 The ``ssl_context`` parameter was added. 

1014 

1015 .. versionchanged:: 0.5 

1016 The ``static_files`` and ``passthrough_errors`` parameters were 

1017 added. 

1018 """ 

1019 if not isinstance(port, int): 

1020 raise TypeError("port must be an integer") 

1021 

1022 if static_files: 

1023 from .middleware.shared_data import SharedDataMiddleware 

1024 

1025 application = SharedDataMiddleware(application, static_files) 

1026 

1027 if use_debugger: 

1028 from .debug import DebuggedApplication 

1029 

1030 application = DebuggedApplication(application, evalex=use_evalex) 

1031 

1032 if not is_running_from_reloader(): 

1033 fd = None 

1034 else: 

1035 fd = int(os.environ["WERKZEUG_SERVER_FD"]) 

1036 

1037 srv = make_server( 

1038 hostname, 

1039 port, 

1040 application, 

1041 threaded, 

1042 processes, 

1043 request_handler, 

1044 passthrough_errors, 

1045 ssl_context, 

1046 fd=fd, 

1047 ) 

1048 srv.socket.set_inheritable(True) 

1049 os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno()) 

1050 

1051 if not is_running_from_reloader(): 

1052 srv.log_startup() 

1053 _log("info", _ansi_style("Press CTRL+C to quit", "yellow")) 

1054 

1055 if use_reloader: 

1056 from ._reloader import run_with_reloader 

1057 

1058 try: 

1059 run_with_reloader( 

1060 srv.serve_forever, 

1061 extra_files=extra_files, 

1062 exclude_patterns=exclude_patterns, 

1063 interval=reloader_interval, 

1064 reloader_type=reloader_type, 

1065 ) 

1066 finally: 

1067 srv.server_close() 

1068 else: 

1069 srv.serve_forever()