Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/serving.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

486 statements  

1"""A WSGI and HTTP server for use **during development only**. This 

2server is convenient to use, but is not designed to be particularly 

3stable, secure, or efficient. Use a dedicated WSGI server and HTTP 

4server when deploying to production. 

5 

6It provides features like interactive debugging and code reloading. Use 

7``run_simple`` to start the server. Put this in a ``run.py`` script: 

8 

9.. code-block:: python 

10 

11 from myapp import create_app 

12 from werkzeug import run_simple 

13""" 

14 

15from __future__ import annotations 

16 

17import errno 

18import io 

19import os 

20import selectors 

21import socket 

22import socketserver 

23import sys 

24import typing as t 

25from datetime import datetime as dt 

26from datetime import timedelta 

27from datetime import timezone 

28from http.server import BaseHTTPRequestHandler 

29from http.server import HTTPServer 

30from urllib.parse import unquote 

31from urllib.parse import urlsplit 

32 

33from ._internal import _log 

34from ._internal import _wsgi_encoding_dance 

35from .exceptions import InternalServerError 

36from .http import parse_set_header 

37from .urls import uri_to_iri 

38 

try:
    import ssl

    # Exception types that ``WSGIRequestHandler.run_wsgi`` hands to
    # ``connection_dropped`` instead of reporting as an application error.
    connection_dropped_errors: tuple[type[Exception], ...] = (
        ConnectionError,
        socket.timeout,
        ssl.SSLEOFError,
    )
except ImportError:

    # Stand-in bound to the name ``ssl`` when the real module cannot be
    # imported: every attribute access raises a descriptive RuntimeError.
    class _SslDummy:
        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore
    # Same tuple as above, minus the SSL-specific error type.
    connection_dropped_errors = (ConnectionError, socket.timeout)

# Whether ``_ansi_style`` adds ANSI escape codes to log messages.
_log_add_style = True

if os.name == "nt":
    # On Windows, styling is only kept when ``colorama`` can be imported.
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False

# True when the platform exposes ``os.fork``; checked by ForkingWSGIServer.
can_fork = hasattr(os, "fork")

if can_fork:
    ForkingMixIn = socketserver.ForkingMixIn
else:

    # Empty placeholder so ``ForkingWSGIServer`` can still be defined.
    class ForkingMixIn:  # type: ignore
        pass


# ``socket.AF_UNIX`` when the socket module defines it, otherwise ``None``.
try:
    af_unix = socket.AF_UNIX
except AttributeError:
    af_unix = None  # type: ignore

# Used as ``BaseWSGIServer.request_queue_size``.
LISTEN_QUEUE = 128

# Accepted forms of the ``ssl_context`` argument: an ``ssl.SSLContext``, a
# ``(cert_file, pkey_file)`` tuple, the string ``"adhoc"``, or ``None``.
_TSSLContextArg = t.Optional[
    t.Union["ssl.SSLContext", tuple[str, t.Optional[str]], t.Literal["adhoc"]]
]

87 

88if t.TYPE_CHECKING: 

89 from _typeshed.wsgi import WSGIApplication 

90 from _typeshed.wsgi import WSGIEnvironment 

91 from cryptography.hazmat.primitives.asymmetric.rsa import ( 

92 RSAPrivateKeyWithSerialization, 

93 ) 

94 from cryptography.x509 import Certificate 

95 

96 

