Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

483 statements  

1"""A WSGI and HTTP server for use **during development only**. This 

2server is convenient to use, but is not designed to be particularly 

3stable, secure, or efficient. Use a dedicated WSGI server and HTTP 

4server when deploying to production. 

5 

6It provides features like interactive debugging and code reloading. Use 

7``run_simple`` to start the server. Put this in a ``run.py`` script: 

8 

9.. code-block:: python 

10 

11 from myapp import create_app 

12 from werkzeug import run_simple 

13""" 

14 

15from __future__ import annotations 

16 

17import errno 

18import io 

19import os 

20import selectors 

21import socket 

22import socketserver 

23import sys 

24import typing as t 

25from datetime import datetime as dt 

26from datetime import timedelta 

27from datetime import timezone 

28from http.server import BaseHTTPRequestHandler 

29from http.server import HTTPServer 

30from urllib.parse import unquote 

31from urllib.parse import urlsplit 

32 

33from ._internal import _log 

34from ._internal import _wsgi_encoding_dance 

35from .exceptions import InternalServerError 

36from .urls import uri_to_iri 

37 

# ``ssl`` is optional: some Python builds are compiled without TLS support.
# When it is missing, a stand-in object is bound to ``ssl`` so that any later
# attribute access fails with an explanatory error.
try:
    import ssl
except ImportError:

    class _SslDummy:
        """Placeholder for the ``ssl`` module that raises on attribute access."""

        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore
    # Exceptions treated as "the client dropped the connection".
    connection_dropped_errors: tuple[type[Exception], ...] = (
        ConnectionError,
        socket.timeout,
    )
else:
    # With TLS available, an abrupt TLS EOF also counts as a dropped connection.
    connection_dropped_errors = (
        ConnectionError,
        socket.timeout,
        ssl.SSLEOFError,
    )

57 

# Whether log messages may be decorated with ANSI escape codes. On Windows
# this is only enabled when ``colorama`` can be imported.
if os.name != "nt":
    _log_add_style = True
else:
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False
    else:
        _log_add_style = True

# True when the platform offers ``os.fork``.
can_fork = hasattr(os, "fork")

if not can_fork:

    class ForkingMixIn:  # type: ignore
        pass

else:
    ForkingMixIn = socketserver.ForkingMixIn


# ``socket.AF_UNIX`` where it exists, otherwise ``None``.
af_unix = getattr(socket, "AF_UNIX", None)  # type: ignore

# Value used for the server's ``request_queue_size``.
LISTEN_QUEUE = 128

# Accepted forms of the ``ssl_context`` argument: a ready context, a
# ``(cert_file, pkey_file)`` tuple, the string ``"adhoc"``, or ``None``.
_TSSLContextArg = t.Optional[
    t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], t.Literal["adhoc"]]
]

86 

87if t.TYPE_CHECKING: 

88 from _typeshed.wsgi import WSGIApplication 

89 from _typeshed.wsgi import WSGIEnvironment 

90 from cryptography.hazmat.primitives.asymmetric.rsa import ( 

91 RSAPrivateKeyWithSerialization, 

92 ) 

93 from cryptography.x509 import Certificate 

94 

95 

