Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%

479 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-09 06:08 +0000

1"""A WSGI and HTTP server for use **during development only**. This 

2server is convenient to use, but is not designed to be particularly 

3stable, secure, or efficient. Use a dedicate WSGI server and HTTP 

4server when deploying to production. 

5 

6It provides features like interactive debugging and code reloading. Use 

7``run_simple`` to start the server. Put this in a ``run.py`` script: 

8 

9.. code-block:: python 

10 

11 from myapp import create_app 

12 from werkzeug import run_simple 

13""" 

14from __future__ import annotations 

15 

16import errno 

17import io 

18import os 

19import selectors 

20import socket 

21import socketserver 

22import sys 

23import typing as t 

24from datetime import datetime as dt 

25from datetime import timedelta 

26from datetime import timezone 

27from http.server import BaseHTTPRequestHandler 

28from http.server import HTTPServer 

29from urllib.parse import unquote 

30from urllib.parse import urlsplit 

31 

32from ._internal import _log 

33from ._internal import _wsgi_encoding_dance 

34from .exceptions import InternalServerError 

35from .urls import uri_to_iri 

36 

37try: 

38 import ssl 

39except ImportError: 

40 

41 class _SslDummy: 

42 def __getattr__(self, name: str) -> t.Any: 

43 raise RuntimeError( # noqa: B904 

44 "SSL is unavailable because this Python runtime was not" 

45 " compiled with SSL/TLS support." 

46 ) 

47 

48 ssl = _SslDummy() # type: ignore 

49 

50_log_add_style = True 

51 

52if os.name == "nt": 

53 try: 

54 __import__("colorama") 

55 except ImportError: 

56 _log_add_style = False 

57 

58can_fork = hasattr(os, "fork") 

59 

60if can_fork: 

61 ForkingMixIn = socketserver.ForkingMixIn 

62else: 

63 

64 class ForkingMixIn: # type: ignore 

65 pass 

66 

67 

68try: 

69 af_unix = socket.AF_UNIX 

70except AttributeError: 

71 af_unix = None # type: ignore 

72 

73LISTEN_QUEUE = 128 

74 

75_TSSLContextArg = t.Optional[ 

76 t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], t.Literal["adhoc"]] 

77] 

78 

79if t.TYPE_CHECKING: 

80 from _typeshed.wsgi import WSGIApplication 

81 from _typeshed.wsgi import WSGIEnvironment 

82 from cryptography.hazmat.primitives.asymmetric.rsa import ( 

83 RSAPrivateKeyWithSerialization, 

84 ) 

85 from cryptography.x509 import Certificate 

86 

87 

88class DechunkedInput(io.RawIOBase): 

89 """An input stream that handles Transfer-Encoding 'chunked'""" 

90 

91 def __init__(self, rfile: t.IO[bytes]) -> None: 

92 self._rfile = rfile 

93 self._done = False 

94 self._len = 0 

95 

96 def readable(self) -> bool: 

97 return True 

98 

99 def read_chunk_len(self) -> int: 

100 try: 

101 line = self._rfile.readline().decode("latin1") 

102 _len = int(line.strip(), 16) 

103 except ValueError as e: 

104 raise OSError("Invalid chunk header") from e 

105 if _len < 0: 

106 raise OSError("Negative chunk length not allowed") 

107 return _len 

108 

109 def readinto(self, buf: bytearray) -> int: # type: ignore 

110 read = 0 

111 while not self._done and read < len(buf): 

112 if self._len == 0: 

113 # This is the first chunk or we fully consumed the previous 

114 # one. Read the next length of the next chunk 

115 self._len = self.read_chunk_len() 

116 

117 if self._len == 0: 

118 # Found the final chunk of size 0. The stream is now exhausted, 

119 # but there is still a final newline that should be consumed 

120 self._done = True 

121 

122 if self._len > 0: 

123 # There is data (left) in this chunk, so append it to the 

124 # buffer. If this operation fully consumes the chunk, this will 

125 # reset self._len to 0. 

126 n = min(len(buf), self._len) 

127 

128 # If (read + chunk size) becomes more than len(buf), buf will 

129 # grow beyond the original size and read more data than 

130 # required. So only read as much data as can fit in buf. 

131 if read + n > len(buf): 

132 buf[read:] = self._rfile.read(len(buf) - read) 

133 self._len -= len(buf) - read 

134 read = len(buf) 

135 else: 

136 buf[read : read + n] = self._rfile.read(n) 

137 self._len -= n 

138 read += n 

139 

140 if self._len == 0: 

141 # Skip the terminating newline of a chunk that has been fully 

142 # consumed. This also applies to the 0-sized final chunk 

143 terminator = self._rfile.readline() 

144 if terminator not in (b"\n", b"\r\n", b"\r"): 

145 raise OSError("Missing chunk terminating newline") 

146 

147 return read 

148 

149 

150class WSGIRequestHandler(BaseHTTPRequestHandler): 

