Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/serving.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

484 statements  

1"""A WSGI and HTTP server for use **during development only**. This 

2server is convenient to use, but is not designed to be particularly 

3stable, secure, or efficient. Use a dedicate WSGI server and HTTP 

4server when deploying to production. 

5 

6It provides features like interactive debugging and code reloading. Use 

7``run_simple`` to start the server. Put this in a ``run.py`` script: 

8 

9.. code-block:: python 

10 

11 from myapp import create_app 

12 from werkzeug import run_simple 

13""" 

14 

15from __future__ import annotations 

16 

17import errno 

18import io 

19import os 

20import selectors 

21import socket 

22import socketserver 

23import sys 

24import typing as t 

25from datetime import datetime as dt 

26from datetime import timedelta 

27from datetime import timezone 

28from http.server import BaseHTTPRequestHandler 

29from http.server import HTTPServer 

30from urllib.parse import unquote 

31from urllib.parse import urlsplit 

32 

33from ._internal import _log 

34from ._internal import _plain_int 

35from ._internal import _wsgi_encoding_dance 

36from .datastructures import HeaderSet 

37from .exceptions import InternalServerError 

38from .urls import uri_to_iri 

39 

40try: 

41 import ssl 

42 

43 connection_dropped_errors: tuple[type[Exception], ...] = ( 

44 ConnectionError, 

45 socket.timeout, 

46 ssl.SSLEOFError, 

47 ) 

48except ImportError: 

49 

50 class _SslDummy: 

51 def __getattr__(self, name: str) -> t.Any: 

52 raise RuntimeError( # noqa: B904 

53 "SSL is unavailable because this Python runtime was not" 

54 " compiled with SSL/TLS support." 

55 ) 

56 

57 ssl = _SslDummy() # type: ignore 

58 connection_dropped_errors = (ConnectionError, socket.timeout) 

59 

60_log_add_style = True 

61 

62if os.name == "nt": 

63 try: 

64 __import__("colorama") 

65 except ImportError: 

66 _log_add_style = False 

67 

68can_fork = hasattr(os, "fork") 

69 

70if can_fork: 

71 ForkingMixIn = socketserver.ForkingMixIn 

72else: 

73 

74 class ForkingMixIn: # type: ignore 

75 pass 

76 

77 

78try: 

79 af_unix = socket.AF_UNIX 

80except AttributeError: 

81 af_unix = None # type: ignore 

82 

83LISTEN_QUEUE = 128 

84 

85_TSSLContextArg = t.Union[ 

86 "ssl.SSLContext", tuple[str, str | None], t.Literal["adhoc"], None 

87] 

88 

89if t.TYPE_CHECKING: 

90 from _typeshed.wsgi import WSGIApplication 

91 from _typeshed.wsgi import WSGIEnvironment 

92 from cryptography.hazmat.primitives.asymmetric.rsa import ( 

93 RSAPrivateKeyWithSerialization, 

94 ) 

95 from cryptography.x509 import Certificate 

96 

97 

98class DechunkedInput(io.RawIOBase): 

99 """An input stream that handles Transfer-Encoding 'chunked'""" 

100 

101 def __init__(self, rfile: t.IO[bytes]) -> None: 

102 self._rfile = rfile 

103 self._done = False 

104 self._len = 0 

105 

106 def readable(self) -> bool: 

107 return True 

108 

109 def read_chunk_len(self) -> int: 

110 try: 

111 line = self._rfile.readline().decode("latin1") 

112 _len = _plain_int(line, 16) 

113 except ValueError as e: 

114 raise OSError("Invalid chunk header") from e 

115 if _len < 0: 

116 raise OSError("Negative chunk length not allowed") 

117 return _len 

118 

119 def readinto(self, buf: bytearray) -> int: # type: ignore 

120 read = 0 

121 while not self._done and read < len(buf): 

122 if self._len == 0: 

123 # This is the first chunk or we fully consumed the previous 

124 # one. Read the next length of the next chunk 

125 self._len = self.read_chunk_len() 

126 

127 if self._len == 0: 

128 # Found the final chunk of size 0. The stream is now exhausted, 

129 # but there is still a final newline that should be consumed 

130 self._done = True 

131 

132 if self._len > 0: 

133 # There is data (left) in this chunk, so append it to the 

134 # buffer. If this operation fully consumes the chunk, this will 

135 # reset self._len to 0. 

136 n = min(len(buf), self._len) 

137 

138 # If (read + chunk size) becomes more than len(buf), buf will 

139 # grow beyond the original size and read more data than 

140 # required. So only read as much data as can fit in buf. 

141 if read + n > len(buf): 

142 buf[read:] = self._rfile.read(len(buf) - read) 

143 self._len -= len(buf) - read 

144 read = len(buf) 

145 else: 

146 buf[read : read + n] = self._rfile.read(n) 

147 self._len -= n 

148 read += n 

149 

150 if self._len == 0: 

151 # Skip the terminating newline of a chunk that has been fully 

152 # consumed. This also applies to the 0-sized final chunk 