class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps the raw request stream and exposes only the chunk payloads; chunk
    size lines and chunk terminators are consumed and validated internally.
    Malformed or truncated input is reported as :exc:`OSError`.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the zero-sized final chunk has been seen.
        self._done = False
        # Bytes of the current chunk that have not been handed out yet.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk-size line and return the size it announces.

        :raises OSError: if the line is not a hexadecimal number or the
            number is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = int(line.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with de-chunked payload bytes.

        :param buf: writable buffer to fill; it is never resized.
        :return: number of bytes stored, ``0`` once the stream is exhausted.
        :raises OSError: on an invalid chunk header, a missing chunk
            terminator, or if the underlying stream ends inside a chunk.
        """
        read = 0
        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the next length of the next chunk
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed
                self._done = True

            if self._len > 0:
                # Never request more than the space left in buf or the bytes
                # left in this chunk.
                wanted = min(len(buf) - read, self._len)
                data = self._rfile.read(wanted)

                if not data:
                    # The peer went away in the middle of a chunk.
                    raise OSError("Unexpected end of stream inside chunk")

                # Account for what was actually returned: a short read must
                # not resize buf or desynchronise the chunk bookkeeping.
                got = len(data)
                buf[read : read + got] = data
                self._len -= got
                read += got

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read

156 

157 

class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching.

    Every ``do_<METHOD>`` attribute resolves to :meth:`run_wsgi` (see
    :meth:`__getattr__`), which builds the WSGI environ, calls the server's
    application and writes the response to the client.
    """

    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        """Version string of the owning server; used as ``SERVER_SOFTWARE``."""
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the current request."""
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        # Normalise client_address to a (host, port) tuple.
        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names containing an underscore are dropped: after the
            # "-" -> "_" mapping below they would be indistinguishable from
            # their dashed spelling.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                # Repeated headers are joined with a comma.
                if key in environ:
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Run the server's WSGI application for the current request.

        Dropped connections are passed to :meth:`connection_dropped`. Any
        other application error is re-raised when the server has
        ``passthrough_errors`` set; otherwise an ``InternalServerError``
        response is attempted and the traceback is logged.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        # *_set: what the application asked for via start_response.
        # *_sent: what has actually been written to the client.
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                # First write: emit the status line and headers.
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    # Headers already went out, so the error cannot be turned
                    # into a response any more: re-raise it.
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                try:
                    # Check for any remaining data in the read socket, and discard it.
                    # This will read past request.max_content_length, but lets the
                    # client see a 413 response instead of a connection reset failure.
                    # If we supported keep-alive connections, this naive approach would
                    # break by reading the next request line. Since we know that write
                    # (above) closes every connection we can read everything.
                    #
                    # The context manager closes the selector even if registering
                    # or reading fails.
                    with selectors.DefaultSelector() as selector:
                        selector.register(self.connection, selectors.EVENT_READ)
                        total_size = 0
                        total_reads = 0

                        # A timeout of 0 tends to fail because a client needs a small
                        # amount of time to continue sending its data.
                        while selector.select(timeout=0.01):
                            # Only read 10MB into memory at a time.
                            data = self.rfile.read(10_000_000)
                            total_size += len(data)
                            total_reads += 1

                            # Stop reading on no data, >=10GB, or 1000 reads. If a
                            # client sends more than that, they'll get a connection
                            # reset failure.
                            if (
                                not data
                                or total_size >= 10_000_000_000
                                or total_reads > 1000
                            ):
                                break
                finally:
                    # The iterable must be closed even when draining the socket
                    # failed (for example because the client reset the connection).
                    if hasattr(application_iter, "close"):
                        application_iter.close()

        try:
            execute(self.server.app)
        except connection_dropped_errors as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                # Best effort only: the original error is logged below.
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address, preferring ``REMOTE_ADDR`` from the environ."""
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the second item of ``client_address`` (the client port)."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log the request line, styled according to the response status."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        if code[0] == "1":  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 2xx - Success
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Forward a message, prefixed with client address and timestamp, to ``_log``."""
        _log(
            type,
            f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )

481 

482 

def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape codes for the named ``styles``.

    Returns ``value`` unchanged when styling is disabled through
    ``_log_add_style``. Otherwise one escape sequence per style is put in
    front of the text and a reset sequence is appended.

    :raises KeyError: if a style name is not one of the known names.
    """
    if not _log_add_style:
        return value

    sgr_codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }

    escapes = [f"\x1b[{sgr_codes[name]}m" for name in styles]
    # Every style is prepended in turn, so the last one requested ends up
    # first in the output.
    prefix = "".join(reversed(escapes))
    return f"{prefix}{value}\x1b[0m"

500 

