Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""A WSGI and HTTP server for use **during development only**. This
2server is convenient to use, but is not designed to be particularly
3stable, secure, or efficient. Use a dedicate WSGI server and HTTP
4server when deploying to production.
6It provides features like interactive debugging and code reloading. Use
7``run_simple`` to start the server. Put this in a ``run.py`` script:
9.. code-block:: python
11 from myapp import create_app
12 from werkzeug import run_simple
13"""
15from __future__ import annotations
17import errno
18import io
19import os
20import selectors
21import socket
22import socketserver
23import sys
24import typing as t
25from datetime import datetime as dt
26from datetime import timedelta
27from datetime import timezone
28from http.server import BaseHTTPRequestHandler
29from http.server import HTTPServer
30from urllib.parse import unquote
31from urllib.parse import urlsplit
33from ._internal import _log
34from ._internal import _wsgi_encoding_dance
35from .exceptions import InternalServerError
36from .urls import uri_to_iri
38try:
39 import ssl
41 connection_dropped_errors: tuple[type[Exception], ...] = (
42 ConnectionError,
43 socket.timeout,
44 ssl.SSLEOFError,
45 )
46except ImportError:
48 class _SslDummy:
49 def __getattr__(self, name: str) -> t.Any:
50 raise RuntimeError( # noqa: B904
51 "SSL is unavailable because this Python runtime was not"
52 " compiled with SSL/TLS support."
53 )
55 ssl = _SslDummy() # type: ignore
56 connection_dropped_errors = (ConnectionError, socket.timeout)
58_log_add_style = True
60if os.name == "nt":
61 try:
62 __import__("colorama")
63 except ImportError:
64 _log_add_style = False
66can_fork = hasattr(os, "fork")
68if can_fork:
69 ForkingMixIn = socketserver.ForkingMixIn
70else:
72 class ForkingMixIn: # type: ignore
73 pass
76try:
77 af_unix = socket.AF_UNIX
78except AttributeError:
79 af_unix = None # type: ignore
81LISTEN_QUEUE = 128
83_TSSLContextArg = t.Optional[
84 t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], t.Literal["adhoc"]]
85]
87if t.TYPE_CHECKING:
88 from _typeshed.wsgi import WSGIApplication
89 from _typeshed.wsgi import WSGIEnvironment
90 from cryptography.hazmat.primitives.asymmetric.rsa import (
91 RSAPrivateKeyWithSerialization,
92 )
93 from cryptography.x509 import Certificate
96class DechunkedInput(io.RawIOBase):
97 """An input stream that handles Transfer-Encoding 'chunked'"""
99 def __init__(self, rfile: t.IO[bytes]) -> None:
100 self._rfile = rfile
101 self._done = False
102 self._len = 0
104 def readable(self) -> bool:
105 return True
107 def read_chunk_len(self) -> int:
108 try:
109 line = self._rfile.readline().decode("latin1")
110 _len = int(line.strip(), 16)
111 except ValueError as e:
112 raise OSError("Invalid chunk header") from e
113 if _len < 0:
114 raise OSError("Negative chunk length not allowed")
115 return _len
117 def readinto(self, buf: bytearray) -> int: # type: ignore
118 read = 0
119 while not self._done and read < len(buf):
120 if self._len == 0:
121 # This is the first chunk or we fully consumed the previous
122 # one. Read the next length of the next chunk
123 self._len = self.read_chunk_len()
125 if self._len == 0:
126 # Found the final chunk of size 0. The stream is now exhausted,
127 # but there is still a final newline that should be consumed
128 self._done = True
130 if self._len > 0:
131 # There is data (left) in this chunk, so append it to the
132 # buffer. If this operation fully consumes the chunk, this will
133 # reset self._len to 0.
134 n = min(len(buf), self._len)
136 # If (read + chunk size) becomes more than len(buf), buf will
137 # grow beyond the original size and read more data than
138 # required. So only read as much data as can fit in buf.
139 if read + n > len(buf):
140 buf[read:] = self._rfile.read(len(buf) - read)
141 self._len -= len(buf) - read
142 read = len(buf)
143 else:
144 buf[read : read + n] = self._rfile.read(n)
145 self._len -= n
146 read += n
148 if self._len == 0:
149 # Skip the terminating newline of a chunk that has been fully
150 # consumed. This also applies to the 0-sized final chunk
151 terminator = self._rfile.readline()
152 if terminator not in (b"\n", b"\r\n", b"\r"):
153 raise OSError("Missing chunk terminating newline")
155 return read
158class WSGIRequestHandler(BaseHTTPRequestHandler):
159 """A request handler that implements WSGI dispatching."""
161 server: BaseWSGIServer
163 @property
164 def server_version(self) -> str: # type: ignore
165 return self.server._server_version
167 def make_environ(self) -> WSGIEnvironment:
168 request_url = urlsplit(self.path)
169 url_scheme = "http" if self.server.ssl_context is None else "https"
171 if not self.client_address:
172 self.client_address = ("<local>", 0)
173 elif isinstance(self.client_address, str):
174 self.client_address = (self.client_address, 0)
176 # If there was no scheme but the path started with two slashes,
177 # the first segment may have been incorrectly parsed as the
178 # netloc, prepend it to the path again.
179 if not request_url.scheme and request_url.netloc:
180 path_info = f"/{request_url.netloc}{request_url.path}"
181 else:
182 path_info = request_url.path
184 path_info = unquote(path_info)
186 environ: WSGIEnvironment = {
187 "wsgi.version": (1, 0),
188 "wsgi.url_scheme": url_scheme,
189 "wsgi.input": self.rfile,
190 "wsgi.errors": sys.stderr,
191 "wsgi.multithread": self.server.multithread,
192 "wsgi.multiprocess": self.server.multiprocess,
193 "wsgi.run_once": False,
194 "werkzeug.socket": self.connection,
195 "SERVER_SOFTWARE": self.server_version,
196 "REQUEST_METHOD": self.command,
197 "SCRIPT_NAME": "",
198 "PATH_INFO": _wsgi_encoding_dance(path_info),
199 "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
200 # Non-standard, added by mod_wsgi, uWSGI
201 "REQUEST_URI": _wsgi_encoding_dance(self.path),
202 # Non-standard, added by gunicorn
203 "RAW_URI": _wsgi_encoding_dance(self.path),
204 "REMOTE_ADDR": self.address_string(),
205 "REMOTE_PORT": self.port_integer(),
206 "SERVER_NAME": self.server.server_address[0],
207 "SERVER_PORT": str(self.server.server_address[1]),
208 "SERVER_PROTOCOL": self.request_version,
209 }
211 for key, value in self.headers.items():
212 if "_" in key:
213 continue
215 key = key.upper().replace("-", "_")
216 value = value.replace("\r\n", "")
217 if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
218 key = f"HTTP_{key}"
219 if key in environ:
220 value = f"{environ[key]},{value}"
221 environ[key] = value
223 if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
224 environ["wsgi.input_terminated"] = True
225 environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])
227 # Per RFC 2616, if the URL is absolute, use that as the host.
228 # We're using "has a scheme" to indicate an absolute URL.
229 if request_url.scheme and request_url.netloc:
230 environ["HTTP_HOST"] = request_url.netloc
232 try:
233 # binary_form=False gives nicer information, but wouldn't be compatible with
234 # what Nginx or Apache could return.
235 peer_cert = self.connection.getpeercert(binary_form=True)
236 if peer_cert is not None:
237 # Nginx and Apache use PEM format.
238 environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
239 except ValueError:
240 # SSL handshake hasn't finished.
241 self.server.log("error", "Cannot fetch SSL peer certificate info")
242 except AttributeError:
243 # Not using TLS, the socket will not have getpeercert().
244 pass
246 return environ
248 def run_wsgi(self) -> None:
249 if self.headers.get("Expect", "").lower().strip() == "100-continue":
250 self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")
252 self.environ = environ = self.make_environ()
253 status_set: str | None = None
254 headers_set: list[tuple[str, str]] | None = None
255 status_sent: str | None = None
256 headers_sent: list[tuple[str, str]] | None = None
257 chunk_response: bool = False
259 def write(data: bytes) -> None:
260 nonlocal status_sent, headers_sent, chunk_response
261 assert status_set is not None, "write() before start_response"
262 assert headers_set is not None, "write() before start_response"
263 if status_sent is None:
264 status_sent = status_set
265 headers_sent = headers_set
266 try:
267 code_str, msg = status_sent.split(None, 1)
268 except ValueError:
269 code_str, msg = status_sent, ""
270 code = int(code_str)
271 self.send_response(code, msg)
272 header_keys = set()
273 for key, value in headers_sent:
274 self.send_header(key, value)
275 header_keys.add(key.lower())
277 # Use chunked transfer encoding if there is no content
278 # length. Do not use for 1xx and 204 responses. 304
279 # responses and HEAD requests are also excluded, which
280 # is the more conservative behavior and matches other
281 # parts of the code.
282 # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
283 if (
284 not (
285 "content-length" in header_keys
286 or environ["REQUEST_METHOD"] == "HEAD"
287 or (100 <= code < 200)
288 or code in {204, 304}
289 )
290 and self.protocol_version >= "HTTP/1.1"
291 ):
292 chunk_response = True
293 self.send_header("Transfer-Encoding", "chunked")
295 # Always close the connection. This disables HTTP/1.1
296 # keep-alive connections. They aren't handled well by
297 # Python's http.server because it doesn't know how to
298 # drain the stream before the next request line.
299 self.send_header("Connection", "close")
300 self.end_headers()
302 assert isinstance(data, bytes), "applications must write bytes"
304 if data:
305 if chunk_response:
306 self.wfile.write(hex(len(data))[2:].encode())
307 self.wfile.write(b"\r\n")
309 self.wfile.write(data)
311 if chunk_response:
312 self.wfile.write(b"\r\n")
314 self.wfile.flush()
316 def start_response(status, headers, exc_info=None): # type: ignore
317 nonlocal status_set, headers_set
318 if exc_info:
319 try:
320 if headers_sent:
321 raise exc_info[1].with_traceback(exc_info[2])
322 finally:
323 exc_info = None
324 elif headers_set:
325 raise AssertionError("Headers already set")
326 status_set = status
327 headers_set = headers
328 return write
330 def execute(app: WSGIApplication) -> None:
331 application_iter = app(environ, start_response)
332 try:
333 for data in application_iter:
334 write(data)
335 if not headers_sent:
336 write(b"")
337 if chunk_response:
338 self.wfile.write(b"0\r\n\r\n")
339 finally:
340 # Check for any remaining data in the read socket, and discard it. This
341 # will read past request.max_content_length, but lets the client see a
342 # 413 response instead of a connection reset failure. If we supported
343 # keep-alive connections, this naive approach would break by reading the
344 # next request line. Since we know that write (above) closes every
345 # connection we can read everything.
346 selector = selectors.DefaultSelector()
347 selector.register(self.connection, selectors.EVENT_READ)
348 total_size = 0
349 total_reads = 0
351 # A timeout of 0 tends to fail because a client needs a small amount of
352 # time to continue sending its data.
353 while selector.select(timeout=0.01):
354 # Only read 10MB into memory at a time.
355 data = self.rfile.read(10_000_000)
356 total_size += len(data)
357 total_reads += 1
359 # Stop reading on no data, >=10GB, or 1000 reads. If a client sends
360 # more than that, they'll get a connection reset failure.
361 if not data or total_size >= 10_000_000_000 or total_reads > 1000:
362 break
364 selector.close()
366 if hasattr(application_iter, "close"):
367 application_iter.close()
369 try:
370 execute(self.server.app)
371 except connection_dropped_errors as e:
372 self.connection_dropped(e, environ)
373 except Exception as e:
374 if self.server.passthrough_errors:
375 raise
377 if status_sent is not None and chunk_response:
378 self.close_connection = True
380 try:
381 # if we haven't yet sent the headers but they are set
382 # we roll back to be able to set them again.
383 if status_sent is None:
384 status_set = None
385 headers_set = None
386 execute(InternalServerError())
387 except Exception:
388 pass
390 from .debug.tbtools import DebugTraceback
392 msg = DebugTraceback(e).render_traceback_text()
393 self.server.log("error", f"Error on request:\n{msg}")
395 def handle(self) -> None:
396 """Handles a request ignoring dropped connections."""
397 try:
398 super().handle()
399 except (ConnectionError, socket.timeout) as e:
400 self.connection_dropped(e)
401 except Exception as e:
402 if self.server.ssl_context is not None and is_ssl_error(e):
403 self.log_error("SSL error occurred: %s", e)
404 else:
405 raise
407 def connection_dropped(
408 self, error: BaseException, environ: WSGIEnvironment | None = None
409 ) -> None:
410 """Called if the connection was closed by the client. By default
411 nothing happens.
412 """
414 def __getattr__(self, name: str) -> t.Any:
415 # All HTTP methods are handled by run_wsgi.
416 if name.startswith("do_"):
417 return self.run_wsgi
419 # All other attributes are forwarded to the base class.
420 return getattr(super(), name)
422 def address_string(self) -> str:
423 if getattr(self, "environ", None):
424 return self.environ["REMOTE_ADDR"] # type: ignore
426 if not self.client_address:
427 return "<local>"
429 return self.client_address[0]
431 def port_integer(self) -> int:
432 return self.client_address[1]
434 # Escape control characters. This is defined (but private) in Python 3.12.
435 _control_char_table = str.maketrans(
436 {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
437 )
438 _control_char_table[ord("\\")] = r"\\"
440 def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
441 try:
442 path = uri_to_iri(self.path)
443 msg = f"{self.command} {path} {self.request_version}"
444 except AttributeError:
445 # path isn't set if the requestline was bad
446 msg = self.requestline
448 # Escape control characters that may be in the decoded path.
449 msg = msg.translate(self._control_char_table)
450 code = str(code)
452 if code[0] == "1": # 1xx - Informational
453 msg = _ansi_style(msg, "bold")
454 elif code == "200": # 2xx - Success
455 pass
456 elif code == "304": # 304 - Resource Not Modified
457 msg = _ansi_style(msg, "cyan")
458 elif code[0] == "3": # 3xx - Redirection
459 msg = _ansi_style(msg, "green")
460 elif code == "404": # 404 - Resource Not Found
461 msg = _ansi_style(msg, "yellow")
462 elif code[0] == "4": # 4xx - Client Error
463 msg = _ansi_style(msg, "bold", "red")
464 else: # 5xx, or any other response
465 msg = _ansi_style(msg, "bold", "magenta")
467 self.log("info", '"%s" %s %s', msg, code, size)
469 def log_error(self, format: str, *args: t.Any) -> None:
470 self.log("error", format, *args)
472 def log_message(self, format: str, *args: t.Any) -> None:
473 self.log("info", format, *args)
475 def log(self, type: str, message: str, *args: t.Any) -> None:
476 _log(
477 type,
478 f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
479 *args,
480 )
483def _ansi_style(value: str, *styles: str) -> str:
484 if not _log_add_style:
485 return value
487 codes = {
488 "bold": 1,
489 "red": 31,
490 "green": 32,
491 "yellow": 33,
492 "magenta": 35,
493 "cyan": 36,
494 }
496 for style in styles:
497 value = f"\x1b[{codes[style]}m{value}"
499 return f"{value}\x1b[0m"
502def generate_adhoc_ssl_pair(
503 cn: str | None = None,
504) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
505 try:
506 from cryptography import x509
507 from cryptography.hazmat.backends import default_backend
508 from cryptography.hazmat.primitives import hashes
509 from cryptography.hazmat.primitives.asymmetric import rsa
510 from cryptography.x509.oid import NameOID
511 except ImportError:
512 raise TypeError(
513 "Using ad-hoc certificates requires the cryptography library."
514 ) from None
516 backend = default_backend()
517 pkey = rsa.generate_private_key(
518 public_exponent=65537, key_size=2048, backend=backend
519 )
521 # pretty damn sure that this is not actually accepted by anyone
522 if cn is None:
523 cn = "*"
525 subject = x509.Name(
526 [
527 x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
528 x509.NameAttribute(NameOID.COMMON_NAME, cn),
529 ]
530 )
532 backend = default_backend()
533 cert = (
534 x509.CertificateBuilder()
535 .subject_name(subject)
536 .issuer_name(subject)
537 .public_key(pkey.public_key())
538 .serial_number(x509.random_serial_number())
539 .not_valid_before(dt.now(timezone.utc))
540 .not_valid_after(dt.now(timezone.utc) + timedelta(days=365))
541 .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
542 .add_extension(
543 x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
544 critical=False,
545 )
546 .sign(pkey, hashes.SHA256(), backend)
547 )
548 return cert, pkey
551def make_ssl_devcert(
552 base_path: str, host: str | None = None, cn: str | None = None
553) -> tuple[str, str]:
554 """Creates an SSL key for development. This should be used instead of
555 the ``'adhoc'`` key which generates a new cert on each server start.
556 It accepts a path for where it should store the key and cert and
557 either a host or CN. If a host is given it will use the CN
558 ``*.host/CN=host``.
560 For more information see :func:`run_simple`.
562 .. versionadded:: 0.9
564 :param base_path: the path to the certificate and key. The extension
565 ``.crt`` is added for the certificate, ``.key`` is
566 added for the key.
567 :param host: the name of the host. This can be used as an alternative
568 for the `cn`.
569 :param cn: the `CN` to use.
570 """
572 if host is not None:
573 cn = host
574 cert, pkey = generate_adhoc_ssl_pair(cn=cn)
576 from cryptography.hazmat.primitives import serialization
578 cert_file = f"{base_path}.crt"
579 pkey_file = f"{base_path}.key"
581 with open(cert_file, "wb") as f:
582 f.write(cert.public_bytes(serialization.Encoding.PEM))
583 with open(pkey_file, "wb") as f:
584 f.write(
585 pkey.private_bytes(
586 encoding=serialization.Encoding.PEM,
587 format=serialization.PrivateFormat.TraditionalOpenSSL,
588 encryption_algorithm=serialization.NoEncryption(),
589 )
590 )
592 return cert_file, pkey_file
595def generate_adhoc_ssl_context() -> ssl.SSLContext:
596 """Generates an adhoc SSL context for the development server."""
597 import atexit
598 import tempfile
600 cert, pkey = generate_adhoc_ssl_pair()
602 from cryptography.hazmat.primitives import serialization
604 cert_handle, cert_file = tempfile.mkstemp()
605 pkey_handle, pkey_file = tempfile.mkstemp()
606 atexit.register(os.remove, pkey_file)
607 atexit.register(os.remove, cert_file)
609 os.write(cert_handle, cert.public_bytes(serialization.Encoding.PEM))
610 os.write(
611 pkey_handle,
612 pkey.private_bytes(
613 encoding=serialization.Encoding.PEM,
614 format=serialization.PrivateFormat.TraditionalOpenSSL,
615 encryption_algorithm=serialization.NoEncryption(),
616 ),
617 )
619 os.close(cert_handle)
620 os.close(pkey_handle)
621 ctx = load_ssl_context(cert_file, pkey_file)
622 return ctx
625def load_ssl_context(
626 cert_file: str, pkey_file: str | None = None, protocol: int | None = None
627) -> ssl.SSLContext:
628 """Loads SSL context from cert/private key files and optional protocol.
629 Many parameters are directly taken from the API of
630 :py:class:`ssl.SSLContext`.
632 :param cert_file: Path of the certificate to use.
633 :param pkey_file: Path of the private key to use. If not given, the key
634 will be obtained from the certificate file.
635 :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
636 Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
637 """
638 if protocol is None:
639 protocol = ssl.PROTOCOL_TLS_SERVER
641 ctx = ssl.SSLContext(protocol)
642 ctx.load_cert_chain(cert_file, pkey_file)
643 return ctx
646def is_ssl_error(error: Exception | None = None) -> bool:
647 """Checks if the given error (or the current one) is an SSL error."""
648 if error is None:
649 error = t.cast(Exception, sys.exc_info()[1])
650 return isinstance(error, ssl.SSLError)
653def select_address_family(host: str, port: int) -> socket.AddressFamily:
654 """Return ``AF_INET4``, ``AF_INET6``, or ``AF_UNIX`` depending on
655 the host and port."""
656 if host.startswith("unix://"):
657 return socket.AF_UNIX
658 elif ":" in host and hasattr(socket, "AF_INET6"):
659 return socket.AF_INET6
660 return socket.AF_INET
663def get_sockaddr(
664 host: str, port: int, family: socket.AddressFamily
665) -> tuple[str, int] | str:
666 """Return a fully qualified socket address that can be passed to
667 :func:`socket.bind`."""
668 if family == af_unix:
669 # Absolute path avoids IDNA encoding error when path starts with dot.
670 return os.path.abspath(host.partition("://")[2])
671 try:
672 res = socket.getaddrinfo(
673 host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
674 )
675 except socket.gaierror:
676 return host, port
677 return res[0][4] # type: ignore
680def get_interface_ip(family: socket.AddressFamily) -> str:
681 """Get the IP address of an external interface. Used when binding to
682 0.0.0.0 or ::1 to show a more useful URL.
684 :meta private:
685 """
686 # arbitrary private address
687 host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219"
689 with socket.socket(family, socket.SOCK_DGRAM) as s:
690 try:
691 s.connect((host, 58162))
692 except OSError:
693 return "::1" if family == socket.AF_INET6 else "127.0.0.1"
695 return s.getsockname()[0] # type: ignore
698class BaseWSGIServer(HTTPServer):
699 """A WSGI server that that handles one request at a time.
701 Use :func:`make_server` to create a server instance.
702 """
704 multithread = False
705 multiprocess = False
706 request_queue_size = LISTEN_QUEUE
707 allow_reuse_address = True
709 def __init__(
710 self,
711 host: str,
712 port: int,
713 app: WSGIApplication,
714 handler: type[WSGIRequestHandler] | None = None,
715 passthrough_errors: bool = False,
716 ssl_context: _TSSLContextArg | None = None,
717 fd: int | None = None,
718 ) -> None:
719 if handler is None:
720 handler = WSGIRequestHandler
722 # If the handler doesn't directly set a protocol version and
723 # thread or process workers are used, then allow chunked
724 # responses and keep-alive connections by enabling HTTP/1.1.
725 if "protocol_version" not in vars(handler) and (
726 self.multithread or self.multiprocess
727 ):
728 handler.protocol_version = "HTTP/1.1"
730 self.host = host
731 self.port = port
732 self.app = app
733 self.passthrough_errors = passthrough_errors
735 self.address_family = address_family = select_address_family(host, port)
736 server_address = get_sockaddr(host, int(port), address_family)
738 # Remove a leftover Unix socket file from a previous run. Don't
739 # remove a file that was set up by run_simple.
740 if address_family == af_unix and fd is None:
741 server_address = t.cast(str, server_address)
743 if os.path.exists(server_address):
744 os.unlink(server_address)
746 # Bind and activate will be handled manually, it should only
747 # happen if we're not using a socket that was already set up.
748 super().__init__(
749 server_address, # type: ignore[arg-type]
750 handler,
751 bind_and_activate=False,
752 )
754 if fd is None:
755 # No existing socket descriptor, do bind_and_activate=True.
756 try:
757 self.server_bind()
758 self.server_activate()
759 except OSError as e:
760 # Catch connection issues and show them without the traceback. Show
761 # extra instructions for address not found, and for macOS.
762 self.server_close()
763 print(e.strerror, file=sys.stderr)
765 if e.errno == errno.EADDRINUSE:
766 print(
767 f"Port {port} is in use by another program. Either identify and"
768 " stop that program, or start the server with a different"
769 " port.",
770 file=sys.stderr,
771 )
773 if sys.platform == "darwin" and port == 5000:
774 print(
775 "On macOS, try disabling the 'AirPlay Receiver' service"
776 " from System Preferences -> General -> AirDrop & Handoff.",
777 file=sys.stderr,
778 )
780 sys.exit(1)
781 except BaseException:
782 self.server_close()
783 raise
784 else:
785 # TCPServer automatically opens a socket even if bind_and_activate is False.
786 # Close it to silence a ResourceWarning.
787 self.server_close()
789 # Use the passed in socket directly.
790 self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
791 self.server_address = self.socket.getsockname()
793 if address_family != af_unix:
794 # If port was 0, this will record the bound port.
795 self.port = self.server_address[1]
797 if ssl_context is not None:
798 if isinstance(ssl_context, tuple):
799 ssl_context = load_ssl_context(*ssl_context)
800 elif ssl_context == "adhoc":
801 ssl_context = generate_adhoc_ssl_context()
803 self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
804 self.ssl_context: ssl.SSLContext | None = ssl_context
805 else:
806 self.ssl_context = None
808 import importlib.metadata
810 self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"
812 def log(self, type: str, message: str, *args: t.Any) -> None:
813 _log(type, message, *args)
815 def serve_forever(self, poll_interval: float = 0.5) -> None:
816 try:
817 super().serve_forever(poll_interval=poll_interval)
818 except KeyboardInterrupt:
819 pass
820 finally:
821 self.server_close()
823 def handle_error(
824 self, request: t.Any, client_address: tuple[str, int] | str
825 ) -> None:
826 if self.passthrough_errors:
827 raise
829 return super().handle_error(request, client_address)
831 def log_startup(self) -> None:
832 """Show information about the address when starting the server."""
833 dev_warning = (
834 "WARNING: This is a development server. Do not use it in a production"
835 " deployment. Use a production WSGI server instead."
836 )
837 dev_warning = _ansi_style(dev_warning, "bold", "red")
838 messages = [dev_warning]
840 if self.address_family == af_unix:
841 messages.append(f" * Running on {self.host}")
842 else:
843 scheme = "http" if self.ssl_context is None else "https"
844 display_hostname = self.host
846 if self.host in {"0.0.0.0", "::"}:
847 messages.append(f" * Running on all addresses ({self.host})")
849 if self.host == "0.0.0.0":
850 localhost = "127.0.0.1"
851 display_hostname = get_interface_ip(socket.AF_INET)
852 else:
853 localhost = "[::1]"
854 display_hostname = get_interface_ip(socket.AF_INET6)
856 messages.append(f" * Running on {scheme}://{localhost}:{self.port}")
858 if ":" in display_hostname:
859 display_hostname = f"[{display_hostname}]"
861 messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")
863 _log("info", "\n".join(messages))
866class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
867 """A WSGI server that handles concurrent requests in separate
868 threads.
870 Use :func:`make_server` to create a server instance.
871 """
873 multithread = True
874 daemon_threads = True
877class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
878 """A WSGI server that handles concurrent requests in separate forked
879 processes.
881 Use :func:`make_server` to create a server instance.
882 """
884 multiprocess = True
886 def __init__(
887 self,
888 host: str,
889 port: int,
890 app: WSGIApplication,
891 processes: int = 40,
892 handler: type[WSGIRequestHandler] | None = None,
893 passthrough_errors: bool = False,
894 ssl_context: _TSSLContextArg | None = None,
895 fd: int | None = None,
896 ) -> None:
897 if not can_fork:
898 raise ValueError("Your platform does not support forking.")
900 super().__init__(host, port, app, handler, passthrough_errors, ssl_context, fd)
901 self.max_children = processes
904def make_server(
905 host: str,
906 port: int,
907 app: WSGIApplication,
908 threaded: bool = False,
909 processes: int = 1,
910 request_handler: type[WSGIRequestHandler] | None = None,
911 passthrough_errors: bool = False,
912 ssl_context: _TSSLContextArg | None = None,
913 fd: int | None = None,
914) -> BaseWSGIServer:
915 """Create an appropriate WSGI server instance based on the value of
916 ``threaded`` and ``processes``.
918 This is called from :func:`run_simple`, but can be used separately
919 to have access to the server object, such as to run it in a separate
920 thread.
922 See :func:`run_simple` for parameter docs.
923 """
924 if threaded and processes > 1:
925 raise ValueError("Cannot have a multi-thread and multi-process server.")
927 if threaded:
928 return ThreadedWSGIServer(
929 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
930 )
932 if processes > 1:
933 return ForkingWSGIServer(
934 host,
935 port,
936 app,
937 processes,
938 request_handler,
939 passthrough_errors,
940 ssl_context,
941 fd=fd,
942 )
944 return BaseWSGIServer(
945 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
946 )
949def is_running_from_reloader() -> bool:
950 """Check if the server is running as a subprocess within the
951 Werkzeug reloader.
953 .. versionadded:: 0.10
954 """
955 return os.environ.get("WERKZEUG_RUN_MAIN") == "true"
958def run_simple(
959 hostname: str,
960 port: int,
961 application: WSGIApplication,
962 use_reloader: bool = False,
963 use_debugger: bool = False,
964 use_evalex: bool = True,
965 extra_files: t.Iterable[str] | None = None,
966 exclude_patterns: t.Iterable[str] | None = None,
967 reloader_interval: int = 1,
968 reloader_type: str = "auto",
969 threaded: bool = False,
970 processes: int = 1,
971 request_handler: type[WSGIRequestHandler] | None = None,
972 static_files: dict[str, str | tuple[str, str]] | None = None,
973 passthrough_errors: bool = False,
974 ssl_context: _TSSLContextArg | None = None,
975) -> None:
976 """Start a development server for a WSGI application. Various
977 optional features can be enabled.
979 .. warning::
981 Do not use the development server when deploying to production.
982 It is intended for use only during local development. It is not
983 designed to be particularly efficient, stable, or secure.
985 :param hostname: The host to bind to, for example ``'localhost'``.
986 Can be a domain, IPv4 or IPv6 address, or file path starting
987 with ``unix://`` for a Unix socket.
988 :param port: The port to bind to, for example ``8080``. Using ``0``
989 tells the OS to pick a random free port.
990 :param application: The WSGI application to run.
991 :param use_reloader: Use a reloader process to restart the server
992 process when files are changed.
993 :param use_debugger: Use Werkzeug's debugger, which will show
994 formatted tracebacks on unhandled exceptions.
995 :param use_evalex: Make the debugger interactive. A Python terminal
996 can be opened for any frame in the traceback. Some protection is
997 provided by requiring a PIN, but this should never be enabled
998 on a publicly visible server.
999 :param extra_files: The reloader will watch these files for changes
1000 in addition to Python modules. For example, watch a
1001 configuration file.
1002 :param exclude_patterns: The reloader will ignore changes to any
1003 files matching these :mod:`fnmatch` patterns. For example,
1004 ignore cache files.
1005 :param reloader_interval: How often the reloader tries to check for
1006 changes.
1007 :param reloader_type: The reloader to use. The ``'stat'`` reloader
1008 is built in, but may require significant CPU to watch files. The
1009 ``'watchdog'`` reloader is much more efficient but requires
1010 installing the ``watchdog`` package first.
1011 :param threaded: Handle concurrent requests using threads. Cannot be
1012 used with ``processes``.
1013 :param processes: Handle concurrent requests using up to this number
1014 of processes. Cannot be used with ``threaded``.
1015 :param request_handler: Use a different
1016 :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to
1017 handle requests.
1018 :param static_files: A dict mapping URL prefixes to directories to
1019 serve static files from using
1020 :class:`~werkzeug.middleware.SharedDataMiddleware`.
1021 :param passthrough_errors: Don't catch unhandled exceptions at the
1022 server level, let the server crash instead. If ``use_debugger``
1023 is enabled, the debugger will still catch such errors.
1024 :param ssl_context: Configure TLS to serve over HTTPS. Can be an
1025 :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
1026 tuple to create a typical context, or the string ``'adhoc'`` to
1027 generate a temporary self-signed certificate.
1029 .. versionchanged:: 2.1
1030 Instructions are shown for dealing with an "address already in
1031 use" error.
1033 .. versionchanged:: 2.1
1034 Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
1035 addition to a real IP.
1037 .. versionchanged:: 2.1
1038 The command-line interface was removed.
1040 .. versionchanged:: 2.0
1041 Running on ``0.0.0.0`` or ``::`` shows a real IP address that
1042 was bound as well as a warning not to run the development server
1043 in production.
1045 .. versionchanged:: 2.0
1046 The ``exclude_patterns`` parameter was added.
1048 .. versionchanged:: 0.15
1049 Bind to a Unix socket by passing a ``hostname`` that starts with
1050 ``unix://``.
1052 .. versionchanged:: 0.10
1053 Improved the reloader and added support for changing the backend
1054 through the ``reloader_type`` parameter.
1056 .. versionchanged:: 0.9
1057 A command-line interface was added.
1059 .. versionchanged:: 0.8
1060 ``ssl_context`` can be a tuple of paths to the certificate and
1061 private key files.
1063 .. versionchanged:: 0.6
1064 The ``ssl_context`` parameter was added.
1066 .. versionchanged:: 0.5
1067 The ``static_files`` and ``passthrough_errors`` parameters were
1068 added.
1069 """
1070 if not isinstance(port, int):
1071 raise TypeError("port must be an integer")
1073 if static_files:
1074 from .middleware.shared_data import SharedDataMiddleware
1076 application = SharedDataMiddleware(application, static_files)
1078 if use_debugger:
1079 from .debug import DebuggedApplication
1081 application = DebuggedApplication(application, evalex=use_evalex)
1082 # Allow the specified hostname to use the debugger, in addition to
1083 # localhost domains.
1084 application.trusted_hosts.append(hostname)
1086 if not is_running_from_reloader():
1087 fd = None
1088 else:
1089 fd = int(os.environ["WERKZEUG_SERVER_FD"])
1091 srv = make_server(
1092 hostname,
1093 port,
1094 application,
1095 threaded,
1096 processes,
1097 request_handler,
1098 passthrough_errors,
1099 ssl_context,
1100 fd=fd,
1101 )
1102 srv.socket.set_inheritable(True)
1103 os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno())
1105 if not is_running_from_reloader():
1106 srv.log_startup()
1107 _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))
1109 if use_reloader:
1110 from ._reloader import run_with_reloader
1112 try:
1113 run_with_reloader(
1114 srv.serve_forever,
1115 extra_files=extra_files,
1116 exclude_patterns=exclude_patterns,
1117 interval=reloader_interval,
1118 reloader_type=reloader_type,
1119 )
1120 finally:
1121 srv.server_close()
1122 else:
1123 srv.serve_forever()