153 terminator = self._rfile.readline() 

154 if terminator not in (b"\n", b"\r\n", b"\r"): 

155 raise OSError("Missing chunk terminating newline") 

156 

157 return read 

158 

159 

160class WSGIRequestHandler(BaseHTTPRequestHandler): 

161 """A request handler that implements WSGI dispatching.""" 

162 

163 server: BaseWSGIServer 

164 

165 @property 

166 def server_version(self) -> str: # type: ignore 

167 return self.server._server_version 

168 

169 def make_environ(self) -> WSGIEnvironment: 

170 request_url = urlsplit(self.path) 

171 url_scheme = "http" if self.server.ssl_context is None else "https" 

172 

173 if not self.client_address: 

174 self.client_address = ("<local>", 0) 

175 elif isinstance(self.client_address, str): 

176 self.client_address = (self.client_address, 0) 

177 

178 # If there was no scheme but the path started with two slashes, 

179 # the first segment may have been incorrectly parsed as the 

180 # netloc, prepend it to the path again. 

181 if not request_url.scheme and request_url.netloc: 

182 path_info = f"/{request_url.netloc}{request_url.path}" 

183 else: 

184 path_info = request_url.path 

185 

186 path_info = unquote(path_info) 

187 

188 environ: WSGIEnvironment = { 

189 "wsgi.version": (1, 0), 

190 "wsgi.url_scheme": url_scheme, 

191 "wsgi.input": self.rfile, 

192 "wsgi.errors": sys.stderr, 

193 "wsgi.multithread": self.server.multithread, 

194 "wsgi.multiprocess": self.server.multiprocess, 

195 "wsgi.run_once": False, 

196 "werkzeug.socket": self.connection, 

197 "SERVER_SOFTWARE": self.server_version, 

198 "REQUEST_METHOD": self.command, 

199 "SCRIPT_NAME": "", 

200 "PATH_INFO": _wsgi_encoding_dance(path_info), 

201 "QUERY_STRING": _wsgi_encoding_dance(request_url.query), 

202 # Non-standard, added by mod_wsgi, uWSGI 

203 "REQUEST_URI": _wsgi_encoding_dance(self.path), 

204 # Non-standard, added by gunicorn 

205 "RAW_URI": _wsgi_encoding_dance(self.path), 

206 "REMOTE_ADDR": self.address_string(), 

207 "REMOTE_PORT": self.port_integer(), 

208 "SERVER_NAME": self.server.server_address[0], 

209 "SERVER_PORT": str(self.server.server_address[1]), 

210 "SERVER_PROTOCOL": self.request_version, 

211 } 

212 

213 for key, value in self.headers.items(): 

214 if "_" in key: 

215 continue 

216 

217 key = key.upper().replace("-", "_") 

218 value = value.replace("\r\n", "") 

219 if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"): 

220 key = f"HTTP_{key}" 

221 if key in environ: 

222 value = f"{environ[key]},{value}" 

223 environ[key] = value 

224 

225 if "chunked" in HeaderSet.from_header(environ.get("HTTP_TRANSFER_ENCODING")): 

226 environ["wsgi.input_terminated"] = True 

227 environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"]) 

228 

229 # Per RFC 2616, if the URL is absolute, use that as the host. 

230 # We're using "has a scheme" to indicate an absolute URL. 

231 if request_url.scheme and request_url.netloc: 

232 environ["HTTP_HOST"] = request_url.netloc 

233 

234 try: 

235 # binary_form=False gives nicer information, but wouldn't be compatible with 

236 # what Nginx or Apache could return. 

237 peer_cert = self.connection.getpeercert(binary_form=True) 

238 if peer_cert is not None: 

239 # Nginx and Apache use PEM format. 

240 environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert) 

241 except ValueError: 

242 # SSL handshake hasn't finished. 

243 self.server.log("error", "Cannot fetch SSL peer certificate info") 

244 except AttributeError: 

245 # Not using TLS, the socket will not have getpeercert(). 

246 pass 

247 

248 return environ 

249 

250 def run_wsgi(self) -> None: 

251 self.environ = environ = self.make_environ() 

252 status_set: str | None = None 

253 headers_set: list[tuple[str, str]] | None = None 

254 status_sent: str | None = None 

255 headers_sent: list[tuple[str, str]] | None = None 

256 chunk_response: bool = False 

257 

258 def write(data: bytes) -> None: 

259 nonlocal status_sent, headers_sent, chunk_response 

260 assert status_set is not None, "write() before start_response" 

261 assert headers_set is not None, "write() before start_response" 

262 if status_sent is None: 

263 status_sent = status_set 

264 headers_sent = headers_set 

265 try: 

266 code_str, msg = status_sent.split(None, 1) 

267 except ValueError: 

268 code_str, msg = status_sent, "" 

269 code = int(code_str) 

270 self.send_response(code, msg) 

271 header_keys = set() 

272 for key, value in headers_sent: 

273 self.send_header(key, value) 

274 header_keys.add(key.lower()) 

275 