501 

def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Create a self-signed certificate and its RSA private key.

    :param cn: common name for the certificate; ``"*"`` when omitted. The
        subject alternative names are ``cn`` and ``*.cn``.
    :return: ``(certificate, private_key)`` objects from ``cryptography``.
        The certificate is valid for 365 days starting now.
    :raises TypeError: if the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # pretty damn sure that this is not actually accepted by anyone
    if cn is None:
        cn = "*"

    # Self-signed: the same name is used as subject and issuer.
    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Take a single timestamp so the validity window is exactly 365 days.
    not_before = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(not_before)
        .not_valid_after(not_before + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey

549 

550 

def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Create an SSL certificate and key on disk for development.

    Use this instead of the ``'adhoc'`` context, which generates a new
    certificate on every server start. Two files are written next to each
    other: ``<base_path>.crt`` and ``<base_path>.key``.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: path prefix for the certificate and key files.
    :param host: host name to use as the common name. Takes precedence
        over ``cn`` when given.
    :param cn: the common name to use.
    :return: ``(cert_file, pkey_file)`` paths of the files written.
    """
    common_name = cn if host is None else host
    cert, pkey = generate_adhoc_ssl_pair(cn=common_name)

    from cryptography.hazmat.primitives import serialization

    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"

    with open(cert_file, "wb") as cert_out:
        cert_out.write(cert.public_bytes(serialization.Encoding.PEM))

    with open(pkey_file, "wb") as pkey_out:
        # The private key is stored unencrypted in PEM form.
        pkey_pem = pkey.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        pkey_out.write(pkey_pem)

    return cert_file, pkey_file

593 

594 

def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Generates an adhoc SSL context for the development server.

    A fresh certificate/key pair is written to two temporary files that are
    registered for removal at interpreter exit, then loaded into a context.
    """
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    # Wrap the raw descriptors in file objects: they are closed even if a
    # write fails, and a buffered write cannot stop part-way like os.write.
    with os.fdopen(cert_handle, "wb") as cert_out, os.fdopen(
        pkey_handle, "wb"
    ) as pkey_out:
        cert_out.write(cert.public_bytes(serialization.Encoding.PEM))
        pkey_out.write(
            pkey.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx

623 

624 

def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Build an SSL context from a certificate file and optional key file.

    The arguments map directly onto the :py:class:`ssl.SSLContext` API.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
        will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context

644 

645 

def is_ssl_error(error: Exception | None = None) -> bool:
    """Tell whether ``error`` is an SSL error.

    When ``error`` is omitted, the exception currently being handled (if
    any) is examined instead.
    """
    candidate = sys.exc_info()[1] if error is None else error
    return isinstance(candidate, ssl.SSLError)

651 

652 

def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Pick ``AF_UNIX``, ``AF_INET6`` or ``AF_INET`` for ``host``.

    A host starting with ``unix://`` selects a Unix socket. Otherwise a
    host containing ``:`` selects IPv6 when the platform supports it, and
    everything else selects IPv4. ``port`` is not consulted.
    """
    if host.startswith("unix://"):
        family = socket.AF_UNIX
    elif ":" in host and hasattr(socket, "AF_INET6"):
        family = socket.AF_INET6
    else:
        family = socket.AF_INET

    return family

661 

662 

def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Resolve ``host``/``port`` to an address usable with
    :func:`socket.bind`.

    For a Unix socket the result is the absolute filesystem path taken from
    after ``://`` in ``host``. Otherwise the first result of
    :func:`socket.getaddrinfo` is used, falling back to ``(host, port)``
    when the lookup fails.
    """
    if family == af_unix:
        _scheme, _sep, socket_path = host.partition("://")
        # Absolute path avoids IDNA encoding error when path starts with dot.
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port

    first_candidate = candidates[0]
    return first_candidate[4]  # type: ignore

678 

679 

def get_interface_ip(family: socket.AddressFamily) -> str:
    """Find the local IP address of an outward-facing interface.

    A datagram socket is connected to an arbitrary private address and the
    socket's own address is returned. If connecting fails, the loopback
    address for ``family`` is returned instead.

    :meta private:
    """
    wants_ipv6 = family == socket.AF_INET6
    # arbitrary private address
    probe_host = "fd31:f903:5ab5:1::1" if wants_ipv6 else "10.253.155.219"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return "::1" if wants_ipv6 else "127.0.0.1"

        local_address = probe.getsockname()
        return local_address[0]  # type: ignore

696 

697 

class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Set up the listening socket and optional TLS wrapping.

        When ``fd`` is given, the socket is created from that descriptor
        instead of binding a new one. If binding fails with ``OSError``, a
        message is printed to stderr and the process exits with status 1.
        """
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        has_workers = self.multithread or self.multiprocess

        if has_workers and "protocol_version" not in vars(handler):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        address_family = select_address_family(host, port)
        self.address_family = address_family
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if fd is None and address_family == af_unix:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is not None:
            # TCPServer automatically opens a socket even if bind_and_activate is False.
            # Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()
        else:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try disabling the 'AirPlay Receiver' service"
                            " from System Preferences -> General -> AirDrop & Handoff.",
                            file=sys.stderr,
                        )

                sys.exit(1)
            except BaseException:
                self.server_close()
                raise

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is None:
            self.ssl_context: ssl.SSLContext | None = None
        else:
            # Turn the shorthand forms into a real context first.
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context = ssl_context

        import importlib.metadata

        werkzeug_version = importlib.metadata.version("werkzeug")
        self._server_version = f"Werkzeug/{werkzeug_version}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Pass a log record through to the module-level ``_log`` helper."""
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve until interrupted, then close the server socket.

        A ``KeyboardInterrupt`` ends the loop without propagating.
        """
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        """Re-raise the active exception when ``passthrough_errors`` is set,
        otherwise defer to the base class.
        """
        if self.passthrough_errors:
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        messages = [
            _ansi_style(
                "WARNING: This is a development server. Do not use it in a production"
                " deployment. Use a production WSGI server instead.",
                "bold",
                "red",
            )
        ]

        if self.address_family == af_unix:
            messages.append(f" * Running on {self.host}")
        else:
            scheme = "https" if self.ssl_context is not None else "http"
            display_hostname = self.host

            if self.host in {"0.0.0.0", "::"}:
                messages.append(f" * Running on all addresses ({self.host})")

                # For a wildcard bind, show the loopback URL and a URL with
                # the address of an actual interface.
                if self.host == "0.0.0.0":
                    localhost = "127.0.0.1"
                    display_hostname = get_interface_ip(socket.AF_INET)
                else:
                    localhost = "[::1]"
                    display_hostname = get_interface_ip(socket.AF_INET6)

                messages.append(f" * Running on {scheme}://{localhost}:{self.port}")

            # IPv6 literals need brackets inside a URL.
            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")

        _log("info", "\n".join(messages))

864 

865 

class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that serves each request in its own thread, so
    requests can be handled concurrently.

    Use :func:`make_server` to create a server instance.
    """

    # Worker threads are daemon threads.
    daemon_threads = True
    # Exposed to applications as ``wsgi.multithread``.
    multithread = True

875 

876 

class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that serves each request in a separate forked
    process, so requests can be handled concurrently.

    Use :func:`make_server` to create a server instance.
    """

    # Exposed to applications as ``wsgi.multiprocess``.
    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Create the server; ``processes`` is stored as ``max_children``.

        :raises ValueError: if the platform cannot fork.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        self.max_children = processes

902 

903 

def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Build the WSGI server class that matches ``threaded`` and
    ``processes``.

    :func:`run_simple` calls this, but it can also be used on its own to
    get hold of the server object, for example to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: if both ``threaded`` and ``processes > 1`` are
        requested.
    """
    multi_process = processes > 1

    if threaded:
        if multi_process:
            raise ValueError("Cannot have a multi-thread and multi-process server.")

        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

    if multi_process:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )

    # Neither threads nor extra processes: one request at a time.
    return BaseWSGIServer(
        host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
    )

947 

948 

def is_running_from_reloader() -> bool:
    """Report whether this process is the child started by the Werkzeug
    reloader, as signalled by ``WERKZEUG_RUN_MAIN=true`` in the environment.

    .. versionadded:: 0.10
    """
    marker = os.getenv("WERKZEUG_RUN_MAIN")
    return marker == "true"

956 

957 

def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Run a WSGI application with the development server, optionally
    enabling the reloader, the debugger, and static file serving.

    .. warning::

        This server is meant for local development only. It is not
        designed to be particularly efficient, stable, or secure, so do
        not use it when deploying to production.

    :param hostname: The host to bind to, for example ``'localhost'``.
        Can be a domain, IPv4 or IPv6 address, or file path starting
        with ``unix://`` for a Unix socket.
    :param port: The port to bind to, for example ``8080``. Using ``0``
        tells the OS to pick a random free port.
    :param application: The WSGI application to run.
    :param use_reloader: Run the server under a reloader process that
        restarts it when files are changed.
    :param use_debugger: Wrap the application in Werkzeug's debugger,
        which shows formatted tracebacks on unhandled exceptions.
    :param use_evalex: Make the debugger interactive. A Python terminal
        can be opened for any frame in the traceback. Some protection is
        provided by requiring a PIN, but this should never be enabled
        on a publicly visible server.
    :param extra_files: Files the reloader watches for changes in
        addition to Python modules. For example, watch a configuration
        file.
    :param exclude_patterns: :mod:`fnmatch` patterns. The reloader
        ignores changes to any files matching them. For example, ignore
        cache files.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: The reloader to use. The ``'stat'`` reloader
        is built in, but may require significant CPU to watch files. The
        ``'watchdog'`` reloader is much more efficient but requires
        installing the ``watchdog`` package first.
    :param threaded: Handle concurrent requests using threads. Cannot be
        used with ``processes``.
    :param processes: Handle concurrent requests using up to this number
        of processes. Cannot be used with ``threaded``.
    :param request_handler: Use a different
        :class:`~http.server.BaseHTTPRequestHandler` subclass to handle
        requests.
    :param static_files: A dict mapping URL prefixes to directories to
        serve static files from using
        :class:`~werkzeug.middleware.shared_data.SharedDataMiddleware`.
    :param passthrough_errors: Don't catch unhandled exceptions at the
        server level, let the server crash instead. If ``use_debugger``
        is enabled, the debugger will still catch such errors.
    :param ssl_context: Configure TLS to serve over HTTPS. Can be an
        :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
        tuple to create a typical context, or the string ``'adhoc'`` to
        generate a temporary self-signed certificate.
    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    # Wrap the application in the optional middleware layers. Static
    # files are applied first so that the debugger wraps them as well.
    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, evalex=use_evalex)
        # Allow the specified hostname to use the debugger, in addition to
        # localhost domains.
        application.trusted_hosts.append(hostname)

    # In the reloader subprocess, hand the server the file descriptor that
    # was stored in the environment (see below) rather than ``None``.
    fd = int(os.environ["WERKZEUG_SERVER_FD"]) if is_running_from_reloader() else None

    srv = make_server(
        hostname,
        port,
        application,
        threaded=threaded,
        processes=processes,
        request_handler=request_handler,
        passthrough_errors=passthrough_errors,
        ssl_context=ssl_context,
        fd=fd,
    )

    # Make the socket inheritable and record its descriptor in the
    # environment, where the branch above reads it back.
    srv.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno())

    # Only log the startup banner outside of the reloader subprocess.
    if not is_running_from_reloader():
        srv.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        srv.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            srv.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        # Always release the server socket once the reloader returns or
        # raises.
        srv.server_close()