Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

481 statements  

1"""A WSGI and HTTP server for use **during development only**. This 

2server is convenient to use, but is not designed to be particularly 

3stable, secure, or efficient. Use a dedicated WSGI server and HTTP 

4server when deploying to production. 

5 

6It provides features like interactive debugging and code reloading. Use 

7``run_simple`` to start the server. Put this in a ``run.py`` script: 

8 

9.. code-block:: python 

10 

11 from myapp import create_app 

12 from werkzeug import run_simple 

13""" 

14 

15from __future__ import annotations 

16 

17import errno 

18import io 

19import os 

20import selectors 

21import socket 

22import socketserver 

23import sys 

24import typing as t 

25from datetime import datetime as dt 

26from datetime import timedelta 

27from datetime import timezone 

28from http.server import BaseHTTPRequestHandler 

29from http.server import HTTPServer 

30from urllib.parse import unquote 

31from urllib.parse import urlsplit 

32 

33from ._internal import _log 

34from ._internal import _wsgi_encoding_dance 

35from .exceptions import InternalServerError 

36from .urls import uri_to_iri 

37 

try:
    import ssl
except ImportError:

    class _SslDummy:
        """Placeholder bound to ``ssl`` when the real module cannot be imported.

        Any attribute access raises, so the failure surfaces at the point
        where TLS is actually requested.
        """

        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore

# Whether log messages get ANSI styling. Turned off on Windows ("nt") when
# colorama cannot be imported.
_log_add_style = True

if os.name == "nt":
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False

# True when the platform exposes os.fork().
can_fork = hasattr(os, "fork")

if can_fork:
    ForkingMixIn = socketserver.ForkingMixIn
else:

    class ForkingMixIn:  # type: ignore
        """Empty stand-in used when the platform cannot fork."""


# None on platforms whose socket module has no AF_UNIX.
af_unix = getattr(socket, "AF_UNIX", None)  # type: ignore

LISTEN_QUEUE = 128

# Accepted forms of the ``ssl_context`` argument: a ready context, a
# ``(cert_file, pkey_file)`` tuple, the literal ``"adhoc"``, or ``None``.
_TSSLContextArg = t.Optional[
    t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], t.Literal["adhoc"]]
]

79 

80if t.TYPE_CHECKING: 

81 from _typeshed.wsgi import WSGIApplication 

82 from _typeshed.wsgi import WSGIEnvironment 

83 from cryptography.hazmat.primitives.asymmetric.rsa import ( 

84 RSAPrivateKeyWithSerialization, 

85 ) 

86 from cryptography.x509 import Certificate 

87 

88 