276 # Use chunked transfer encoding if there is no content 

277 # length. Do not use for 1xx and 204 responses. 304 

278 # responses and HEAD requests are also excluded, which 

279 # is the more conservative behavior and matches other 

280 # parts of the code. 

281 # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1 

282 if ( 

283 not ( 

284 "content-length" in header_keys 

285 or environ["REQUEST_METHOD"] == "HEAD" 

286 or (100 <= code < 200) 

287 or code in {204, 304} 

288 ) 

289 and self.protocol_version >= "HTTP/1.1" 

290 ): 

291 chunk_response = True 

292 self.send_header("Transfer-Encoding", "chunked") 

293 

294 # Always close the connection. This disables HTTP/1.1 

295 # keep-alive connections. They aren't handled well by 

296 # Python's http.server because it doesn't know how to 

297 # drain the stream before the next request line. 

298 self.send_header("Connection", "close") 

299 self.end_headers() 

300 

301 assert isinstance(data, bytes), "applications must write bytes" 

302 

303 if data: 

304 if chunk_response: 

305 self.wfile.write(hex(len(data))[2:].encode()) 

306 self.wfile.write(b"\r\n") 

307 

308 self.wfile.write(data) 

309 

310 if chunk_response: 

311 self.wfile.write(b"\r\n") 

312 

313 self.wfile.flush() 

314 

315 def start_response(status, headers, exc_info=None): # type: ignore 

316 nonlocal status_set, headers_set 

317 if exc_info: 

318 try: 

319 if headers_sent: 

320 raise exc_info[1].with_traceback(exc_info[2]) 

321 finally: 

322 exc_info = None 

323 elif headers_set: 

324 raise AssertionError("Headers already set") 

325 status_set = status 

326 headers_set = headers 

327 return write 

328 

329 def execute(app: WSGIApplication) -> None: 

330 application_iter = app(environ, start_response) 

331 try: 

332 for data in application_iter: 

333 write(data) 

334 if not headers_sent: 

335 write(b"") 

336 if chunk_response: 

337 self.wfile.write(b"0\r\n\r\n") 

338 finally: 

339 # Check for any remaining data in the read socket, and discard it. This 

340 # will read past request.max_content_length, but lets the client see a 

341 # 413 response instead of a connection reset failure. If we supported 

342 # keep-alive connections, this naive approach would break by reading the 

343 # next request line. Since we know that write (above) closes every 

344 # connection we can read everything. 

345 selector = selectors.DefaultSelector() 

346 selector.register(self.connection, selectors.EVENT_READ) 

347 total_size = 0 

348 total_reads = 0 

349 

350 # A timeout of 0 tends to fail because a client needs a small amount of 

351 # time to continue sending its data. 

352 while selector.select(timeout=0.01): 

353 # Only read 10MB into memory at a time. 

354 data = self.rfile.read(10_000_000) 

355 total_size += len(data) 

356 total_reads += 1 

357 

358 # Stop reading on no data, >=10GB, or 1000 reads. If a client sends 

359 # more than that, they'll get a connection reset failure. 

360 if not data or total_size >= 10_000_000_000 or total_reads > 1000: 

361 break 

362 

363 selector.close() 

364 

365 if hasattr(application_iter, "close"): 

366 application_iter.close() 

367 

368 try: 

369 execute(self.server.app) 

370 except connection_dropped_errors as e: 

371 self.connection_dropped(e, environ) 

372 except Exception as e: 

373 if self.server.passthrough_errors: 

374 raise 

375 

376 if status_sent is not None and chunk_response: 

377 self.close_connection = True 

378 

379 try: 

380 # if we haven't yet sent the headers but they are set 

381 # we roll back to be able to set them again. 

382 if status_sent is None: 

383 status_set = None 

384 headers_set = None 

385 execute(InternalServerError()) 

386 except Exception: 

387 pass 

388 

389 from .debug.tbtools import DebugTraceback 

390 

391 msg = DebugTraceback(e).render_traceback_text() 

392 self.server.log("error", f"Error on request:\n{msg}") 

393 

394 def handle(self) -> None: 

395 """Handles a request ignoring dropped connections.""" 

396 try: 

397 super().handle() 

398 except (ConnectionError, TimeoutError) as e: 

399 self.connection_dropped(e) 

400 except Exception as e: 

401 if self.server.ssl_context is not None and is_ssl_error(e): 

402 self.log_error("SSL error occurred: %s", e) 

403 else: 

404 raise 

405 

406 def connection_dropped( 

407 self, error: BaseException, environ: WSGIEnvironment | None = None 

408 ) -> None: 

409 """Called if the connection was closed by the client. By default 

410 nothing happens. 

411 """ 

412 

413 def __getattr__(self, name: str) -> t.Any: 

414 # All HTTP methods are handled by run_wsgi. 

415 if name.startswith("do_"): 

416 return self.run_wsgi 

417 

418 # All other attributes are forwarded to the base class. 

419 return getattr(super(), name) 

420 

421 def address_string(self) -> str: 

422 if getattr(self, "environ", None): 