151 """A request handler that implements WSGI dispatching.""" 

152 

153 server: BaseWSGIServer 

154 

155 @property 

156 def server_version(self) -> str: # type: ignore 

157 from . import __version__ 

158 

159 return f"Werkzeug/{__version__}" 

160 

161 def make_environ(self) -> WSGIEnvironment: 

162 request_url = urlsplit(self.path) 

163 url_scheme = "http" if self.server.ssl_context is None else "https" 

164 

165 if not self.client_address: 

166 self.client_address = ("<local>", 0) 

167 elif isinstance(self.client_address, str): 

168 self.client_address = (self.client_address, 0) 

169 

170 # If there was no scheme but the path started with two slashes, 

171 # the first segment may have been incorrectly parsed as the 

172 # netloc, prepend it to the path again. 

173 if not request_url.scheme and request_url.netloc: 

174 path_info = f"/{request_url.netloc}{request_url.path}" 

175 else: 

176 path_info = request_url.path 

177 

178 path_info = unquote(path_info) 

179 

180 environ: WSGIEnvironment = { 

181 "wsgi.version": (1, 0), 

182 "wsgi.url_scheme": url_scheme, 

183 "wsgi.input": self.rfile, 

184 "wsgi.errors": sys.stderr, 

185 "wsgi.multithread": self.server.multithread, 

186 "wsgi.multiprocess": self.server.multiprocess, 

187 "wsgi.run_once": False, 

188 "werkzeug.socket": self.connection, 

189 "SERVER_SOFTWARE": self.server_version, 

190 "REQUEST_METHOD": self.command, 

191 "SCRIPT_NAME": "", 

192 "PATH_INFO": _wsgi_encoding_dance(path_info), 

193 "QUERY_STRING": _wsgi_encoding_dance(request_url.query), 

194 # Non-standard, added by mod_wsgi, uWSGI 

195 "REQUEST_URI": _wsgi_encoding_dance(self.path), 

196 # Non-standard, added by gunicorn 

197 "RAW_URI": _wsgi_encoding_dance(self.path), 

198 "REMOTE_ADDR": self.address_string(), 

199 "REMOTE_PORT": self.port_integer(), 

200 "SERVER_NAME": self.server.server_address[0], 

201 "SERVER_PORT": str(self.server.server_address[1]), 

202 "SERVER_PROTOCOL": self.request_version, 

203 } 

204 

205 for key, value in self.headers.items(): 

206 if "_" in key: 

207 continue 

208 

209 key = key.upper().replace("-", "_") 

210 value = value.replace("\r\n", "") 

211 if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"): 

212 key = f"HTTP_{key}" 

213 if key in environ: 

214 value = f"{environ[key]},{value}" 

215 environ[key] = value 

216 

217 if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked": 

218 environ["wsgi.input_terminated"] = True 

219 environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"]) 

220 

221 # Per RFC 2616, if the URL is absolute, use that as the host. 

222 # We're using "has a scheme" to indicate an absolute URL. 

223 if request_url.scheme and request_url.netloc: 

224 environ["HTTP_HOST"] = request_url.netloc 

225 

226 try: 

227 # binary_form=False gives nicer information, but wouldn't be compatible with 

228 # what Nginx or Apache could return. 

229 peer_cert = self.connection.getpeercert(binary_form=True) 

230 if peer_cert is not None: 

231 # Nginx and Apache use PEM format. 

232 environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert) 

233 except ValueError: 

234 # SSL handshake hasn't finished. 

235 self.server.log("error", "Cannot fetch SSL peer certificate info") 

236 except AttributeError: 

237 # Not using TLS, the socket will not have getpeercert(). 

238 pass 

239 

240 return environ 

241 

242 def run_wsgi(self) -> None: 

243 if self.headers.get("Expect", "").lower().strip() == "100-continue": 

244 self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n") 

245 

246 self.environ = environ = self.make_environ() 

247 status_set: str | None = None 

248 headers_set: list[tuple[str, str]] | None = None 

249 status_sent: str | None = None 

250 headers_sent: list[tuple[str, str]] | None = None 

251 chunk_response: bool = False 

252 

253 def write(data: bytes) -> None: 

254 nonlocal status_sent, headers_sent, chunk_response 

255 assert status_set is not None, "write() before start_response" 

256 assert headers_set is not None, "write() before start_response" 

257 if status_sent is None: 

258 status_sent = status_set 

259 headers_sent = headers_set 

260 try: 

261 code_str, msg = status_sent.split(None, 1) 

262 except ValueError: 

263 code_str, msg = status_sent, "" 

264 code = int(code_str) 

265 self.send_response(code, msg) 

266 header_keys = set() 

267 for key, value in headers_sent: 

268 self.send_header(key, value) 

269 header_keys.add(key.lower()) 

270 

271 # Use chunked transfer encoding if there is no content 

272 # length. Do not use for 1xx and 204 responses. 304 

273 # responses and HEAD requests are also excluded, which 