class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps a binary stream and exposes only the chunk payloads: the
    hexadecimal size line in front of each chunk and the newline that follows
    each chunk are consumed and never returned to the reader.

    :param rfile: Binary stream positioned at the first chunk-size line.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Becomes True once the zero-sized final chunk has been read.
        self._done = False
        # Number of payload bytes of the current chunk not yet delivered.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk-size line and return the size it declares.

        :raises OSError: If the line cannot be parsed as a base-16 integer
            or the parsed value is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = int(line.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with de-chunked payload and return the byte count.

        Reading stops when ``buf`` is full or the final chunk has been seen.

        :raises OSError: On a malformed chunk header, a missing chunk
            terminator, or when the underlying stream ends in the middle of
            a chunk.
        """
        read = 0
        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or the previous one was fully
                # consumed. Read the length of the next chunk.
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now
                # exhausted, but there is still a final newline to consume.
                self._done = True

            if self._len > 0:
                # Never request more than what still fits into buf, nor more
                # than what is left of the current chunk.
                wanted = min(len(buf) - read, self._len)
                data = self._rfile.read(wanted)

                if not data:
                    # The stream ended although the chunk header promised
                    # more payload. Fail instead of reporting bytes that
                    # were never received.
                    raise OSError("Unexpected end of chunked stream")

                # Advance by what was actually received so the slice
                # assignment can never resize buf; a short read is completed
                # on the next loop iteration.
                received = len(data)
                buf[read : read + received] = data
                self._len -= received
                read += received

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk.
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read

149 

150 

class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching.

    Any ``do_<METHOD>`` attribute lookup resolves to :meth:`run_wsgi`, which
    builds a WSGI environ from the parsed request, calls the server's
    application and writes the response to the client.
    """

    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the request being handled."""
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        # Normalise client_address to a (host, port) tuple.
        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names containing an underscore are dropped: after the
            # "-" -> "_" conversion below they would be indistinguishable
            # from the dashed spelling of the same name.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                if key in environ:
                    # Repeated headers are joined with a comma.
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be
            # compatible with what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Run the server's WSGI application for the current request.

        Dropped connections are passed to :meth:`connection_dropped`. Any
        other exception is re-raised when the server has
        ``passthrough_errors`` set; otherwise a 500 response is attempted
        and the traceback is logged.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                # First write: emit the status line and headers.
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    # Chunk header: payload size in hex followed by CRLF.
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                # The nested try/finally blocks guarantee that the selector
                # is closed and the application iterable's close() is called
                # even when draining the socket below raises.
                try:
                    # Check for any remaining data in the read socket, and
                    # discard it. This will read past
                    # request.max_content_length, but lets the client see a
                    # 413 response instead of a connection reset failure. If
                    # we supported keep-alive connections, this naive
                    # approach would break by reading the next request line.
                    # Since we know that write (above) closes every
                    # connection we can read everything.
                    selector = selectors.DefaultSelector()

                    try:
                        selector.register(self.connection, selectors.EVENT_READ)
                        total_size = 0
                        total_reads = 0

                        # A timeout of 0 tends to fail because a client needs
                        # a small amount of time to continue sending its data.
                        while selector.select(timeout=0.01):
                            # Only read 10MB into memory at a time.
                            discarded = self.rfile.read(10_000_000)
                            total_size += len(discarded)
                            total_reads += 1

                            # Stop reading on no data, >=10GB, or 1000 reads.
                            # If a client sends more than that, they'll get a
                            # connection reset failure.
                            if (
                                not discarded
                                or total_size >= 10_000_000_000
                                or total_reads > 1000
                            ):
                                break
                    finally:
                        selector.close()
                finally:
                    if hasattr(application_iter, "close"):
                        application_iter.close()

        try:
            execute(self.server.app)
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                # Best effort only: the original error is logged below.
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address, preferring the environ's value."""
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the port component of ``client_address``."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log one request line, styled according to the status code."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        if code[0] == "1":  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 200 - OK, left unstyled
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Send ``message`` to the logger, prefixed with address and date."""
        _log(
            type,
            f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )

474 

475 

def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape sequences for the given styles.

    Returns ``value`` untouched when log styling is disabled. Otherwise one
    escape sequence per style is placed in front of the text, the last
    requested style outermost, and a reset sequence is appended.

    :raises KeyError: If a style name is not one of the known names.
    """
    if not _log_add_style:
        return value

    sgr_codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }

    # Look the styles up in call order, then emit them back to front, since
    # each style is prepended to whatever has been built so far.
    sequences = [f"\x1b[{sgr_codes[name]}m" for name in styles]
    prefix = "".join(reversed(sequences))
    return f"{prefix}{value}\x1b[0m"

493 

494 

