Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/serving.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""A WSGI and HTTP server for use **during development only**. This
2server is convenient to use, but is not designed to be particularly
3stable, secure, or efficient. Use a dedicated WSGI server and HTTP
4server when deploying to production.
6It provides features like interactive debugging and code reloading. Use
7``run_simple`` to start the server. Put this in a ``run.py`` script:
9.. code-block:: python
11 from myapp import create_app
12 from werkzeug import run_simple
13"""
15from __future__ import annotations
17import errno
18import io
19import os
20import selectors
21import socket
22import socketserver
23import sys
24import typing as t
25from datetime import datetime as dt
26from datetime import timedelta
27from datetime import timezone
28from http.server import BaseHTTPRequestHandler
29from http.server import HTTPServer
30from urllib.parse import unquote
31from urllib.parse import urlsplit
33from ._internal import _log
34from ._internal import _wsgi_encoding_dance
35from .exceptions import InternalServerError
36from .urls import uri_to_iri
# Exception types that indicate the client went away mid-request. The
# TLS-specific entry is only included when the ``ssl`` module can be imported.
connection_dropped_errors: tuple[type[Exception], ...]

try:
    import ssl
except ImportError:

    class _SslDummy:
        """Placeholder bound to the name ``ssl`` when the real module is missing.

        Every attribute lookup raises ``RuntimeError`` so that the failure
        surfaces at the point where TLS support is actually requested.
        """

        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore
    connection_dropped_errors = (ConnectionError, socket.timeout)
else:
    connection_dropped_errors = (
        ConnectionError,
        socket.timeout,
        ssl.SSLEOFError,
    )
# ANSI styling of log messages is enabled by default. On Windows it is kept
# only if ``colorama`` can be imported.
_log_add_style = True

if os.name == "nt":
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False

# Forking workers are only possible where ``os.fork`` exists. Elsewhere an
# empty stand-in keeps ``ForkingWSGIServer`` definable.
can_fork = hasattr(os, "fork")

if not can_fork:

    class ForkingMixIn:  # type: ignore
        pass

else:
    ForkingMixIn = socketserver.ForkingMixIn


# ``None`` on platforms whose ``socket`` module has no ``AF_UNIX``.
af_unix = getattr(socket, "AF_UNIX", None)  # type: ignore

# Value used for ``request_queue_size`` on the server classes.
LISTEN_QUEUE = 128

# Accepted forms of the ``ssl_context`` argument: a ready context, a
# ``(cert_file, key_file)`` tuple, or the literal string ``"adhoc"``.
_TSSLContextArg = t.Optional[
    t.Union["ssl.SSLContext", tuple[str, t.Optional[str]], t.Literal["adhoc"]]
]
87if t.TYPE_CHECKING:
88 from _typeshed.wsgi import WSGIApplication
89 from _typeshed.wsgi import WSGIEnvironment
90 from cryptography.hazmat.primitives.asymmetric.rsa import (
91 RSAPrivateKeyWithSerialization,
92 )
93 from cryptography.x509 import Certificate
class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps the raw request stream and yields only the chunk payloads: chunk
    size lines and the newline after each chunk are consumed internally.

    :param rfile: Binary stream positioned at the first chunk size line.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the zero-length final chunk has been seen.
        self._done = False
        # Bytes of the current chunk that have not been handed out yet.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk size line and return the size it declares.

        :raises OSError: If the line is not a hexadecimal number or the
            number is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = int(line.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded body bytes and return how many were written.

        Returns fewer bytes than ``len(buf)`` only when the final chunk has
        been reached.

        :raises OSError: If a chunk header is invalid, a chunk is not
            followed by a newline, or the stream ends in the middle of a
            chunk.
        """
        read = 0
        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the length of the next chunk.
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed.
                self._done = True

            if self._len > 0:
                # Only read as much of the chunk as still fits in buf, so the
                # slice assignment below never changes the size of buf.
                n = min(len(buf) - read, self._len)
                data = self._rfile.read(n)

                if len(data) != n:
                    # The underlying stream hit EOF inside a chunk. Without
                    # this check the short data would shrink a bytearray
                    # buffer (or raise ValueError for a memoryview) while
                    # still being counted as n bytes.
                    raise OSError("Unexpected end of chunked request body")

                buf[read : read + n] = data
                self._len -= n
                read += n

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk.
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read
class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching.

    Every ``do_<METHOD>`` lookup resolves to :meth:`run_wsgi`, which builds
    the WSGI environ, calls ``self.server.app`` and writes the response.
    """

    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        """The version string stored on the owning server."""
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the current request."""
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        # Normalize client_address to a (host, port) tuple.
        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names containing an underscore are skipped, because
            # they would collide with dashed names after the replace below.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                if key in environ:
                    # Repeated headers are joined into one comma separated value.
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Call the WSGI application and stream its response to the client.

        Dropped connections are passed to :meth:`connection_dropped`. Any
        other exception is re-raised if the server has ``passthrough_errors``
        set; otherwise a 500 response is attempted and the traceback logged.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        # *_set holds what the app passed to start_response, *_sent what has
        # actually been written to the socket.
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                # First write: send the status line and headers.
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    # Drop the reference to the exception info.
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                try:
                    # Check for any remaining data in the read socket, and discard it.
                    # This will read past request.max_content_length, but lets the
                    # client see a 413 response instead of a connection reset
                    # failure. If we supported keep-alive connections, this naive
                    # approach would break by reading the next request line. Since
                    # we know that write (above) closes every connection we can read
                    # everything.
                    #
                    # The selector is used as a context manager so it is closed
                    # even if reading from the socket raises.
                    with selectors.DefaultSelector() as selector:
                        selector.register(self.connection, selectors.EVENT_READ)
                        total_size = 0
                        total_reads = 0

                        # A timeout of 0 tends to fail because a client needs a small
                        # amount of time to continue sending its data.
                        while selector.select(timeout=0.01):
                            # Only read 10MB into memory at a time.
                            data = self.rfile.read(10_000_000)
                            total_size += len(data)
                            total_reads += 1

                            # Stop reading on no data, >=10GB, or 1000 reads. If a
                            # client sends more than that, they'll get a connection
                            # reset failure.
                            if (
                                not data
                                or total_size >= 10_000_000_000
                                or total_reads > 1000
                            ):
                                break
                finally:
                    # The iterable's close() must run even when draining the
                    # socket fails, so the application can release resources.
                    if hasattr(application_iter, "close"):
                        application_iter.close()

        try:
            execute(self.server.app)
        except connection_dropped_errors as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                # Best effort only: the original error is logged below.
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address used in the environ and in log lines."""
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the second item of ``client_address``."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log the request line, styled according to the status code."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        # startswith() instead of code[0] so an empty code cannot raise
        # IndexError; it falls through to the last branch instead.
        if code.startswith("1"):  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 2xx - Success
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code.startswith("3"):  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code.startswith("4"):  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Pass ``message`` to ``_log`` prefixed with the address and a timestamp."""
        # an IPv6 scoped address contains "%" which breaks logging
        address_string = self.address_string().replace("%", "%%")
        _log(
            type,
            f"{address_string} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )
def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape sequences for the named styles.

    Returns ``value`` unchanged when ``_log_add_style`` is false. Otherwise
    one escape sequence per style is put in front of the text and a reset
    sequence behind it.

    :param value: Text to style.
    :param styles: Any of ``bold``, ``red``, ``green``, ``yellow``,
        ``magenta``, ``cyan``.
    :raises KeyError: If styling is enabled and a style name is unknown.
    """
    if not _log_add_style:
        return value

    sgr_codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }

    # Each style is prepended in turn, so the last one named ends up first.
    prefix = "".join(f"\x1b[{sgr_codes[name]}m" for name in reversed(styles))
    return f"{prefix}{value}\x1b[0m"
def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Create a self-signed certificate and a new 2048 bit RSA private key.

    The certificate is valid for 365 days from the moment of the call and
    lists ``cn`` and ``*.cn`` as DNS subject alternative names.

    :param cn: Common name for the certificate. Defaults to ``"*"``.
    :return: A ``(certificate, private_key)`` tuple of cryptography objects.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # pretty damn sure that this is not actually accepted by anyone
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Take the timestamp once so both ends of the validity window derive
    # from the same instant.
    now = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey
def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Create a self-signed certificate and key on disk for development.

    Use this instead of the ``'adhoc'`` option, which generates a new cert
    on each server start. The pair is produced by
    :func:`generate_adhoc_ssl_pair`; if ``host`` is given it is used as the
    common name and ``cn`` is ignored.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: the path to the certificate and key. The extension
                      ``.crt`` is added for the certificate, ``.key`` is
                      added for the key.
    :param host: the name of the host. This can be used as an alternative
                 for the `cn`.
    :param cn: the `CN` to use.
    :return: A ``(cert_file, pkey_file)`` tuple of the paths written.
    """
    common_name = cn if host is None else host
    cert, pkey = generate_adhoc_ssl_pair(cn=common_name)

    from cryptography.hazmat.primitives import serialization

    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"
    pem = serialization.Encoding.PEM

    with open(cert_file, "wb") as cert_out:
        cert_out.write(cert.public_bytes(pem))

    with open(pkey_file, "wb") as key_out:
        key_out.write(
            pkey.private_bytes(
                encoding=pem,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    return cert_file, pkey_file
def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Generates an adhoc SSL context for the development server.

    A new certificate and key are written to two temporary files, which are
    registered for removal at interpreter exit, and then loaded with
    :func:`load_ssl_context`.

    :raises TypeError: If the ``cryptography`` library is not installed
        (raised by :func:`generate_adhoc_ssl_pair`).
    """
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    # Register each file for removal right after it is created, and write
    # through a file object so the descriptor is closed even if serialising
    # or writing raises, and the data is written in full.
    cert_handle, cert_file = tempfile.mkstemp()
    atexit.register(os.remove, cert_file)

    with os.fdopen(cert_handle, "wb") as cert_out:
        cert_out.write(cert.public_bytes(serialization.Encoding.PEM))

    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)

    with os.fdopen(pkey_handle, "wb") as key_out:
        key_out.write(
            pkey.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    return load_ssl_context(cert_file, pkey_file)
def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Build an :py:class:`ssl.SSLContext` from a certificate and key on disk.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
        will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    :return: A context with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def is_ssl_error(error: Exception | None = None) -> bool:
    """Tell whether ``error`` is an :class:`ssl.SSLError`.

    When ``error`` is ``None`` the exception currently being handled, as
    reported by :func:`sys.exc_info`, is checked instead.
    """
    candidate = error if error is not None else t.cast(Exception, sys.exc_info()[1])
    return isinstance(candidate, ssl.SSLError)
def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Return ``AF_INET``, ``AF_INET6``, or ``AF_UNIX`` depending on the host.

    A host starting with ``unix://`` selects ``AF_UNIX``. A host containing
    a colon selects ``AF_INET6`` when the platform defines it. Anything else
    selects ``AF_INET``. ``port`` is accepted but not inspected.
    """
    if host.startswith("unix://"):
        return socket.AF_UNIX

    looks_like_ipv6 = ":" in host
    if looks_like_ipv6 and hasattr(socket, "AF_INET6"):
        return socket.AF_INET6

    return socket.AF_INET
def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Return a fully qualified socket address that can be passed to
    :func:`socket.bind`.

    For the Unix family this is the absolute form of the path after
    ``://`` in ``host``. Otherwise it is the address of the first
    :func:`socket.getaddrinfo` result, or ``(host, port)`` unchanged if the
    lookup fails.
    """
    if family == af_unix:
        _scheme, _sep, socket_path = host.partition("://")
        # Absolute path avoids IDNA encoding error when path starts with dot.
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port

    first = candidates[0]
    return first[4]  # type: ignore
def get_interface_ip(family: socket.AddressFamily) -> str:
    """Get the IP address of an external interface. Used when binding to
    0.0.0.0 or :: to show a more useful URL.

    A datagram socket is connected to an arbitrary private address and the
    local address it ends up bound to is returned. If connecting fails, the
    loopback address for ``family`` is returned instead.

    :meta private:
    """
    if family == socket.AF_INET6:
        # arbitrary private address
        probe_host, loopback = "fd31:f903:5ab5:1::1", "::1"
    else:
        probe_host, loopback = "10.253.155.219", "127.0.0.1"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return loopback

        return probe.getsockname()[0]  # type: ignore
class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Set up the listening socket, optionally wrapped with TLS.

        :param host: Host name, IP address, or ``unix://`` path to bind to.
        :param port: Port to bind to.
        :param app: The WSGI application to serve.
        :param handler: Request handler class. Defaults to
            :class:`WSGIRequestHandler`.
        :param passthrough_errors: Stored on the server; when true,
            :meth:`handle_error` re-raises instead of handling the error.
        :param ssl_context: An SSL context, a ``(cert_file, key_file)``
            tuple, or ``"adhoc"``.
        :param fd: Descriptor of an existing socket to use instead of
            binding a new one.
        """
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        uses_workers = self.multithread or self.multiprocess

        if uses_workers and "protocol_version" not in vars(handler):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        address_family = select_address_family(host, port)
        self.address_family = address_family
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if address_family == af_unix and fd is None:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is not None:
            # TCPServer automatically opens a socket even if bind_and_activate is False.
            # Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()
        else:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try disabling the 'AirPlay Receiver' service"
                            " from System Preferences -> General -> AirDrop & Handoff.",
                            file=sys.stderr,
                        )

                sys.exit(1)
            except BaseException:
                self.server_close()
                raise

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is None:
            self.ssl_context: ssl.SSLContext | None = None
        else:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context = ssl_context

        import importlib.metadata

        self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Forward a message to ``_log``."""
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve until interrupted, then close the server socket.

        ``KeyboardInterrupt`` is swallowed so that CTRL+C ends the loop
        without a traceback.
        """
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        """Re-raise the active exception if ``passthrough_errors`` is set,
        otherwise defer to the base class.
        """
        if self.passthrough_errors:
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        dev_warning = _ansi_style(
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead.",
            "bold",
            "red",
        )
        lines = [dev_warning]

        if self.address_family == af_unix:
            lines.append(f" * Running on {self.host}")
        else:
            scheme = "https" if self.ssl_context is not None else "http"
            display_hostname = self.host

            if self.host in {"0.0.0.0", "::"}:
                lines.append(f" * Running on all addresses ({self.host})")

                # Show the loopback URL and the URL of an external interface.
                if self.host == "0.0.0.0":
                    localhost = "127.0.0.1"
                    display_hostname = get_interface_ip(socket.AF_INET)
                else:
                    localhost = "[::1]"
                    display_hostname = get_interface_ip(socket.AF_INET6)

                lines.append(f" * Running on {scheme}://{localhost}:{self.port}")

            # Hosts containing a colon are bracketed before the port is appended.
            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            lines.append(f" * Running on {scheme}://{display_hostname}:{self.port}")

        _log("info", "\n".join(lines))
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that dispatches each request to its own thread.

    Use :func:`make_server` to create a server instance.
    """

    # Flag from socketserver.ThreadingMixIn: request threads are daemon threads.
    daemon_threads = True
    # Read by the request handler for ``wsgi.multithread``.
    multithread = True
class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that dispatches each request to a forked child process.

    Use :func:`make_server` to create a server instance.
    """

    # Read by the request handler for ``wsgi.multiprocess``.
    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Create the server and store ``processes`` as ``max_children``.

        The remaining parameters are passed to :class:`BaseWSGIServer`.

        :raises ValueError: If the platform does not support forking.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        self.max_children = processes
def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Create an appropriate WSGI server instance based on the value of
    ``threaded`` and ``processes``.

    A :class:`ThreadedWSGIServer` is returned when ``threaded`` is true, a
    :class:`ForkingWSGIServer` when ``processes`` is greater than one, and
    a :class:`BaseWSGIServer` otherwise.

    This is called from :func:`run_simple`, but can be used separately
    to have access to the server object, such as to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: If both ``threaded`` and ``processes > 1`` are given.
    """
    if threaded and processes > 1:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    if threaded:
        return ThreadedWSGIServer(
            host,
            port,
            app,
            handler=request_handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )

    if processes > 1:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes=processes,
            handler=request_handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )

    return BaseWSGIServer(
        host,
        port,
        app,
        handler=request_handler,
        passthrough_errors=passthrough_errors,
        ssl_context=ssl_context,
        fd=fd,
    )
def is_running_from_reloader() -> bool:
    """Check if the server is running as a subprocess within the
    Werkzeug reloader.

    This is the case when the ``WERKZEUG_RUN_MAIN`` environment variable is
    exactly ``"true"``.

    .. versionadded:: 0.10
    """
    flag = os.getenv("WERKZEUG_RUN_MAIN")
    return flag == "true"
def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Start a development server for a WSGI application. Various
    optional features can be enabled.

    .. warning::

        Do not use the development server when deploying to production.
        It is intended for use only during local development. It is not
        designed to be particularly efficient, stable, or secure.

    :param hostname: The host to bind to, for example ``'localhost'``.
        Can be a domain, IPv4 or IPv6 address, or file path starting
        with ``unix://`` for a Unix socket.
    :param port: The port to bind to, for example ``8080``. Using ``0``
        tells the OS to pick a random free port.
    :param application: The WSGI application to run.
    :param use_reloader: Use a reloader process to restart the server
        process when files are changed.
    :param use_debugger: Use Werkzeug's debugger, which will show
        formatted tracebacks on unhandled exceptions.
    :param use_evalex: Make the debugger interactive. A Python terminal
        can be opened for any frame in the traceback. Some protection is
        provided by requiring a PIN, but this should never be enabled
        on a publicly visible server.
    :param extra_files: The reloader will watch these files for changes
        in addition to Python modules. For example, watch a
        configuration file.
    :param exclude_patterns: The reloader will ignore changes to any
        files matching these :mod:`fnmatch` patterns. For example,
        ignore cache files.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: The reloader to use. The ``'stat'`` reloader
        is built in, but may require significant CPU to watch files. The
        ``'watchdog'`` reloader is much more efficient but requires
        installing the ``watchdog`` package first.
    :param threaded: Handle concurrent requests using threads. Cannot be
        used with ``processes``.
    :param processes: Handle concurrent requests using up to this number
        of processes. Cannot be used with ``threaded``.
    :param request_handler: Use a different
        :class:`WSGIRequestHandler` subclass to handle requests.
    :param static_files: A dict mapping URL prefixes to directories to
        serve static files from using
        :class:`~werkzeug.middleware.shared_data.SharedDataMiddleware`.
    :param passthrough_errors: Don't catch unhandled exceptions at the
        server level, let the server crash instead. If ``use_debugger``
        is enabled, the debugger will still catch such errors.
    :param ssl_context: Configure TLS to serve over HTTPS. Can be an
        :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
        tuple to create a typical context, or the string ``'adhoc'`` to
        generate a temporary self-signed certificate.
    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, evalex=use_evalex)
        # Allow the specified hostname to use the debugger, in addition to
        # localhost domains.
        application.trusted_hosts.append(hostname)

    # In the reloader subprocess, take the socket descriptor from the
    # environment variable that is set further below.
    fd = int(os.environ["WERKZEUG_SERVER_FD"]) if is_running_from_reloader() else None

    srv = make_server(
        hostname,
        port,
        application,
        threaded,
        processes,
        request_handler,
        passthrough_errors,
        ssl_context,
        fd=fd,
    )
    # Make the socket inheritable and publish its descriptor number.
    srv.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno())

    if not is_running_from_reloader():
        srv.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        srv.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            srv.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        srv.server_close()