274 # is the more conservative behavior and matches other 

275 # parts of the code. 

276 # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1 

277 if ( 

278 not ( 

279 "content-length" in header_keys 

280 or environ["REQUEST_METHOD"] == "HEAD" 

281 or (100 <= code < 200) 

282 or code in {204, 304} 

283 ) 

284 and self.protocol_version >= "HTTP/1.1" 

285 ): 

286 chunk_response = True 

287 self.send_header("Transfer-Encoding", "chunked") 

288 

289 # Always close the connection. This disables HTTP/1.1 

290 # keep-alive connections. They aren't handled well by 

291 # Python's http.server because it doesn't know how to 

292 # drain the stream before the next request line. 

293 self.send_header("Connection", "close") 

294 self.end_headers() 

295 

296 assert isinstance(data, bytes), "applications must write bytes" 

297 

298 if data: 

299 if chunk_response: 

300 self.wfile.write(hex(len(data))[2:].encode()) 

301 self.wfile.write(b"\r\n") 

302 

303 self.wfile.write(data) 

304 

305 if chunk_response: 

306 self.wfile.write(b"\r\n") 

307 

308 self.wfile.flush() 

309 

310 def start_response(status, headers, exc_info=None): # type: ignore 

311 nonlocal status_set, headers_set 

312 if exc_info: 

313 try: 

314 if headers_sent: 

315 raise exc_info[1].with_traceback(exc_info[2]) 

316 finally: 

317 exc_info = None 

318 elif headers_set: 

319 raise AssertionError("Headers already set") 

320 status_set = status 

321 headers_set = headers 

322 return write 

323 

324 def execute(app: WSGIApplication) -> None: 

325 application_iter = app(environ, start_response) 

326 try: 

327 for data in application_iter: 

328 write(data) 

329 if not headers_sent: 

330 write(b"") 

331 if chunk_response: 

332 self.wfile.write(b"0\r\n\r\n") 

333 finally: 

334 # Check for any remaining data in the read socket, and discard it. This 

335 # will read past request.max_content_length, but lets the client see a 

336 # 413 response instead of a connection reset failure. If we supported 

337 # keep-alive connections, this naive approach would break by reading the 

338 # next request line. Since we know that write (above) closes every 

339 # connection we can read everything. 

340 selector = selectors.DefaultSelector() 

341 selector.register(self.connection, selectors.EVENT_READ) 

342 total_size = 0 

343 total_reads = 0 

344 

345 # A timeout of 0 tends to fail because a client needs a small amount of 

346 # time to continue sending its data. 

347 while selector.select(timeout=0.01): 

348 # Only read 10MB into memory at a time. 

349 data = self.rfile.read(10_000_000) 

350 total_size += len(data) 

351 total_reads += 1 

352 

353 # Stop reading on no data, >=10GB, or 1000 reads. If a client sends 

354 # more than that, they'll get a connection reset failure. 

355 if not data or total_size >= 10_000_000_000 or total_reads > 1000: 

356 break 

357 

358 selector.close() 

359 

360 if hasattr(application_iter, "close"): 

361 application_iter.close() 

362 

363 try: 

364 execute(self.server.app) 

365 except (ConnectionError, socket.timeout) as e: 

366 self.connection_dropped(e, environ) 

367 except Exception as e: 

368 if self.server.passthrough_errors: 

369 raise 

370 

371 if status_sent is not None and chunk_response: 

372 self.close_connection = True 

373 

374 try: 

375 # if we haven't yet sent the headers but they are set 

376 # we roll back to be able to set them again. 

377 if status_sent is None: 

378 status_set = None 

379 headers_set = None 

380 execute(InternalServerError()) 

381 except Exception: 

382 pass 

383 

384 from .debug.tbtools import DebugTraceback 

385 

386 msg = DebugTraceback(e).render_traceback_text() 

387 self.server.log("error", f"Error on request:\n{msg}") 

388 

389 def handle(self) -> None: 

390 """Handles a request ignoring dropped connections.""" 

391 try: 

392 super().handle() 

393 except (ConnectionError, socket.timeout) as e: 

394 self.connection_dropped(e) 

395 except Exception as e: 

396 if self.server.ssl_context is not None and is_ssl_error(e): 

397 self.log_error("SSL error occurred: %s", e) 

398 else: 

399 raise 

400 

401 def connection_dropped( 

402 self, error: BaseException, environ: WSGIEnvironment | None = None 

403 ) -> None: 

404 """Called if the connection was closed by the client. By default 

405 nothing happens. 

406 """ 

407 

408 def __getattr__(self, name: str) -> t.Any: 

409 # All HTTP methods are handled by run_wsgi. 

410 if name.startswith("do_"): 

411 return self.run_wsgi 

412 

413 # All other attributes are forwarded to the base class. 

414 return getattr(super(), name) 

415 

416 def address_string(self) -> str: 

417 if getattr(self, "environ", None): 