423 return self.environ["REMOTE_ADDR"] # type: ignore 

424 

425 if not self.client_address: 

426 return "<local>" 

427 

428 return self.client_address[0] 

429 

430 def port_integer(self) -> int: 

431 return self.client_address[1] 

432 

433 # Escape control characters. This is defined (but private) in Python 3.12. 

434 _control_char_table = str.maketrans( 

435 {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]} 

436 ) 

437 _control_char_table[ord("\\")] = r"\\" 

438 

439 def log_request(self, code: int | str = "-", size: int | str = "-") -> None: 

440 try: 

441 path = uri_to_iri(self.path) 

442 msg = f"{self.command} {path} {self.request_version}" 

443 except AttributeError: 

444 # path isn't set if the requestline was bad 

445 msg = self.requestline 

446 

447 # Escape control characters that may be in the decoded path. 

448 msg = msg.translate(self._control_char_table) 

449 code = str(code) 

450 

451 if code[0] == "1": # 1xx - Informational 

452 msg = _ansi_style(msg, "bold") 

453 elif code == "200": # 2xx - Success 

454 pass 

455 elif code == "304": # 304 - Resource Not Modified 

456 msg = _ansi_style(msg, "cyan") 

457 elif code[0] == "3": # 3xx - Redirection 

458 msg = _ansi_style(msg, "green") 

459 elif code == "404": # 404 - Resource Not Found 

460 msg = _ansi_style(msg, "yellow") 

461 elif code[0] == "4": # 4xx - Client Error 

462 msg = _ansi_style(msg, "bold", "red") 

463 else: # 5xx, or any other response 

464 msg = _ansi_style(msg, "bold", "magenta") 

465 

466 self.log("info", '"%s" %s %s', msg, code, size) 

467 

468 def log_error(self, format: str, *args: t.Any) -> None: 

469 self.log("error", format, *args) 

470 

471 def log_message(self, format: str, *args: t.Any) -> None: 

472 self.log("info", format, *args) 

473 

474 def log(self, type: str, message: str, *args: t.Any) -> None: 

475 # an IPv6 scoped address contains "%" which breaks logging 

476 address_string = self.address_string().replace("%", "%%") 

477 _log( 

478 type, 

479 f"{address_string} - - [{self.log_date_time_string()}] {message}\n", 

480 *args, 

481 ) 

482 

483 

484def _ansi_style(value: str, *styles: str) -> str: 

485 if not _log_add_style: 

486 return value 

487 

488 codes = { 

489 "bold": 1, 

490 "red": 31, 

491 "green": 32, 

492 "yellow": 33, 

493 "magenta": 35, 

494 "cyan": 36, 

495 } 

496 

497 for style in styles: 

498 value = f"\x1b[{codes[style]}m{value}" 

499 

500 return f"{value}\x1b[0m" 

501 

502 

503def generate_adhoc_ssl_pair( 

504 cn: str | None = None, 

505) -> tuple[Certificate, RSAPrivateKeyWithSerialization]: 

506 try: 

507 from cryptography import x509 

508 from cryptography.hazmat.backends import default_backend 

509 from cryptography.hazmat.primitives import hashes 

510 from cryptography.hazmat.primitives.asymmetric import rsa 

511 from cryptography.x509.oid import NameOID 

512 except ImportError: 

513 raise TypeError( 

514 "Using ad-hoc certificates requires the cryptography library." 

515 ) from None 

516 

517 backend = default_backend() 

518 pkey = rsa.generate_private_key( 

519 public_exponent=65537, key_size=2048, backend=backend 

520 ) 

521 

522 # pretty damn sure that this is not actually accepted by anyone 

523 if cn is None: 

524 cn = "*" 

525 

526 subject = x509.Name( 

527 [ 

528 x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"), 

529 x509.NameAttribute(NameOID.COMMON_NAME, cn), 

530 ] 

531 ) 

532 

533 backend = default_backend() 

534 cert = ( 

535 x509.CertificateBuilder() 

536 .subject_name(subject) 

537 .issuer_name(subject) 

538 .public_key(pkey.public_key()) 

539 .serial_number(x509.random_serial_number()) 

540 .not_valid_before(dt.now(timezone.utc)) 

541 .not_valid_after(dt.now(timezone.utc) + timedelta(days=365)) 

542 .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False) 

543 .add_extension( 

544 x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]), 

545 critical=False, 

546 ) 

547 .sign(pkey, hashes.SHA256(), backend) 

548 ) 

549 return cert, pkey 

550 

551 

552def make_ssl_devcert( 

553 base_path: str, host: str | None = None, cn: str | None = None 

554) -> tuple[str, str]: 

555 """Creates an SSL key for development. This should be used instead of 

556 the ``'adhoc'`` key which generates a new cert on each server start. 

557 It accepts a path for where it should store the key and cert and 

558 either a host or CN. If a host is given it will use the CN 

559 ``*.host/CN=host``. 

560 

561 For more information see :func:`run_simple`. 

562 

563 .. versionadded:: 0.9 

564 

565 :param base_path: the path to the certificate and key. The extension 

566 ``.crt`` is added for the certificate, ``.key`` is 

567 added for the key. 

568 :param host: the name of the host. This can be used as an alternative 

569 for the `cn`. 

570 :param cn: the `CN` to use. 

571 """ 