def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Create a self-signed certificate and the RSA key it was signed with.

    The certificate uses the same name as subject and issuer, is valid for
    365 days starting now, and lists ``cn`` and ``*.cn`` as DNS subject
    alternative names.

    :param cn: Common name for the certificate. ``None`` means ``"*"``.
    :return: ``(certificate, private_key)`` objects from ``cryptography``.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    # One backend object serves both key generation and signing.
    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # A bare wildcard common name; clients are unlikely to accept it.
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Read the clock once so the validity window is exactly 365 days.
    valid_from = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(valid_from)
        .not_valid_after(valid_from + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey

542 

543 

def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Write a development certificate and private key to disk.

    Unlike the ``'adhoc'`` option, which creates a new certificate on every
    server start, the files written here can be reused. When ``host`` is
    given it takes precedence over ``cn`` and is used as the common name.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: the path to the certificate and key. The extension
        ``.crt`` is added for the certificate, ``.key`` is added for the
        key.
    :param host: the name of the host. This can be used as an alternative
        for the `cn`.
    :param cn: the `CN` to use.
    :return: ``(cert_file, pkey_file)`` paths of the written files.
    """
    common_name = cn if host is None else host
    cert, pkey = generate_adhoc_ssl_pair(cn=common_name)

    from cryptography.hazmat.primitives import serialization

    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"

    with open(cert_file, "wb") as cert_out:
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)
        cert_out.write(cert_pem)

    with open(pkey_file, "wb") as pkey_out:
        # The key is stored unencrypted in PEM form.
        pkey_pem = pkey.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        pkey_out.write(pkey_pem)

    return cert_file, pkey_file

586 

587 

def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Generates an adhoc SSL context for the development server.

    A fresh certificate/key pair is written to two temporary files, which
    are registered for removal at interpreter exit, and then loaded into a
    new context.
    """
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()

    # Register the removals before anything is written.
    for leftover in (pkey_file, cert_file):
        atexit.register(os.remove, leftover)

    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    os.write(cert_handle, cert_pem)

    pkey_pem = pkey.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    os.write(pkey_handle, pkey_pem)

    os.close(cert_handle)
    os.close(pkey_handle)
    return load_ssl_context(cert_file, pkey_file)

616 

617 

def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Build an :py:class:`ssl.SSLContext` from certificate/key files.

    The arguments are handed straight to the :py:class:`ssl.SSLContext`
    API.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
        will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context

637 

638 

def is_ssl_error(error: Exception | None = None) -> bool:
    """Tell whether ``error`` is an SSL error.

    When ``error`` is omitted, the exception currently being handled (if
    any) is inspected instead.
    """
    candidate = sys.exc_info()[1] if error is None else error
    return isinstance(candidate, ssl.SSLError)

644 

645 

def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Return ``AF_INET``, ``AF_INET6``, or ``AF_UNIX`` depending on
    the host.

    A ``unix://`` prefix selects ``AF_UNIX``; a colon anywhere in the host
    selects ``AF_INET6`` when the platform has it; everything else is
    ``AF_INET``. ``port`` is accepted but not consulted.
    """
    if host.startswith("unix://"):
        return socket.AF_UNIX

    looks_like_ipv6 = ":" in host and hasattr(socket, "AF_INET6")
    return socket.AF_INET6 if looks_like_ipv6 else socket.AF_INET

654 

655 

def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Return a fully qualified socket address that can be passed to
    :func:`socket.bind`.

    For Unix sockets this is the absolute form of the path after ``://``.
    Otherwise it is the first address found by :func:`socket.getaddrinfo`,
    or ``(host, port)`` unchanged when that lookup fails.
    """
    if family == af_unix:
        # Absolute path avoids IDNA encoding error when path starts with dot.
        _, _, socket_path = host.partition("://")
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port
    else:
        first = candidates[0]
        return first[4]  # type: ignore

671 

672 

def get_interface_ip(family: socket.AddressFamily) -> str:
    """Get the IP address of an external interface. Used when binding to
    0.0.0.0 or :: to show a more useful URL.

    A datagram socket is connected to an arbitrary private address and the
    socket's own local address is returned. If the connect call fails, the
    loopback address of the family is returned instead.

    :meta private:
    """
    if family == socket.AF_INET6:
        probe_host, loopback = "fd31:f903:5ab5:1::1", "::1"
    else:
        # arbitrary private address
        probe_host, loopback = "10.253.155.219", "127.0.0.1"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return loopback

        local_address = probe.getsockname()
        return local_address[0]  # type: ignore

689 

690 

class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Create the server socket, or adopt the one given by ``fd``.

        :param host: Host name, IP address or ``unix://`` path to bind to.
        :param port: Port to bind to.
        :param app: The WSGI application to serve.
        :param handler: Request handler class; defaults to
            :class:`WSGIRequestHandler`.
        :param passthrough_errors: Re-raise errors instead of handling them.
        :param ssl_context: A context, a ``(cert, key)`` tuple, ``"adhoc"``
            or ``None``.
        :param fd: File descriptor of an already set up listening socket.
        """
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        uses_workers = self.multithread or self.multiprocess

        if uses_workers and "protocol_version" not in vars(handler):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        address_family = select_address_family(host, port)
        self.address_family = address_family
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if address_family == af_unix and fd is None:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is not None:
            # TCPServer automatically opens a socket even if
            # bind_and_activate is False. Close it to silence a
            # ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()
        else:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the
                # traceback. Show extra instructions for address not found,
                # and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try disabling the 'AirPlay Receiver' service"
                            " from System Preferences -> General -> AirDrop & Handoff.",
                            file=sys.stderr,
                        )

                sys.exit(1)
            except BaseException:
                self.server_close()
                raise

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is None:
            self.ssl_context = None
        else:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context: ssl.SSLContext | None = ssl_context

        import importlib.metadata

        self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Forward a message to the module logger."""
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve until interrupted, then close the server socket."""
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        if self.passthrough_errors:
            # Re-raise the exception that is currently being handled.
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        dev_warning = (
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead."
        )
        messages = [_ansi_style(dev_warning, "bold", "red")]

        if self.address_family == af_unix:
            messages.append(f" * Running on {self.host}")
        else:
            scheme = "http" if self.ssl_context is None else "https"
            display_hostname = self.host

            # For the wildcard hosts, also show the loopback URL and look up
            # an interface address to display instead of the wildcard.
            wildcard_hosts = {
                "0.0.0.0": ("127.0.0.1", socket.AF_INET),
                "::": ("[::1]", socket.AF_INET6),
            }

            if self.host in wildcard_hosts:
                messages.append(f" * Running on all addresses ({self.host})")
                localhost, interface_family = wildcard_hosts[self.host]
                display_hostname = get_interface_ip(interface_family)
                messages.append(f" * Running on {scheme}://{localhost}:{self.port}")

            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")

        _log("info", "\n".join(messages))

857 

858 

class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that runs each request in its own thread, so several
    requests can be handled concurrently.

    Use :func:`make_server` to create a server instance.
    """

    # Advertised to applications through ``wsgi.multithread``.
    multithread = True
    daemon_threads = True

868 

869 

class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that runs each request in a separate forked process,
    so several requests can be handled concurrently.

    Use :func:`make_server` to create a server instance.
    """

    # Advertised to applications through ``wsgi.multiprocess``.
    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Set up the server; ``processes`` is stored as ``max_children``.

        :raises ValueError: If the platform cannot fork.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        self.max_children = processes

895 

896 

def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Pick and instantiate the WSGI server class matching ``threaded``
    and ``processes``.

    This is called from :func:`run_simple`, but can be used separately
    to have access to the server object, such as to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: If both ``threaded`` and ``processes > 1`` are
        requested.
    """
    if threaded:
        if processes > 1:
            raise ValueError("Cannot have a multi-thread and multi-process server.")

        server_cls = ThreadedWSGIServer
    elif processes > 1:
        # The forking server takes the process count as an extra argument.
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
    else:
        server_cls = BaseWSGIServer

    return server_cls(
        host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
    )

940 

941 

def is_running_from_reloader() -> bool:
    """Report whether this process is the subprocess started by the
    Werkzeug reloader.

    This is the case exactly when the ``WERKZEUG_RUN_MAIN`` environment
    variable is set to ``"true"``.

    .. versionadded:: 0.10
    """
    marker = os.environ.get("WERKZEUG_RUN_MAIN")
    return marker == "true"

949 

950 

def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Run a development server that serves a WSGI application. A
    number of optional conveniences can be switched on.

    .. warning::

        The development server must not be used when deploying to
        production. It exists for local development only and is not
        designed to be particularly efficient, stable, or secure.

    :param hostname: Host to bind to, such as ``'localhost'``. May be a
        domain, an IPv4 or IPv6 address, or a file path prefixed with
        ``unix://`` to bind a Unix socket.
    :param port: Port to bind to, such as ``8080``. Passing ``0`` lets
        the OS choose a random free port.
    :param application: The WSGI application to serve.
    :param use_reloader: Run under a reloader process that restarts the
        server process whenever files change.
    :param use_debugger: Wrap the application in Werkzeug's debugger,
        which renders formatted tracebacks for unhandled exceptions.
    :param use_evalex: Make the debugger interactive, so a Python
        terminal can be opened for any traceback frame. A PIN offers
        some protection, but never enable this on a publicly visible
        server.
    :param extra_files: Additional files, beyond Python modules, that
        the reloader watches for changes. A configuration file, for
        example.
    :param exclude_patterns: :mod:`fnmatch` patterns; changes to
        matching files are ignored by the reloader. Cache files, for
        example.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: Which reloader to use. ``'stat'`` is built in
        but can need significant CPU to watch files. ``'watchdog'`` is
        much more efficient but needs the ``watchdog`` package to be
        installed first.
    :param threaded: Serve concurrent requests with threads. Mutually
        exclusive with ``processes``.
    :param processes: Serve concurrent requests with up to this many
        processes. Mutually exclusive with ``threaded``.
    :param request_handler: An alternative
        :class:`~http.server.BaseHTTPRequestHandler` subclass used to
        handle requests.
    :param static_files: A dict that maps URL prefixes to directories
        from which static files are served through
        :class:`~werkzeug.middleware.shared_data.SharedDataMiddleware`.
    :param passthrough_errors: Let unhandled exceptions crash the server
        rather than catching them at the server level. With
        ``use_debugger`` enabled the debugger still catches them.
    :param ssl_context: TLS configuration for serving over HTTPS. Either
        an :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
        tuple from which a typical context is created, or the string
        ``'adhoc'`` to generate a temporary self-signed certificate.

    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, evalex=use_evalex)
        # Besides localhost domains, let the debugger be reached through
        # the hostname that was asked for.
        application.trusted_hosts.append(hostname)

    # When started by the reloader, take over the socket whose file
    # descriptor was published in the environment; otherwise pass none.
    inherited_fd: int | None
    if is_running_from_reloader():
        inherited_fd = int(os.environ["WERKZEUG_SERVER_FD"])
    else:
        inherited_fd = None

    server = make_server(
        host=hostname,
        port=port,
        app=application,
        threaded=threaded,
        processes=processes,
        request_handler=request_handler,
        passthrough_errors=passthrough_errors,
        ssl_context=ssl_context,
        fd=inherited_fd,
    )

    # Mark the listening socket inheritable and publish its descriptor,
    # which is what the branch above reads back in a reloader subprocess.
    server.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(server.fileno())

    # Startup output is only emitted outside the reloader subprocess.
    if not is_running_from_reloader():
        server.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        server.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            server.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        # Always close the server once the reloader call ends or raises.
        server.server_close()