418 return self.environ["REMOTE_ADDR"] # type: ignore 

419 

420 if not self.client_address: 

421 return "<local>" 

422 

423 return self.client_address[0] 

424 

425 def port_integer(self) -> int: 

426 return self.client_address[1] 

427 

428 # Escape control characters. This is defined (but private) in Python 3.12. 

429 _control_char_table = str.maketrans( 

430 {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]} 

431 ) 

432 _control_char_table[ord("\\")] = r"\\" 

433 

434 def log_request(self, code: int | str = "-", size: int | str = "-") -> None: 

435 try: 

436 path = uri_to_iri(self.path) 

437 msg = f"{self.command} {path} {self.request_version}" 

438 except AttributeError: 

439 # path isn't set if the requestline was bad 

440 msg = self.requestline 

441 

442 # Escape control characters that may be in the decoded path. 

443 msg = msg.translate(self._control_char_table) 

444 code = str(code) 

445 

446 if code[0] == "1": # 1xx - Informational 

447 msg = _ansi_style(msg, "bold") 

448 elif code == "200": # 2xx - Success 

449 pass 

450 elif code == "304": # 304 - Resource Not Modified 

451 msg = _ansi_style(msg, "cyan") 

452 elif code[0] == "3": # 3xx - Redirection 

453 msg = _ansi_style(msg, "green") 

454 elif code == "404": # 404 - Resource Not Found 

455 msg = _ansi_style(msg, "yellow") 

456 elif code[0] == "4": # 4xx - Client Error 

457 msg = _ansi_style(msg, "bold", "red") 

458 else: # 5xx, or any other response 

459 msg = _ansi_style(msg, "bold", "magenta") 

460 

461 self.log("info", '"%s" %s %s', msg, code, size) 

462 

463 def log_error(self, format: str, *args: t.Any) -> None: 

464 self.log("error", format, *args) 

465 

466 def log_message(self, format: str, *args: t.Any) -> None: 

467 self.log("info", format, *args) 

468 

469 def log(self, type: str, message: str, *args: t.Any) -> None: 

470 _log( 

471 type, 

472 f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n", 

473 *args, 

474 ) 

475 

476 

477def _ansi_style(value: str, *styles: str) -> str: 

478 if not _log_add_style: 

479 return value 

480 

481 codes = { 

482 "bold": 1, 

483 "red": 31, 

484 "green": 32, 

485 "yellow": 33, 

486 "magenta": 35, 

487 "cyan": 36, 

488 } 

489 

490 for style in styles: 

491 value = f"\x1b[{codes[style]}m{value}" 

492 

493 return f"{value}\x1b[0m" 

494 

495 

496def generate_adhoc_ssl_pair( 

497 cn: str | None = None, 

498) -> tuple[Certificate, RSAPrivateKeyWithSerialization]: 

499 try: 

500 from cryptography import x509 

501 from cryptography.x509.oid import NameOID 

502 from cryptography.hazmat.backends import default_backend 

503 from cryptography.hazmat.primitives import hashes 

504 from cryptography.hazmat.primitives.asymmetric import rsa 

505 except ImportError: 

506 raise TypeError( 

507 "Using ad-hoc certificates requires the cryptography library." 

508 ) from None 

509 

510 backend = default_backend() 

511 pkey = rsa.generate_private_key( 

512 public_exponent=65537, key_size=2048, backend=backend 

513 ) 

514 

515 # pretty damn sure that this is not actually accepted by anyone 

516 if cn is None: 

517 cn = "*" 

518 

519 subject = x509.Name( 

520 [ 

521 x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"), 

522 x509.NameAttribute(NameOID.COMMON_NAME, cn), 

523 ] 

524 ) 

525 

526 backend = default_backend() 

527 cert = ( 

528 x509.CertificateBuilder() 

529 .subject_name(subject) 

530 .issuer_name(subject) 

531 .public_key(pkey.public_key()) 

532 .serial_number(x509.random_serial_number()) 

533 .not_valid_before(dt.now(timezone.utc)) 

534 .not_valid_after(dt.now(timezone.utc) + timedelta(days=365)) 

535 .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False) 

536 .add_extension(x509.SubjectAlternativeName([x509.DNSName(cn)]), critical=False) 

537 .sign(pkey, hashes.SHA256(), backend) 

538 ) 

539 return cert, pkey 

540 

541 

542def make_ssl_devcert( 

543 base_path: str, host: str | None = None, cn: str | None = None 

544) -> tuple[str, str]: 

545 """Creates an SSL key for development. This should be used instead of 

546 the ``'adhoc'`` key which generates a new cert on each server start. 

547 It accepts a path for where it should store the key and cert and 

548 either a host or CN. If a host is given it will use the CN 

549 ``*.host/CN=host``. 

550 

551 For more information see :func:`run_simple`. 

552 

553 .. versionadded:: 0.9 

554 

555 :param base_path: the path to the certificate and key. The extension 

556 ``.crt`` is added for the certificate, ``.key`` is 

557 added for the key. 

558 :param host: the name of the host. This can be used as an alternative 

559 for the `cn`. 

560 :param cn: the `CN` to use. 

561 """ 