572 

573 if host is not None: 

574 cn = host 

575 cert, pkey = generate_adhoc_ssl_pair(cn=cn) 

576 

577 from cryptography.hazmat.primitives import serialization 

578 

579 cert_file = f"{base_path}.crt" 

580 pkey_file = f"{base_path}.key" 

581 

582 with open(cert_file, "wb") as f: 

583 f.write(cert.public_bytes(serialization.Encoding.PEM)) 

584 with open(pkey_file, "wb") as f: 

585 f.write( 

586 pkey.private_bytes( 

587 encoding=serialization.Encoding.PEM, 

588 format=serialization.PrivateFormat.TraditionalOpenSSL, 

589 encryption_algorithm=serialization.NoEncryption(), 

590 ) 

591 ) 

592 

593 return cert_file, pkey_file 

594 

595 

596def generate_adhoc_ssl_context() -> ssl.SSLContext: 

597 """Generates an adhoc SSL context for the development server.""" 

598 import atexit 

599 import tempfile 

600 

601 cert, pkey = generate_adhoc_ssl_pair() 

602 

603 from cryptography.hazmat.primitives import serialization 

604 

605 with tempfile.NamedTemporaryFile("wb", delete=False) as cert_file: 

606 cert_file.write(cert.public_bytes(serialization.Encoding.PEM)) 

607 

608 with tempfile.NamedTemporaryFile("wb", delete=False) as pkey_file: 

609 pkey_file.write( 

610 pkey.private_bytes( 

611 encoding=serialization.Encoding.PEM, 

612 format=serialization.PrivateFormat.TraditionalOpenSSL, 

613 encryption_algorithm=serialization.NoEncryption(), 

614 ) 

615 ) 

616 

617 atexit.register(os.remove, cert_file.name) 

618 atexit.register(os.remove, pkey_file.name) 

619 ctx = load_ssl_context(cert_file.name, pkey_file.name) 

620 return ctx 

621 

622 

623def load_ssl_context( 

624 cert_file: str, pkey_file: str | None = None, protocol: int | None = None 

625) -> ssl.SSLContext: 

626 """Loads SSL context from cert/private key files and optional protocol. 

627 Many parameters are directly taken from the API of 

628 :py:class:`ssl.SSLContext`. 

629 

630 :param cert_file: Path of the certificate to use. 

631 :param pkey_file: Path of the private key to use. If not given, the key 

632 will be obtained from the certificate file. 

633 :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module. 

634 Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`. 

635 """ 

636 if protocol is None: 

637 protocol = ssl.PROTOCOL_TLS_SERVER 

638 

639 ctx = ssl.SSLContext(protocol) 

640 ctx.load_cert_chain(cert_file, pkey_file) 

641 return ctx 

642 

643 

644def is_ssl_error(error: Exception | None = None) -> bool: 

645 """Checks if the given error (or the current one) is an SSL error.""" 

646 if error is None: 

647 error = t.cast(Exception, sys.exc_info()[1]) 

648 return isinstance(error, ssl.SSLError) 

649 

650 

651def select_address_family(host: str, port: int) -> socket.AddressFamily: 

652 """Return ``AF_INET4``, ``AF_INET6``, or ``AF_UNIX`` depending on 

653 the host and port.""" 

654 if host.startswith("unix://"): 

655 return socket.AF_UNIX 

656 elif ":" in host and hasattr(socket, "AF_INET6"): 

657 return socket.AF_INET6 

658 return socket.AF_INET 

659 

660 

661def get_sockaddr( 

662 host: str, port: int, family: socket.AddressFamily 

663) -> tuple[str, int] | str: 

664 """Return a fully qualified socket address that can be passed to 

665 :func:`socket.bind`.""" 

666 if family == af_unix: 

667 # Absolute path avoids IDNA encoding error when path starts with dot. 

668 return os.path.abspath(host.partition("://")[2]) 

669 try: 

670 res = socket.getaddrinfo( 

671 host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP 

672 ) 

673 except socket.gaierror: 

674 return host, port 

675 return res[0][4] # type: ignore 

676 

677 

678def get_interface_ip(family: socket.AddressFamily) -> str: 

679 """Get the IP address of an external interface. Used when binding to 

680 0.0.0.0 or ::1 to show a more useful URL. 

681 

682 :meta private: 

683 """ 

684 # arbitrary private address 

685 host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219" 

686 

687 with socket.socket(family, socket.SOCK_DGRAM) as s: 

688 try: 

689 s.connect((host, 58162)) 

690 except OSError: 

691 return "::1" if family == socket.AF_INET6 else "127.0.0.1" 

692 

693 return s.getsockname()[0] # type: ignore 

694 

695 

696class BaseWSGIServer(HTTPServer): 