class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps a binary stream positioned at the start of a chunked body and
    exposes the decoded payload through the raw I/O ``readinto`` protocol.

    :param rfile: The underlying binary stream to read chunks from.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the terminating zero-length chunk has been seen.
        self._done = False
        # Bytes of the current chunk that have not been consumed yet.
        self._len = 0

    def readable(self) -> bool:
        """Always ``True``; this stream is read-only input."""
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk-size line and return the size it declares.

        :raises OSError: If the line is not a hexadecimal number or the
            number is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = int(line.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded body bytes and return how many were stored.

        Returns ``0`` once the final chunk has been consumed.

        :raises OSError: If a chunk header is invalid, the underlying
            stream ends in the middle of a chunk, or a chunk is not
            followed by a newline.
        """
        read = 0
        capacity = len(buf)

        while not self._done and read < capacity:
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the next length of the next chunk
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed
                self._done = True

            if self._len > 0:
                # Copy no more than what is left in the chunk and no more
                # than what still fits in buf.
                n = min(capacity - read, self._len)
                data = self._rfile.read(n)

                # A short read means the stream ended inside a chunk.
                # Assigning it to the slice would resize the caller's
                # buffer and over-report the number of bytes read.
                if len(data) != n:
                    raise OSError("Unexpected end of chunked input")

                buf[read : read + n] = data
                self._len -= n
                read += n

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read

157 

158 

class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching."""

    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        """The version string stored on the owning server."""
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the current request.

        Normalizes ``self.client_address`` to a ``(host, port)`` tuple as a
        side effect, and wraps ``wsgi.input`` in :class:`DechunkedInput`
        when the request uses chunked transfer encoding.
        """
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names containing an underscore are skipped: after the
            # "-" -> "_" replacement below they would be indistinguishable
            # from their dashed counterparts.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                # Repeated headers are joined with a comma.
                if key in environ:
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if "chunked" in parse_set_header(environ.get("HTTP_TRANSFER_ENCODING")):
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Run the server's WSGI application for the current request.

        Dropped connections are passed to :meth:`connection_dropped`. Any
        other exception is re-raised when the server has
        ``passthrough_errors`` set; otherwise an ``InternalServerError``
        response is attempted and the traceback is logged.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        # ``*_set`` hold what start_response recorded; ``*_sent`` hold what
        # has actually been written to the client.
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            """Send the status line and headers on first use, then ``data``."""
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            """WSGI ``start_response`` callable; records status and headers."""
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    # Drop the reference to the traceback.
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            """Call ``app``, stream its response, then drain and clean up."""
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                try:
                    # Check for any remaining data in the read socket, and discard it. This
                    # will read past request.max_content_length, but lets the client see a
                    # 413 response instead of a connection reset failure. If we supported
                    # keep-alive connections, this naive approach would break by reading the
                    # next request line. Since we know that write (above) closes every
                    # connection we can read everything.
                    #
                    # The selector is a context manager so it is closed even
                    # if registering or reading from the socket raises.
                    with selectors.DefaultSelector() as selector:
                        selector.register(self.connection, selectors.EVENT_READ)
                        total_size = 0
                        total_reads = 0

                        # A timeout of 0 tends to fail because a client needs a small amount of
                        # time to continue sending its data.
                        while selector.select(timeout=0.01):
                            # Only read 10MB into memory at a time.
                            data = self.rfile.read(10_000_000)
                            total_size += len(data)
                            total_reads += 1

                            # Stop reading on no data, >=10GB, or 1000 reads. If a client sends
                            # more than that, they'll get a connection reset failure.
                            if (
                                not data
                                or total_size >= 10_000_000_000
                                or total_reads > 1000
                            ):
                                break
                finally:
                    # The application iterable must be closed even when
                    # draining the socket fails (for example on a reset).
                    if hasattr(application_iter, "close"):
                        application_iter.close()

        try:
            execute(self.server.app)
        except connection_dropped_errors as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            # SSL errors on a TLS server are logged instead of raised.
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address used in the environ and in log lines."""
        # Once an environ has been built, reuse the address stored there.
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the second item of ``self.client_address``."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log the request line, styled according to the status code."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        if code[0] == "1":  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 2xx - Success
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        """Log ``format % args`` at the ``"error"`` level."""
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        """Log ``format % args`` at the ``"info"`` level."""
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Prefix ``message`` with the client address and a timestamp, then
        pass it to ``_log``.
        """
        # an IPv6 scoped address contains "%" which breaks logging
        address_string = self.address_string().replace("%", "%%")
        _log(
            type,
            f"{address_string} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )

484 

485 

def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape codes for the given style names.

    Returns ``value`` unchanged when ``_log_add_style`` is false. Each
    later style's code is placed in front of the earlier ones, and a reset
    code is appended after the text.

    :param value: The text to style.
    :param styles: Any of ``"bold"``, ``"red"``, ``"green"``,
        ``"yellow"``, ``"magenta"``, ``"cyan"``.
    :raises KeyError: If an unknown style name is given.
    """
    if _log_add_style:
        sgr_codes = {
            "bold": 1,
            "red": 31,
            "green": 32,
            "yellow": 33,
            "magenta": 35,
            "cyan": 36,
        }
        prefix = ""

        for name in styles:
            prefix = f"\x1b[{sgr_codes[name]}m{prefix}"

        return f"{prefix}{value}\x1b[0m"

    return value

503 

504 

def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Create a self-signed certificate and the RSA key that signed it.

    The certificate is valid for 365 days from the moment of creation,
    carries the server-auth extended key usage, and lists ``cn`` and
    ``*.cn`` as subject alternative names.

    :param cn: The common name to use. Defaults to ``"*"``.
    :return: A ``(certificate, private_key)`` tuple.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    # One backend instance serves both key generation and signing.
    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # pretty damn sure that this is not actually accepted by anyone
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Take a single timestamp so the validity window is exactly 365 days.
    now = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey

552 

553 

def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Write a development certificate and private key to disk.

    Use this instead of the ``'adhoc'`` key, which generates a new cert
    on each server start. The files are stored next to ``base_path``; a
    given ``host`` takes precedence over ``cn`` as the common name.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: the path to the certificate and key. The extension
                      ``.crt`` is added for the certificate, ``.key`` is
                      added for the key.
    :param host: the name of the host. This can be used as an alternative
                 for the `cn`.
    :param cn: the `CN` to use.
    :return: A ``(cert_file, pkey_file)`` tuple of the written paths.
    """
    common_name = cn if host is None else host
    cert, pkey = generate_adhoc_ssl_pair(cn=common_name)

    from cryptography.hazmat.primitives import serialization

    pem = serialization.Encoding.PEM
    cert_file, pkey_file = f"{base_path}.crt", f"{base_path}.key"

    with open(cert_file, "wb") as cert_out:
        cert_out.write(cert.public_bytes(pem))

    with open(pkey_file, "wb") as pkey_out:
        key_bytes = pkey.private_bytes(
            encoding=pem,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        pkey_out.write(key_bytes)

    return cert_file, pkey_file

596 

597 

def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Generates an adhoc SSL context for the development server.

    A fresh certificate/key pair is written to two temporary files, which
    are registered for removal at interpreter exit, and loaded with
    :func:`load_ssl_context`.

    :raises TypeError: If the ``cryptography`` library is not installed
        (raised by :func:`generate_adhoc_ssl_pair`).
    """
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    # Serialize first so a failure here leaves no temporary files behind.
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    pkey_pem = pkey.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    # File objects write the whole buffer (a raw os.write may be partial)
    # and the ``with`` block closes both descriptors even if a write fails.
    with os.fdopen(cert_handle, "wb") as cert_out, os.fdopen(
        pkey_handle, "wb"
    ) as pkey_out:
        cert_out.write(cert_pem)
        pkey_out.write(pkey_pem)

    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx

626 

627 

def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Build an :class:`ssl.SSLContext` from a certificate and private key.

    The parameters mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
        will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context

647 

648 

def is_ssl_error(error: Exception | None = None) -> bool:
    """Tell whether ``error`` is an :class:`ssl.SSLError`.

    When ``error`` is omitted, the exception currently being handled
    (``sys.exc_info()[1]``) is checked instead.
    """
    candidate = (
        t.cast(Exception, sys.exc_info()[1]) if error is None else error
    )
    return isinstance(candidate, ssl.SSLError)

654 

655 

def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Return ``AF_INET``, ``AF_INET6``, or ``AF_UNIX`` depending on
    the host.

    A host starting with ``unix://`` selects ``AF_UNIX``; a host that
    contains ``":"`` selects ``AF_INET6`` when the socket module defines
    it; anything else selects ``AF_INET``. ``port`` is accepted but not
    consulted.
    """
    if host.startswith("unix://"):
        return socket.AF_UNIX

    has_colon = ":" in host

    if has_colon and hasattr(socket, "AF_INET6"):
        return socket.AF_INET6

    return socket.AF_INET

664 

665 

def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Resolve ``host`` and ``port`` to an address usable with
    :func:`socket.bind`.

    For the Unix socket family the result is the absolute filesystem path
    that follows ``://`` in ``host``. Otherwise the first address reported
    by ``socket.getaddrinfo`` is returned, falling back to ``(host, port)``
    when resolution fails.
    """
    if family == af_unix:
        _scheme, _sep, socket_path = host.partition("://")
        # Absolute path avoids IDNA encoding error when path starts with dot.
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port

    first_candidate = candidates[0]
    return first_candidate[4]  # type: ignore

681 

682 

def get_interface_ip(family: socket.AddressFamily) -> str:
    """Find the local IP address of an outward-facing interface.

    Used when binding to 0.0.0.0 or :: to show a more useful URL. A
    datagram socket is connected to an arbitrary private address and its
    local address is reported; if connecting fails, the loopback address
    for ``family`` is returned instead.

    :meta private:
    """
    is_ipv6 = family == socket.AF_INET6
    # arbitrary private address
    probe_host = "fd31:f903:5ab5:1::1" if is_ipv6 else "10.253.155.219"
    loopback = "::1" if is_ipv6 else "127.0.0.1"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return loopback

        local_address = probe.getsockname()

    return local_address[0]  # type: ignore

699 

700 

class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    # Copied into the WSGI environ by WSGIRequestHandler.make_environ.
    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        # Set up the listening socket and store the application.
        #
        # host / port: address to bind; ``host`` may start with ``unix://``.
        # app: the WSGI application stored as ``self.app``.
        # handler: request handler class, WSGIRequestHandler when omitted.
        # passthrough_errors: stored and consulted by ``handle_error``.
        # ssl_context: an SSLContext, a tuple passed to load_ssl_context,
        #     or ``"adhoc"``; when given, the socket is wrapped with it.
        # fd: descriptor of an already set up socket to use instead of
        #     binding a new one.
        #
        # If binding fails with OSError, the error is printed to stderr and
        # the process exits with status 1.
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        # NOTE: this assigns on the handler class itself.
        if "protocol_version" not in vars(handler) and (
            self.multithread or self.multiprocess
        ):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        self.address_family = address_family = select_address_family(host, port)
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if address_family == af_unix and fd is None:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is None:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try searching for and disabling"
                            " 'AirPlay Receiver' in System Settings.",
                            file=sys.stderr,
                        )

                sys.exit(1)
            except BaseException:
                # Anything else: release the socket, then propagate.
                self.server_close()
                raise
        else:
            # TCPServer automatically opens a socket even if bind_and_activate is False.
            # Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is not None:
            # Normalize the tuple and "adhoc" forms to a context object.
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context: ssl.SSLContext | None = ssl_context
        else:
            self.ssl_context = None

        import importlib.metadata

        # Read by WSGIRequestHandler.server_version.
        self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        # Forward to the module-level ``_log`` helper unchanged.
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        # Serve until interrupted. KeyboardInterrupt is swallowed, and the
        # server socket is closed in every case.
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        # With passthrough_errors set, re-raise the exception currently
        # being handled instead of using the base class error handling.
        if self.passthrough_errors:
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        dev_warning = (
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead."
        )
        dev_warning = _ansi_style(dev_warning, "bold", "red")
        messages = [dev_warning]

        if self.address_family == af_unix:
            messages.append(f" * Running on {self.host}")
        else:
            scheme = "http" if self.ssl_context is None else "https"
            display_hostname = self.host

            # When bound to all addresses, also show the loopback URL and
            # replace the wildcard with an interface address.
            if self.host in {"0.0.0.0", "::"}:
                messages.append(f" * Running on all addresses ({self.host})")

                if self.host == "0.0.0.0":
                    localhost = "127.0.0.1"
                    display_hostname = get_interface_ip(socket.AF_INET)
                else:
                    localhost = "[::1]"
                    display_hostname = get_interface_ip(socket.AF_INET6)

                messages.append(f" * Running on {scheme}://{localhost}:{self.port}")

            # Hosts containing ":" are bracketed in the URL.
            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")

        _log("info", "\n".join(messages))