562 

563 if host is not None: 

564 cn = f"*.{host}/CN={host}" 

565 cert, pkey = generate_adhoc_ssl_pair(cn=cn) 

566 

567 from cryptography.hazmat.primitives import serialization 

568 

569 cert_file = f"{base_path}.crt" 

570 pkey_file = f"{base_path}.key" 

571 

572 with open(cert_file, "wb") as f: 

573 f.write(cert.public_bytes(serialization.Encoding.PEM)) 

574 with open(pkey_file, "wb") as f: 

575 f.write( 

576 pkey.private_bytes( 

577 encoding=serialization.Encoding.PEM, 

578 format=serialization.PrivateFormat.TraditionalOpenSSL, 

579 encryption_algorithm=serialization.NoEncryption(), 

580 ) 

581 ) 

582 

583 return cert_file, pkey_file 

584 

585 

586def generate_adhoc_ssl_context() -> ssl.SSLContext: 

587 """Generates an adhoc SSL context for the development server.""" 

588 import tempfile 

589 import atexit 

590 

591 cert, pkey = generate_adhoc_ssl_pair() 

592 

593 from cryptography.hazmat.primitives import serialization 

594 

595 cert_handle, cert_file = tempfile.mkstemp() 

596 pkey_handle, pkey_file = tempfile.mkstemp() 

597 atexit.register(os.remove, pkey_file) 

598 atexit.register(os.remove, cert_file) 

599 

600 os.write(cert_handle, cert.public_bytes(serialization.Encoding.PEM)) 

601 os.write( 

602 pkey_handle, 

603 pkey.private_bytes( 

604 encoding=serialization.Encoding.PEM, 

605 format=serialization.PrivateFormat.TraditionalOpenSSL, 

606 encryption_algorithm=serialization.NoEncryption(), 

607 ), 

608 ) 

609 

610 os.close(cert_handle) 

611 os.close(pkey_handle) 

612 ctx = load_ssl_context(cert_file, pkey_file) 

613 return ctx 

614 

615 

616def load_ssl_context( 

617 cert_file: str, pkey_file: str | None = None, protocol: int | None = None 

618) -> ssl.SSLContext: 

619 """Loads SSL context from cert/private key files and optional protocol. 

620 Many parameters are directly taken from the API of 

621 :py:class:`ssl.SSLContext`. 

622 

623 :param cert_file: Path of the certificate to use. 

624 :param pkey_file: Path of the private key to use. If not given, the key 

625 will be obtained from the certificate file. 

626 :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module. 

627 Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`. 

628 """ 

629 if protocol is None: 

630 protocol = ssl.PROTOCOL_TLS_SERVER 

631 

632 ctx = ssl.SSLContext(protocol) 

633 ctx.load_cert_chain(cert_file, pkey_file) 

634 return ctx 

635 

636 

637def is_ssl_error(error: Exception | None = None) -> bool: 

638 """Checks if the given error (or the current one) is an SSL error.""" 

639 if error is None: 

640 error = t.cast(Exception, sys.exc_info()[1]) 

641 return isinstance(error, ssl.SSLError) 

642 

643 

644def select_address_family(host: str, port: int) -> socket.AddressFamily: 

645 """Return ``AF_INET4``, ``AF_INET6``, or ``AF_UNIX`` depending on 

646 the host and port.""" 

647 if host.startswith("unix://"): 

648 return socket.AF_UNIX 

649 elif ":" in host and hasattr(socket, "AF_INET6"): 

650 return socket.AF_INET6 

651 return socket.AF_INET 

652 

653 

654def get_sockaddr( 

655 host: str, port: int, family: socket.AddressFamily 

656) -> tuple[str, int] | str: 

657 """Return a fully qualified socket address that can be passed to 

658 :func:`socket.bind`.""" 

659 if family == af_unix: 

660 # Absolute path avoids IDNA encoding error when path starts with dot. 

661 return os.path.abspath(host.partition("://")[2]) 

662 try: 

663 res = socket.getaddrinfo( 

664 host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP 

665 ) 

666 except socket.gaierror: 

667 return host, port 

668 return res[0][4] # type: ignore 

669 

670 

671def get_interface_ip(family: socket.AddressFamily) -> str: 

672 """Get the IP address of an external interface. Used when binding to 

673 0.0.0.0 or ::1 to show a more useful URL. 

674 

675 :meta private: 

676 """ 

677 # arbitrary private address 

678 host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219" 

679 

680 with socket.socket(family, socket.SOCK_DGRAM) as s: 

681 try: 

682 s.connect((host, 58162)) 

683 except OSError: 

684 return "::1" if family == socket.AF_INET6 else "127.0.0.1" 

685 

686 return s.getsockname()[0] # type: ignore 

687 

688 