697 """A WSGI server that that handles one request at a time. 

698 

699 Use :func:`make_server` to create a server instance. 

700 """ 

701 

702 multithread = False 

703 multiprocess = False 

704 request_queue_size = LISTEN_QUEUE 

705 allow_reuse_address = True 

706 

707 def __init__( 

708 self, 

709 host: str, 

710 port: int, 

711 app: WSGIApplication, 

712 handler: type[WSGIRequestHandler] | None = None, 

713 passthrough_errors: bool = False, 

714 ssl_context: _TSSLContextArg = None, 

715 fd: int | None = None, 

716 ) -> None: 

717 if handler is None: 

718 handler = WSGIRequestHandler 

719 

720 # If the handler doesn't directly set a protocol version and 

721 # thread or process workers are used, then allow chunked 

722 # responses and keep-alive connections by enabling HTTP/1.1. 

723 if "protocol_version" not in vars(handler) and ( 

724 self.multithread or self.multiprocess 

725 ): 

726 handler.protocol_version = "HTTP/1.1" 

727 

728 self.host = host 

729 self.port = port 

730 self.app = app 

731 self.passthrough_errors = passthrough_errors 

732 

733 self.address_family = address_family = select_address_family(host, port) 

734 server_address = get_sockaddr(host, int(port), address_family) 

735 

736 # Remove a leftover Unix socket file from a previous run. Don't 

737 # remove a file that was set up by run_simple. 

738 if address_family == af_unix and fd is None: 

739 server_address = t.cast(str, server_address) 

740 

741 if os.path.exists(server_address): 

742 os.unlink(server_address) 

743 

744 # Bind and activate will be handled manually, it should only 

745 # happen if we're not using a socket that was already set up. 

746 super().__init__( 

747 server_address, # type: ignore[arg-type] 

748 handler, 

749 bind_and_activate=False, 

750 ) 

751 

752 if fd is None: 

753 # No existing socket descriptor, do bind_and_activate=True. 

754 try: 

755 self.server_bind() 

756 self.server_activate() 

757 except OSError as e: 

758 # Catch connection issues and show them without the traceback. Show 

759 # extra instructions for address not found, and for macOS. 

760 self.server_close() 

761 print(e.strerror, file=sys.stderr) 

762 

763 if e.errno == errno.EADDRINUSE: 

764 print( 

765 f"Port {port} is in use by another program. Either identify and" 

766 " stop that program, or start the server with a different" 

767 " port.", 

768 file=sys.stderr, 

769 ) 

770 

771 if sys.platform == "darwin" and port == 5000: 

772 print( 

773 "On macOS, try searching for and disabling" 

774 " 'AirPlay Receiver' in System Settings.", 

775 file=sys.stderr, 

776 ) 

777 

778 sys.exit(1) 

779 except BaseException: 

780 self.server_close() 

781 raise 

782 else: 

783 # TCPServer automatically opens a socket even if bind_and_activate is False. 

784 # Close it to silence a ResourceWarning. 

785 self.server_close() 

786 

787 # Use the passed in socket directly. 

788 self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM) 

789 self.server_address = self.socket.getsockname() 

790 

791 if address_family != af_unix: 

792 # If port was 0, this will record the bound port. 

793 self.port = self.server_address[1] 

794 

795 if ssl_context is not None: 

796 if isinstance(ssl_context, tuple): 

797 ssl_context = load_ssl_context(*ssl_context) 

798 elif ssl_context == "adhoc": 

799 ssl_context = generate_adhoc_ssl_context() 

800 

801 self.socket = ssl_context.wrap_socket(self.socket, server_side=True) 

802 self.ssl_context: ssl.SSLContext | None = ssl_context 

803 else: 

804 self.ssl_context = None 

805 

806 import importlib.metadata 

807 

808 self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}" 

809 

810 def log(self, type: str, message: str, *args: t.Any) -> None: 

811 _log(type, message, *args) 

812 

813 def serve_forever(self, poll_interval: float = 0.5) -> None: 

814 try: 

815 super().serve_forever(poll_interval=poll_interval) 

816 except KeyboardInterrupt: 

817 pass 

818 finally: 

819 self.server_close() 

820 

821 def handle_error( 

822 self, request: t.Any, client_address: tuple[str, int] | str 

823 ) -> None: 

824 if self.passthrough_errors: 

825 raise 

826 

827 return super().handle_error(request, client_address) 

828 

829 def log_startup(self) -> None: 

830 """Show information about the address when starting the server.""" 

831 dev_warning = ( 

832 "WARNING: This is a development server. Do not use it in a production" 

833 " deployment. Use a production WSGI server instead." 

834 ) 

835 dev_warning = _ansi_style(dev_warning, "bold", "red") 

836 messages = [dev_warning] 

837 

838 if self.address_family == af_unix: 

839 messages.append(f" * Running on {self.host}") 

840 else: 

841 scheme = "http" if self.ssl_context is None else "https" 

842 display_hostname = self.host 

843 

844 if self.host in {"0.0.0.0", "::"}: 

