Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/serving.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

485 statements  

1"""A WSGI and HTTP server for use **during development only**. This 

2server is convenient to use, but is not designed to be particularly 

3stable, secure, or efficient. Use a dedicated WSGI server and HTTP 

4server when deploying to production. 

5 

6It provides features like interactive debugging and code reloading. Use 

7``run_simple`` to start the server. Put this in a ``run.py`` script: 

8 

9.. code-block:: python 

10 

11 from myapp import create_app 

12 from werkzeug import run_simple 

13""" 

14 

15from __future__ import annotations 

16 

17import errno 

18import io 

19import os 

20import selectors 

21import socket 

22import socketserver 

23import sys 

24import typing as t 

25from datetime import datetime as dt 

26from datetime import timedelta 

27from datetime import timezone 

28from http.server import BaseHTTPRequestHandler 

29from http.server import HTTPServer 

30from urllib.parse import unquote 

31from urllib.parse import urlsplit 

32 

33from ._internal import _log 

34from ._internal import _wsgi_encoding_dance 

35from .exceptions import InternalServerError 

36from .urls import uri_to_iri 

37 

# ``ssl`` is optional: some Python builds are compiled without TLS support.
try:
    import ssl

    # Exceptions that mean the client went away mid-request.
    connection_dropped_errors: tuple[type[Exception], ...] = (
        ConnectionError,
        socket.timeout,
        ssl.SSLEOFError,
    )
except ImportError:

    class _SslDummy:
        """Stand-in for the ``ssl`` module that fails on any attribute access."""

        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore
    connection_dropped_errors = (ConnectionError, socket.timeout)

# Whether log lines may be decorated with ANSI escape codes.
_log_add_style = True

if os.name == "nt":
    # On Windows, styling is only enabled when colorama can be imported.
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False

can_fork = hasattr(os, "fork")

if can_fork:
    ForkingMixIn = socketserver.ForkingMixIn
else:

    class ForkingMixIn:  # type: ignore
        """Placeholder so ``ForkingWSGIServer`` can still be defined."""

        pass


# ``None`` on platforms whose ``socket`` module has no ``AF_UNIX``.
try:
    af_unix = socket.AF_UNIX
except AttributeError:
    af_unix = None  # type: ignore

# Used as ``request_queue_size`` by ``BaseWSGIServer``.
LISTEN_QUEUE = 128

_TSSLContextArg = t.Optional[
    t.Union["ssl.SSLContext", tuple[str, t.Optional[str]], t.Literal["adhoc"]]
]

86 

87if t.TYPE_CHECKING: 

88 from _typeshed.wsgi import WSGIApplication 

89 from _typeshed.wsgi import WSGIEnvironment 

90 from cryptography.hazmat.primitives.asymmetric.rsa import ( 

91 RSAPrivateKeyWithSerialization, 

92 ) 

93 from cryptography.x509 import Certificate 

94 

95 