689class BaseWSGIServer(HTTPServer): 

690 """A WSGI server that that handles one request at a time. 

691 

692 Use :func:`make_server` to create a server instance. 

693 """ 

694 

695 multithread = False 

696 multiprocess = False 

697 request_queue_size = LISTEN_QUEUE 

698 allow_reuse_address = True 

699 

700 def __init__( 

701 self, 

702 host: str, 

703 port: int, 

704 app: WSGIApplication, 

705 handler: type[WSGIRequestHandler] | None = None, 

706 passthrough_errors: bool = False, 

707 ssl_context: _TSSLContextArg | None = None, 

708 fd: int | None = None, 

709 ) -> None: 

710 if handler is None: 

711 handler = WSGIRequestHandler 

712 

713 # If the handler doesn't directly set a protocol version and 

714 # thread or process workers are used, then allow chunked 

715 # responses and keep-alive connections by enabling HTTP/1.1. 

716 if "protocol_version" not in vars(handler) and ( 

717 self.multithread or self.multiprocess 

718 ): 

719 handler.protocol_version = "HTTP/1.1" 

720 

721 self.host = host 

722 self.port = port 

723 self.app = app 

724 self.passthrough_errors = passthrough_errors 

725 

726 self.address_family = address_family = select_address_family(host, port) 

727 server_address = get_sockaddr(host, int(port), address_family) 

728 

729 # Remove a leftover Unix socket file from a previous run. Don't 

730 # remove a file that was set up by run_simple. 

731 if address_family == af_unix and fd is None: 

732 server_address = t.cast(str, server_address) 

733 

734 if os.path.exists(server_address): 

735 os.unlink(server_address) 

736 

737 # Bind and activate will be handled manually, it should only 

738 # happen if we're not using a socket that was already set up. 

739 super().__init__( 

740 server_address, # type: ignore[arg-type] 

741 handler, 

742 bind_and_activate=False, 

743 ) 

744 

745 if fd is None: 

746 # No existing socket descriptor, do bind_and_activate=True. 

747 try: 

748 self.server_bind() 

749 self.server_activate() 

750 except OSError as e: 

751 # Catch connection issues and show them without the traceback. Show 

752 # extra instructions for address not found, and for macOS. 

753 self.server_close() 

754 print(e.strerror, file=sys.stderr) 

755 

756 if e.errno == errno.EADDRINUSE: 

757 print( 

758 f"Port {port} is in use by another program. Either identify and" 

759 " stop that program, or start the server with a different" 

760 " port.", 

761 file=sys.stderr, 

762 ) 

763 

764 if sys.platform == "darwin" and port == 5000: 

765 print( 

766 "On macOS, try disabling the 'AirPlay Receiver' service" 

767 " from System Preferences -> Sharing.", 

768 file=sys.stderr, 

769 ) 

770 

771 sys.exit(1) 

772 except BaseException: 

773 self.server_close() 

774 raise 

775 else: 

776 # TCPServer automatically opens a socket even if bind_and_activate is False. 

777 # Close it to silence a ResourceWarning. 

778 self.server_close() 

779 

780 # Use the passed in socket directly. 

781 self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM) 

782 self.server_address = self.socket.getsockname() 

783 

784 if address_family != af_unix: 

785 # If port was 0, this will record the bound port. 

786 self.port = self.server_address[1] 

787 

788 if ssl_context is not None: 

789 if isinstance(ssl_context, tuple): 

790 ssl_context = load_ssl_context(*ssl_context) 

791 elif ssl_context == "adhoc": 

792 ssl_context = generate_adhoc_ssl_context() 

793 

794 self.socket = ssl_context.wrap_socket(self.socket, server_side=True) 

795 self.ssl_context: ssl.SSLContext | None = ssl_context 

796 else: 

797 self.ssl_context = None 

798 

799 def log(self, type: str, message: str, *args: t.Any) -> None: 

800 _log(type, message, *args) 

801 

802 def serve_forever(self, poll_interval: float = 0.5) -> None: 

803 try: 

804 super().serve_forever(poll_interval=poll_interval) 

805 except KeyboardInterrupt: 

806 pass 

807 finally: 

808 self.server_close() 

809 

810 def handle_error( 

811 self, request: t.Any, client_address: tuple[str, int] | str 

812 ) -> None: 

813 if self.passthrough_errors: 

814 raise 

815 

816 return super().handle_error(request, client_address) 

817 

818 def log_startup(self) -> None: 

819 """Show information about the address when starting the server.""" 

820 dev_warning = ( 

821 "WARNING: This is a development server. Do not use it in a production" 

822 " deployment. Use a production WSGI server instead." 

823 ) 

824 dev_warning = _ansi_style(dev_warning, "bold", "red") 

825 messages = [dev_warning] 

826 

827 if self.address_family == af_unix: 

828 messages.append(f" * Running on {self.host}") 

829 else: 

830 scheme = "http" if self.ssl_context is None else "https" 

831 display_hostname = self.host 