845 messages.append(f" * Running on all addresses ({self.host})") 

846 

847 if self.host == "0.0.0.0": 

848 localhost = "127.0.0.1" 

849 display_hostname = get_interface_ip(socket.AF_INET) 

850 else: 

851 localhost = "[::1]" 

852 display_hostname = get_interface_ip(socket.AF_INET6) 

853 

854 messages.append(f" * Running on {scheme}://{localhost}:{self.port}") 

855 

856 if ":" in display_hostname: 

857 display_hostname = f"[{display_hostname}]" 

858 

859 messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}") 

860 

861 _log("info", "\n".join(messages)) 

862 

863 

864class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer): 

865 """A WSGI server that handles concurrent requests in separate 

866 threads. 

867 

868 Use :func:`make_server` to create a server instance. 

869 """ 

870 

871 multithread = True 

872 daemon_threads = True 

873 

874 

875class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer): 

876 """A WSGI server that handles concurrent requests in separate forked 

877 processes. 

878 

879 Use :func:`make_server` to create a server instance. 

880 """ 

881 

882 multiprocess = True 

883 

884 def __init__( 

885 self, 

886 host: str, 

887 port: int, 

888 app: WSGIApplication, 

889 processes: int = 40, 

890 handler: type[WSGIRequestHandler] | None = None, 

891 passthrough_errors: bool = False, 

892 ssl_context: _TSSLContextArg = None, 

893 fd: int | None = None, 

894 ) -> None: 

895 if not can_fork: 

896 raise ValueError("Your platform does not support forking.") 

897 

898 super().__init__(host, port, app, handler, passthrough_errors, ssl_context, fd) 

899 self.max_children = processes 

900 

901 

902def make_server( 

903 host: str, 

904 port: int, 

905 app: WSGIApplication, 

906 threaded: bool = False, 

907 processes: int = 1, 

908 request_handler: type[WSGIRequestHandler] | None = None, 

909 passthrough_errors: bool = False, 

910 ssl_context: _TSSLContextArg = None, 

911 fd: int | None = None, 

912) -> BaseWSGIServer: 

913 """Create an appropriate WSGI server instance based on the value of 

914 ``threaded`` and ``processes``. 

915 

916 This is called from :func:`run_simple`, but can be used separately 

917 to have access to the server object, such as to run it in a separate 

918 thread. 

919 

920 See :func:`run_simple` for parameter docs. 

921 """ 

922 if threaded and processes > 1: 

923 raise ValueError("Cannot have a multi-thread and multi-process server.") 

924 

925 if threaded: 

926 return ThreadedWSGIServer( 

927 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd 

928 ) 

929 

930 if processes > 1: 

931 return ForkingWSGIServer( 

932 host, 

933 port, 

934 app, 

935 processes, 

936 request_handler, 

937 passthrough_errors, 

938 ssl_context, 

939 fd=fd, 

940 ) 

941 

942 return BaseWSGIServer( 

943 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd 

944 ) 

945 

946 

947def is_running_from_reloader() -> bool: 

948 """Check if the server is running as a subprocess within the 

949 Werkzeug reloader. 

950 

951 .. versionadded:: 0.10 

952 """ 

953 return os.environ.get("WERKZEUG_RUN_MAIN") == "true" 

954 

955 

956def run_simple( 

957 hostname: str, 

958 port: int, 

959 application: WSGIApplication, 

960 use_reloader: bool = False, 

961 use_debugger: bool = False, 

962 use_evalex: bool = True, 

963 extra_files: t.Iterable[str] | None = None, 

964 exclude_patterns: t.Iterable[str] | None = None, 

965 reloader_interval: int = 1, 

966 reloader_type: str = "auto", 

967 threaded: bool = False, 

968 processes: int = 1, 

969 request_handler: type[WSGIRequestHandler] | None = None, 

970 static_files: dict[str, str | tuple[str, str]] | None = None, 

971 passthrough_errors: bool = False, 

972 ssl_context: _TSSLContextArg = None, 

973) -> None: 