class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps the raw request stream and yields only the chunk payloads,
    stripping the chunk-size lines and the newline after each chunk.

    :param rfile: the binary stream positioned at the first chunk header.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the zero-sized final chunk has been seen.
        self._done = False
        # Bytes of the current chunk that have not been handed out yet.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk-size line and return the size as an integer.

        :raises OSError: if the line is not a hexadecimal number or the
            number is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = int(line.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded body bytes and return how many were
        written. Returns ``0`` once the final chunk has been consumed.

        :raises OSError: on a malformed chunk header, a missing chunk
            terminator, or when the underlying stream ends inside a chunk.
        """
        read = 0
        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the length of the next chunk.
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed.
                self._done = True

            if self._len > 0:
                # Copy only as much as both the chunk and the remaining
                # space in buf allow, so buf never grows.
                want = min(len(buf) - read, self._len)
                data = self._rfile.read(want)

                if len(data) != want:
                    # A short read would resize buf through the slice
                    # assignment below and desynchronise the byte count.
                    raise OSError("Unexpected end of chunked stream")

                buf[read : read + want] = data
                self._len -= want
                read += want

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk.
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read

156 

157 

class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching."""

    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the request being handled."""
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        # Normalise client_address to a (host, port) tuple.
        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names containing "_" are dropped: after the "-" to "_"
            # conversion below they would collide with dashed names.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                if key in environ:
                    # Repeated headers are joined with a comma.
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Run the server's WSGI application for the current request and
        write its response to the client.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        # *_set: what the application passed to start_response().
        # *_sent: what has actually been written to the socket.
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                # First write: emit the status line and headers.
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    # Drop the reference to the traceback.
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                try:
                    # Check for any remaining data in the read socket, and discard it.
                    # This will read past request.max_content_length, but lets the
                    # client see a 413 response instead of a connection reset failure.
                    # If we supported keep-alive connections, this naive approach would
                    # break by reading the next request line. Since we know that write
                    # (above) closes every connection we can read everything.
                    # The context manager closes the selector even if a read fails.
                    with selectors.DefaultSelector() as selector:
                        selector.register(self.connection, selectors.EVENT_READ)
                        total_size = 0
                        total_reads = 0

                        # A timeout of 0 tends to fail because a client needs a small
                        # amount of time to continue sending its data.
                        while selector.select(timeout=0.01):
                            # Only read 10MB into memory at a time.
                            data = self.rfile.read(10_000_000)
                            total_size += len(data)
                            total_reads += 1

                            # Stop reading on no data, >=10GB, or 1000 reads. If a
                            # client sends more than that, they'll get a connection
                            # reset failure.
                            if (
                                not data
                                or total_size >= 10_000_000_000
                                or total_reads > 1000
                            ):
                                break
                finally:
                    # Always give the application iterable a chance to clean
                    # up, even when draining the socket failed.
                    if hasattr(application_iter, "close"):
                        application_iter.close()

        try:
            execute(self.server.app)
        except connection_dropped_errors as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                # Best effort: the original error is logged below either way.
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address, preferring the value already stored
        in the request environ.
        """
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the client port taken from ``client_address``."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log the request line, styled according to the status code."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        if code[0] == "1":  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 2xx - Success
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Send a message, prefixed with client address and timestamp, to
        ``_log``.
        """
        # an IPv6 scoped address contains "%" which breaks logging
        address_string = self.address_string().replace("%", "%%")
        _log(
            type,
            f"{address_string} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )

483 

484 

def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape codes for the given style names.

    Returns ``value`` unchanged when ``_log_add_style`` is false.

    :param value: the text to decorate.
    :param styles: any of ``bold``, ``red``, ``green``, ``yellow``,
        ``magenta``, ``cyan``. An unknown name raises ``KeyError``.
    """
    if not _log_add_style:
        return value

    codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }

    for style in styles:
        value = f"\x1b[{codes[style]}m{value}"

    # Reset all attributes after the styled text.
    return f"{value}\x1b[0m"

502 

503 

def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Create a self-signed certificate and a new 2048-bit RSA key.

    :param cn: the common name for the certificate. Defaults to ``"*"``.
    :return: a ``(certificate, private_key)`` tuple.
    :raises TypeError: if the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # pretty damn sure that this is not actually accepted by anyone
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # One timestamp for both bounds, so the validity window is exactly 365 days.
    now = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        # Self-signed: issuer and subject are the same name.
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey

551 

552 