867 

868 

class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests in separate
    threads.

    Use :func:`make_server` to create a server instance.
    """

    # Reported to applications as ``wsgi.multithread`` in the environ.
    multithread = True
    # ThreadingMixIn option: run the request threads as daemon threads.
    daemon_threads = True

878 

879 

class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that serves concurrent requests by forking a
    process per request.

    Use :func:`make_server` to create a server instance.
    """

    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Create the server; ``processes`` is stored as ``max_children``.

        :raises ValueError: If the platform does not support forking.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        self.max_children = processes

905 

906 

def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Build the WSGI server class selected by ``threaded`` and
    ``processes``.

    :func:`run_simple` calls this, but it can also be used on its own to
    get hold of the server object, for example to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: If both ``threaded`` and ``processes > 1`` are
        requested.
    """
    wants_forking = processes > 1

    if threaded and wants_forking:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    # Options every server class accepts under the same names.
    shared_options = {
        "handler": request_handler,
        "passthrough_errors": passthrough_errors,
        "ssl_context": ssl_context,
        "fd": fd,
    }

    if threaded:
        return ThreadedWSGIServer(host, port, app, **shared_options)

    if wants_forking:
        return ForkingWSGIServer(host, port, app, processes, **shared_options)

    return BaseWSGIServer(host, port, app, **shared_options)

950 

951 

def is_running_from_reloader() -> bool:
    """Tell whether this process has the ``WERKZEUG_RUN_MAIN``
    environment variable set to ``"true"``, which marks a subprocess
    running within the Werkzeug reloader.

    .. versionadded:: 0.10
    """
    marker = os.getenv("WERKZEUG_RUN_MAIN")
    return marker == "true"

959 

960 

def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Run a WSGI application on the development server, optionally
    with code reloading, the debugger, and static file serving.

    .. warning::

        This server is meant for local development only. It is not
        designed to be particularly efficient, stable, or secure, so
        do not use it when deploying to production.

    :param hostname: Host to bind to, such as ``'localhost'``. May be a
        domain, an IPv4 or IPv6 address, or a file path prefixed with
        ``unix://`` to bind a Unix socket.
    :param port: Port to bind to, such as ``8080``. Must be an ``int``.
        ``0`` lets the OS pick a random free port.
    :param application: The WSGI application to serve.
    :param use_reloader: Run under a reloader process that restarts the
        server process when files change.
    :param use_debugger: Wrap the application in Werkzeug's debugger so
        unhandled exceptions show formatted tracebacks.
    :param use_evalex: Make the debugger interactive, allowing a Python
        terminal to be opened for any traceback frame. A PIN gives some
        protection, but never enable this on a publicly visible server.
    :param extra_files: Additional files for the reloader to watch
        besides Python modules, such as a configuration file.
    :param exclude_patterns: :mod:`fnmatch` patterns; changes to
        matching files are ignored by the reloader. Useful for cache
        files.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: Which reloader to use. ``'stat'`` is built in
        but may need significant CPU to watch files. ``'watchdog'`` is
        much more efficient but requires the ``watchdog`` package to be
        installed first.
    :param threaded: Serve concurrent requests with threads. Cannot be
        combined with ``processes``.
    :param processes: Serve concurrent requests with up to this many
        processes. Cannot be combined with ``threaded``.
    :param request_handler: A different
        :class:`~http.server.BaseHTTPRequestHandler` subclass to handle
        requests with.
    :param static_files: A dict mapping URL prefixes to directories,
        served through
        :class:`~werkzeug.middleware.shared_data.SharedDataMiddleware`.
    :param passthrough_errors: Do not catch unhandled exceptions at the
        server level; let the server crash instead. With
        ``use_debugger`` enabled the debugger still catches them.
    :param ssl_context: TLS configuration for serving over HTTPS. Either
        an :class:`ssl.SSLContext`, a ``(cert_file, key_file)`` tuple
        from which a typical context is created, or the string
        ``'adhoc'`` to generate a temporary self-signed certificate.

    :raises TypeError: If ``port`` is not an ``int``.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    wsgi_app = application

    # Middleware is imported lazily so it is only loaded when requested.
    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        wsgi_app = SharedDataMiddleware(wsgi_app, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        wsgi_app = DebuggedApplication(wsgi_app, evalex=use_evalex)
        # The debugger trusts localhost domains already; also trust the
        # hostname the server was asked to bind to.
        wsgi_app.trusted_hosts.append(hostname)

    # Inside the reloader's subprocess the listening socket is taken
    # from the file descriptor recorded in the environment instead of
    # being opened fresh.
    inherited_fd = (
        int(os.environ["WERKZEUG_SERVER_FD"]) if is_running_from_reloader() else None
    )

    server = make_server(
        hostname,
        port,
        wsgi_app,
        threaded=threaded,
        processes=processes,
        request_handler=request_handler,
        passthrough_errors=passthrough_errors,
        ssl_context=ssl_context,
        fd=inherited_fd,
    )

    # Make the socket inheritable and record its descriptor in the
    # environment, where the lookup above reads it back when running
    # under the reloader.
    server.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(server.fileno())

    # The startup banner is only logged outside the reloader subprocess.
    if not is_running_from_reloader():
        server.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        server.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            server.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        # Always release the listening socket when the reloader exits.
        server.server_close()