832 

833 if self.host in {"0.0.0.0", "::"}: 

834 messages.append(f" * Running on all addresses ({self.host})") 

835 

836 if self.host == "0.0.0.0": 

837 localhost = "127.0.0.1" 

838 display_hostname = get_interface_ip(socket.AF_INET) 

839 else: 

840 localhost = "[::1]" 

841 display_hostname = get_interface_ip(socket.AF_INET6) 

842 

843 messages.append(f" * Running on {scheme}://{localhost}:{self.port}") 

844 

845 if ":" in display_hostname: 

846 display_hostname = f"[{display_hostname}]" 

847 

848 messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}") 

849 

850 _log("info", "\n".join(messages)) 

851 

852 

853class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer): 

854 """A WSGI server that handles concurrent requests in separate 

855 threads. 

856 

857 Use :func:`make_server` to create a server instance. 

858 """ 

859 

860 multithread = True 

861 daemon_threads = True 

862 

863 

864class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer): 

865 """A WSGI server that handles concurrent requests in separate forked 

866 processes. 

867 

868 Use :func:`make_server` to create a server instance. 

869 """ 

870 

871 multiprocess = True 

872 

873 def __init__( 

874 self, 

875 host: str, 

876 port: int, 

877 app: WSGIApplication, 

878 processes: int = 40, 

879 handler: type[WSGIRequestHandler] | None = None, 

880 passthrough_errors: bool = False, 

881 ssl_context: _TSSLContextArg | None = None, 

882 fd: int | None = None, 

883 ) -> None: 

884 if not can_fork: 

885 raise ValueError("Your platform does not support forking.") 

886 

887 super().__init__(host, port, app, handler, passthrough_errors, ssl_context, fd) 

888 self.max_children = processes 

889 

890 

891def make_server( 

892 host: str, 

893 port: int, 

894 app: WSGIApplication, 

895 threaded: bool = False, 

896 processes: int = 1, 

897 request_handler: type[WSGIRequestHandler] | None = None, 

898 passthrough_errors: bool = False, 

899 ssl_context: _TSSLContextArg | None = None, 

900 fd: int | None = None, 

901) -> BaseWSGIServer: 

902 """Create an appropriate WSGI server instance based on the value of 

903 ``threaded`` and ``processes``. 

904 

905 This is called from :func:`run_simple`, but can be used separately 

906 to have access to the server object, such as to run it in a separate 

907 thread. 

908 

909 See :func:`run_simple` for parameter docs. 

910 """ 

911 if threaded and processes > 1: 

912 raise ValueError("Cannot have a multi-thread and multi-process server.") 

913 

914 if threaded: 

915 return ThreadedWSGIServer( 

916 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd 

917 ) 

918 

919 if processes > 1: 

920 return ForkingWSGIServer( 

921 host, 

922 port, 

923 app, 

924 processes, 

925 request_handler, 

926 passthrough_errors, 

927 ssl_context, 

928 fd=fd, 

929 ) 

930 

931 return BaseWSGIServer( 

932 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd 

933 ) 

934 

935 

936def is_running_from_reloader() -> bool: 

937 """Check if the server is running as a subprocess within the 

938 Werkzeug reloader. 

939 

940 .. versionadded:: 0.10 

941 """ 

942 return os.environ.get("WERKZEUG_RUN_MAIN") == "true" 

943 

944 

945def run_simple( 

946 hostname: str, 

947 port: int, 

948 application: WSGIApplication, 

949 use_reloader: bool = False, 

950 use_debugger: bool = False, 

951 use_evalex: bool = True, 

952 extra_files: t.Iterable[str] | None = None, 

953 exclude_patterns: t.Iterable[str] | None = None, 

954 reloader_interval: int = 1, 

955 reloader_type: str = "auto", 

956 threaded: bool = False, 

957 processes: int = 1, 

958 request_handler: type[WSGIRequestHandler] | None = None, 

959 static_files: dict[str, str | tuple[str, str]] | None = None, 

960 passthrough_errors: bool = False, 

961 ssl_context: _TSSLContextArg | None = None, 

962) -> None: 