def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Creates an SSL key for development. This should be used instead of
    the ``'adhoc'`` key which generates a new cert on each server start.
    It accepts a path for where it should store the key and cert and
    either a host or CN. If a host is given it is used as the CN and
    overrides ``cn``.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: the path to the certificate and key. The extension
                      ``.crt`` is added for the certificate, ``.key`` is
                      added for the key.
    :param host: the name of the host. This can be used as an alternative
                 for the `cn`.
    :param cn: the `CN` to use.
    :return: a ``(cert_file, pkey_file)`` tuple of the paths written.
    """

    if host is not None:
        cn = host
    cert, pkey = generate_adhoc_ssl_pair(cn=cn)

    from cryptography.hazmat.primitives import serialization

    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"

    with open(cert_file, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
    with open(pkey_file, "wb") as f:
        # The key is written unencrypted.
        f.write(
            pkey.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    return cert_file, pkey_file

595 

596 

def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Generates an adhoc SSL context for the development server.

    A fresh certificate and key are written to temporary files, which
    are registered for removal at interpreter exit.
    """
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    # Context managers close the descriptors even if serialization or the
    # write fails.
    with os.fdopen(cert_handle, "wb") as cert_out, os.fdopen(
        pkey_handle, "wb"
    ) as pkey_out:
        cert_out.write(cert.public_bytes(serialization.Encoding.PEM))
        pkey_out.write(
            pkey.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx

625 

626 

def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Loads SSL context from cert/private key files and optional protocol.
    Many parameters are directly taken from the API of
    :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    """
    if protocol is None:
        protocol = ssl.PROTOCOL_TLS_SERVER

    ctx = ssl.SSLContext(protocol)
    ctx.load_cert_chain(cert_file, pkey_file)
    return ctx

646 

647 

def is_ssl_error(error: Exception | None = None) -> bool:
    """Checks if the given error (or the current one) is an SSL error.

    :param error: the exception to check. When ``None``, the exception
        currently being handled (``sys.exc_info()``) is used.
    """
    if error is None:
        error = t.cast(Exception, sys.exc_info()[1])
    return isinstance(error, ssl.SSLError)

653 

654 

def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Return ``AF_INET``, ``AF_INET6``, or ``AF_UNIX`` depending on
    the host.

    :param host: a hostname, an IP address, or a ``unix://`` path.
    :param port: accepted for signature compatibility; not consulted.
    """
    if host.startswith("unix://"):
        return socket.AF_UNIX
    elif ":" in host and hasattr(socket, "AF_INET6"):
        # A colon outside a unix:// URL is taken to mean an IPv6 address.
        return socket.AF_INET6
    return socket.AF_INET

663 

664 

def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Return a fully qualified socket address that can be passed to
    :func:`socket.bind`.

    For a Unix socket this is the absolute filesystem path; otherwise it
    is the first address returned by :func:`socket.getaddrinfo`, or
    ``(host, port)`` unchanged when resolution fails.
    """
    if family == af_unix:
        # Absolute path avoids IDNA encoding error when path starts with dot.
        return os.path.abspath(host.partition("://")[2])
    try:
        res = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port
    return res[0][4]  # type: ignore

680 

681 

def get_interface_ip(family: socket.AddressFamily) -> str:
    """Get the IP address of an external interface. Used when binding to
    0.0.0.0 or :: to show a more useful URL.

    Falls back to the loopback address when the datagram socket cannot
    be connected.

    :meta private:
    """
    # arbitrary private address
    host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219"

    with socket.socket(family, socket.SOCK_DGRAM) as s:
        try:
            s.connect((host, 58162))
        except OSError:
            return "::1" if family == socket.AF_INET6 else "127.0.0.1"

        # The local address the socket was given for that destination.
        return s.getsockname()[0]  # type: ignore

698 

699 

class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Bind (or adopt) the listening socket and set up optional TLS.

        :param host: hostname, IP address, or ``unix://`` path.
        :param port: port to bind to.
        :param app: the WSGI application to serve.
        :param handler: request handler class. Defaults to
            :class:`WSGIRequestHandler`.
        :param passthrough_errors: re-raise request errors instead of
            handling them.
        :param ssl_context: an :class:`ssl.SSLContext`, a
            ``(cert_file, pkey_file)`` tuple, or ``"adhoc"``.
        :param fd: descriptor of an already-bound socket to use instead
            of binding a new one.
        """
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        if "protocol_version" not in vars(handler) and (
            self.multithread or self.multiprocess
        ):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        self.address_family = address_family = select_address_family(host, port)
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if address_family == af_unix and fd is None:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is None:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try disabling the 'AirPlay Receiver' service"
                            " from System Preferences -> General -> AirDrop & Handoff.",
                            file=sys.stderr,
                        )

                sys.exit(1)
            except BaseException:
                self.server_close()
                raise
        else:
            # TCPServer automatically opens a socket even if bind_and_activate is False.
            # Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context: ssl.SSLContext | None = ssl_context
        else:
            self.ssl_context = None

        import importlib.metadata

        self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve until interrupted, then close the server socket.

        ``KeyboardInterrupt`` is swallowed.
        """
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        if self.passthrough_errors:
            # Bare raise re-raises the exception currently being handled.
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        dev_warning = (
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead."
        )
        dev_warning = _ansi_style(dev_warning, "bold", "red")
        messages = [dev_warning]

        if self.address_family == af_unix:
            messages.append(f" * Running on {self.host}")
        else:
            scheme = "http" if self.ssl_context is None else "https"
            display_hostname = self.host

            if self.host in {"0.0.0.0", "::"}:
                messages.append(f" * Running on all addresses ({self.host})")

                if self.host == "0.0.0.0":
                    localhost = "127.0.0.1"
                    display_hostname = get_interface_ip(socket.AF_INET)
                else:
                    localhost = "[::1]"
                    display_hostname = get_interface_ip(socket.AF_INET6)

                messages.append(f" * Running on {scheme}://{localhost}:{self.port}")

            # IPv6 literals need brackets inside a URL.
            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")

        _log("info", "\n".join(messages))

866 

867 

class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests in separate
    threads.

    Use :func:`make_server` to create a server instance.
    """

    multithread = True
    # Attribute read by socketserver.ThreadingMixIn.
    daemon_threads = True