974 """Start a development server for a WSGI application. Various 

975 optional features can be enabled. 

976 

977 .. warning:: 

978 

979 Do not use the development server when deploying to production. 

980 It is intended for use only during local development. It is not 

981 designed to be particularly efficient, stable, or secure. 

982 

983 :param hostname: The host to bind to, for example ``'localhost'``. 

984 Can be a domain, IPv4 or IPv6 address, or file path starting 

985 with ``unix://`` for a Unix socket. 

986 :param port: The port to bind to, for example ``8080``. Using ``0`` 

987 tells the OS to pick a random free port. 

988 :param application: The WSGI application to run. 

989 :param use_reloader: Use a reloader process to restart the server 

990 process when files are changed. 

991 :param use_debugger: Use Werkzeug's debugger, which will show 

992 formatted tracebacks on unhandled exceptions. 

993 :param use_evalex: Make the debugger interactive. A Python terminal 

994 can be opened for any frame in the traceback. Some protection is 

995 provided by requiring a PIN, but this should never be enabled 

996 on a publicly visible server. 

997 :param extra_files: The reloader will watch these files for changes 

998 in addition to Python modules. For example, watch a 

999 configuration file. 

1000 :param exclude_patterns: The reloader will ignore changes to any 

1001 files matching these :mod:`fnmatch` patterns. For example, 

1002 ignore cache files. 

1003 :param reloader_interval: How often the reloader tries to check for 

1004 changes. 

1005 :param reloader_type: The reloader to use. The ``'stat'`` reloader 

1006 is built in, but may require significant CPU to watch files. The 

1007 ``'watchdog'`` reloader is much more efficient but requires 

1008 installing the ``watchdog`` package first. 

1009 :param threaded: Handle concurrent requests using threads. Cannot be 

1010 used with ``processes``. 

1011 :param processes: Handle concurrent requests using up to this number 

1012 of processes. Cannot be used with ``threaded``. 

1013 :param request_handler: Use a different 

1014 :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to 

1015 handle requests. 

1016 :param static_files: A dict mapping URL prefixes to directories to 

1017 serve static files from using 

1018 :class:`~werkzeug.middleware.SharedDataMiddleware`. 

1019 :param passthrough_errors: Don't catch unhandled exceptions at the 

1020 server level, let the server crash instead. If ``use_debugger`` 

1021 is enabled, the debugger will still catch such errors. 

1022 :param ssl_context: Configure TLS to serve over HTTPS. Can be an 

1023 :class:`ssl.SSLContext` object, a ``(cert_file, key_file)`` 

1024 tuple to create a typical context, or the string ``'adhoc'`` to 

1025 generate a temporary self-signed certificate. 

1026 

1027 .. versionchanged:: 2.1 

1028 Instructions are shown for dealing with an "address already in 

1029 use" error. 

1030 

1031 .. versionchanged:: 2.1 

1032 Running on ``0.0.0.0`` or ``::`` shows the loopback IP in 

1033 addition to a real IP. 

1034 

1035 .. versionchanged:: 2.1 

1036 The command-line interface was removed. 

1037 

1038 .. versionchanged:: 2.0 

1039 Running on ``0.0.0.0`` or ``::`` shows a real IP address that 

1040 was bound as well as a warning not to run the development server 

1041 in production. 

1042 

1043 .. versionchanged:: 2.0 

1044 The ``exclude_patterns`` parameter was added. 

1045 

1046 .. versionchanged:: 0.15 

1047 Bind to a Unix socket by passing a ``hostname`` that starts with 

1048 ``unix://``. 

1049 

1050 .. versionchanged:: 0.10 

1051 Improved the reloader and added support for changing the backend 

1052 through the ``reloader_type`` parameter. 

1053 

1054 .. versionchanged:: 0.9 

1055 A command-line interface was added. 

1056 

1057 .. versionchanged:: 0.8 

1058 ``ssl_context`` can be a tuple of paths to the certificate and 

1059 private key files. 

1060 

1061 .. versionchanged:: 0.6 

1062 The ``ssl_context`` parameter was added. 

1063 

1064 .. versionchanged:: 0.5 

1065 The ``static_files`` and ``passthrough_errors`` parameters were 

1066 added. 

1067 """ 

1068 if not isinstance(port, int): 

1069 raise TypeError("port must be an integer") 

1070 

1071 if static_files: 

1072 from .middleware.shared_data import SharedDataMiddleware 

1073 

1074 application = SharedDataMiddleware(application, static_files) 

1075 

1076 if use_debugger: 

1077 from .debug import DebuggedApplication 

1078 

1079 application = DebuggedApplication(application, evalex=use_evalex) 

1080 

1081 # Allow the specified hostname to use the debugger, in addition to 

1082 # localhost domains. 

1083 if hostname not in {"0.0.0.0", "[::]"}: 

1084 application.trusted_hosts.append(hostname) 

1085 

1086 if not is_running_from_reloader(): 

1087 fd = None 

1088 else: 

1089 fd = int(os.environ["WERKZEUG_SERVER_FD"]) 

1090 

1091 srv = make_server( 

1092 hostname, 

1093 port, 

1094 application, 

1095 threaded, 

1096 processes, 

1097 request_handler, 

1098 passthrough_errors, 

1099 ssl_context, 

1100 fd=fd, 

1101 ) 

1102 srv.socket.set_inheritable(True) 

1103 os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno()) 

1104 

1105 if not is_running_from_reloader(): 

1106 srv.log_startup() 

1107 _log("info", _ansi_style("Press CTRL+C to quit", "yellow")) 

1108 

1109 if use_reloader: 

1110 from ._reloader import run_with_reloader 

1111 

1112 try: 

1113 run_with_reloader( 

1114 srv.serve_forever, 

1115 extra_files=extra_files, 

1116 exclude_patterns=exclude_patterns, 

1117 interval=reloader_interval, 

1118 reloader_type=reloader_type, 

1119 ) 

1120 finally: 

1121 srv.server_close() 

1122 else: 

1123 srv.serve_forever()