963 """Start a development server for a WSGI application. Various 

964 optional features can be enabled. 

965 

966 .. warning:: 

967 

968 Do not use the development server when deploying to production. 

969 It is intended for use only during local development. It is not 

970 designed to be particularly efficient, stable, or secure. 

971 

972 :param hostname: The host to bind to, for example ``'localhost'``. 

973 Can be a domain, IPv4 or IPv6 address, or file path starting 

974 with ``unix://`` for a Unix socket. 

975 :param port: The port to bind to, for example ``8080``. Using ``0`` 

976 tells the OS to pick a random free port. 

977 :param application: The WSGI application to run. 

978 :param use_reloader: Use a reloader process to restart the server 

979 process when files are changed. 

980 :param use_debugger: Use Werkzeug's debugger, which will show 

981 formatted tracebacks on unhandled exceptions. 

982 :param use_evalex: Make the debugger interactive. A Python terminal 

983 can be opened for any frame in the traceback. Some protection is 

984 provided by requiring a PIN, but this should never be enabled 

985 on a publicly visible server. 

986 :param extra_files: The reloader will watch these files for changes 

987 in addition to Python modules. For example, watch a 

988 configuration file. 

989 :param exclude_patterns: The reloader will ignore changes to any 

990 files matching these :mod:`fnmatch` patterns. For example, 

991 ignore cache files. 

992 :param reloader_interval: How often the reloader tries to check for 

993 changes. 

994 :param reloader_type: The reloader to use. The ``'stat'`` reloader 

995 is built in, but may require significant CPU to watch files. The 

996 ``'watchdog'`` reloader is much more efficient but requires 

997 installing the ``watchdog`` package first. 

998 :param threaded: Handle concurrent requests using threads. Cannot be 

999 used with ``processes``. 

1000 :param processes: Handle concurrent requests using up to this number 

1001 of processes. Cannot be used with ``threaded``. 

1002 :param request_handler: Use a different 

1003 :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to 

1004 handle requests. 

1005 :param static_files: A dict mapping URL prefixes to directories to 

1006 serve static files from using 

1007 :class:`~werkzeug.middleware.SharedDataMiddleware`. 

1008 :param passthrough_errors: Don't catch unhandled exceptions at the 

1009 server level, let the server crash instead. If ``use_debugger`` 

1010 is enabled, the debugger will still catch such errors. 

1011 :param ssl_context: Configure TLS to serve over HTTPS. Can be an 

1012 :class:`ssl.SSLContext` object, a ``(cert_file, key_file)`` 

1013 tuple to create a typical context, or the string ``'adhoc'`` to 

1014 generate a temporary self-signed certificate. 

1015 

1016 .. versionchanged:: 2.1 

1017 Instructions are shown for dealing with an "address already in 

1018 use" error. 

1019 

1020 .. versionchanged:: 2.1 

1021 Running on ``0.0.0.0`` or ``::`` shows the loopback IP in 

1022 addition to a real IP. 

1023 

1024 .. versionchanged:: 2.1 

1025 The command-line interface was removed. 

1026 

1027 .. versionchanged:: 2.0 

1028 Running on ``0.0.0.0`` or ``::`` shows a real IP address that 

1029 was bound as well as a warning not to run the development server 

1030 in production. 

1031 

1032 .. versionchanged:: 2.0 

1033 The ``exclude_patterns`` parameter was added. 

1034 

1035 .. versionchanged:: 0.15 

1036 Bind to a Unix socket by passing a ``hostname`` that starts with 

1037 ``unix://``. 

1038 

1039 .. versionchanged:: 0.10 

1040 Improved the reloader and added support for changing the backend 

1041 through the ``reloader_type`` parameter. 

1042 

1043 .. versionchanged:: 0.9 

1044 A command-line interface was added. 

1045 

1046 .. versionchanged:: 0.8 

1047 ``ssl_context`` can be a tuple of paths to the certificate and 

1048 private key files. 

1049 

1050 .. versionchanged:: 0.6 

1051 The ``ssl_context`` parameter was added. 

1052 

1053 .. versionchanged:: 0.5 

1054 The ``static_files`` and ``passthrough_errors`` parameters were 

1055 added. 

1056 """ 

1057 if not isinstance(port, int): 

1058 raise TypeError("port must be an integer") 

1059 

1060 if static_files: 

1061 from .middleware.shared_data import SharedDataMiddleware 

1062 

1063 application = SharedDataMiddleware(application, static_files) 

1064 

1065 if use_debugger: 

1066 from .debug import DebuggedApplication 

1067 

1068 application = DebuggedApplication(application, evalex=use_evalex) 

1069 

1070 if not is_running_from_reloader(): 

1071 fd = None 

1072 else: 

1073 fd = int(os.environ["WERKZEUG_SERVER_FD"]) 

1074 

1075 srv = make_server( 

1076 hostname, 

1077 port, 

1078 application, 

1079 threaded, 

1080 processes, 

1081 request_handler, 

1082 passthrough_errors, 

1083 ssl_context, 

1084 fd=fd, 

1085 ) 

1086 srv.socket.set_inheritable(True) 

1087 os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno()) 

1088 

1089 if not is_running_from_reloader(): 

1090 srv.log_startup() 

1091 _log("info", _ansi_style("Press CTRL+C to quit", "yellow")) 

1092 

1093 if use_reloader: 

1094 from ._reloader import run_with_reloader 

1095 

1096 try: 

1097 run_with_reloader( 

1098 srv.serve_forever, 

1099 extra_files=extra_files, 

1100 exclude_patterns=exclude_patterns, 

1101 interval=reloader_interval, 

1102 reloader_type=reloader_type, 

1103 ) 

1104 finally: 

1105 srv.server_close() 

1106 else: 

1107 srv.serve_forever()