877 

878 

class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests in separate forked
    processes.

    Use :func:`make_server` to create a server instance.
    """

    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Create the server; ``processes`` is stored as ``max_children``.

        :raises ValueError: if the platform has no ``os.fork``.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(host, port, app, handler, passthrough_errors, ssl_context, fd)
        self.max_children = processes

904 

905 

def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Create an appropriate WSGI server instance based on the value of
    ``threaded`` and ``processes``.

    This is called from :func:`run_simple`, but can be used separately
    to have access to the server object, such as to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: if both ``threaded`` and ``processes > 1`` are
        requested.
    """
    if threaded and processes > 1:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    if threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

    if processes > 1:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )

    # Neither threads nor processes requested: single-request server.
    return BaseWSGIServer(
        host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
    )

949 

950 

def is_running_from_reloader() -> bool:
    """Check if the server is running as a subprocess within the
    Werkzeug reloader.

    This is true when the ``WERKZEUG_RUN_MAIN`` environment variable is
    exactly ``"true"``.

    .. versionadded:: 0.10
    """
    return os.environ.get("WERKZEUG_RUN_MAIN") == "true"

958 

959 

def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Run a development server for a WSGI application, optionally
    with static file serving, the debugger, and the reloader.

    .. warning::

        Do not use the development server when deploying to production.
        It is intended for use only during local development. It is not
        designed to be particularly efficient, stable, or secure.

    :param hostname: The host to bind to, for example ``'localhost'``.
        Can be a domain, IPv4 or IPv6 address, or file path starting
        with ``unix://`` for a Unix socket.
    :param port: The port to bind to, for example ``8080``. Using ``0``
        tells the OS to pick a random free port.
    :param application: The WSGI application to run.
    :param use_reloader: Use a reloader process to restart the server
        process when files are changed.
    :param use_debugger: Wrap the application in Werkzeug's debugger,
        which shows formatted tracebacks on unhandled exceptions.
        ``hostname`` is added to the debugger's trusted hosts.
    :param use_evalex: Make the debugger interactive. A Python terminal
        can be opened for any frame in the traceback. Some protection is
        provided by requiring a PIN, but this should never be enabled
        on a publicly visible server.
    :param extra_files: The reloader will watch these files for changes
        in addition to Python modules. For example, watch a
        configuration file.
    :param exclude_patterns: The reloader will ignore changes to any
        files matching these :mod:`fnmatch` patterns. For example,
        ignore cache files.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: The reloader to use. The ``'stat'`` reloader
        is built in, but may require significant CPU to watch files. The
        ``'watchdog'`` reloader is much more efficient but requires
        installing the ``watchdog`` package first.
    :param threaded: Handle concurrent requests using threads. Cannot be
        used with ``processes``.
    :param processes: Handle concurrent requests using up to this number
        of processes. Cannot be used with ``threaded``.
    :param request_handler: Use a different
        :class:`~http.server.BaseHTTPRequestHandler` subclass to
        handle requests.
    :param static_files: A dict mapping URL prefixes to directories to
        serve static files from using
        :class:`~werkzeug.middleware.shared_data.SharedDataMiddleware`.
    :param passthrough_errors: Don't catch unhandled exceptions at the
        server level, let the server crash instead. If ``use_debugger``
        is enabled, the debugger will still catch such errors.
    :param ssl_context: Configure TLS to serve over HTTPS. Can be an
        :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
        tuple to create a typical context, or the string ``'adhoc'`` to
        generate a temporary self-signed certificate.
    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        debugged = DebuggedApplication(application, evalex=use_evalex)
        # Allow the specified hostname to use the debugger, in addition to
        # localhost domains.
        debugged.trusted_hosts.append(hostname)
        application = debugged

    # Inside the reloader subprocess, reuse the socket file descriptor
    # published through the environment further down; otherwise no
    # descriptor is passed to the server.
    fd = (
        int(os.environ["WERKZEUG_SERVER_FD"]) if is_running_from_reloader() else None
    )

    server = make_server(
        hostname,
        port,
        application,
        threaded=threaded,
        processes=processes,
        request_handler=request_handler,
        passthrough_errors=passthrough_errors,
        ssl_context=ssl_context,
        fd=fd,
    )
    # Mark the socket inheritable and publish its descriptor in the
    # environment, which is where the branch above reads it from.
    server.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(server.fileno())

    # The startup banner is only logged outside the reloader subprocess.
    if not is_running_from_reloader():
        server.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        server.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            server.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        server.server_close()