Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/serving.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""A WSGI and HTTP server for use **during development only**. This
server is convenient to use, but is not designed to be particularly
stable, secure, or efficient. Use a dedicated WSGI server and HTTP
server when deploying to production.

It provides features like interactive debugging and code reloading. Use
``run_simple`` to start the server. Put this in a ``run.py`` script:

.. code-block:: python

    from myapp import create_app
    from werkzeug import run_simple
"""
15from __future__ import annotations
17import errno
18import io
19import os
20import selectors
21import socket
22import socketserver
23import sys
24import typing as t
25from datetime import datetime as dt
26from datetime import timedelta
27from datetime import timezone
28from http.server import BaseHTTPRequestHandler
29from http.server import HTTPServer
30from urllib.parse import unquote
31from urllib.parse import urlsplit
33from ._internal import _log
34from ._internal import _wsgi_encoding_dance
35from .exceptions import InternalServerError
36from .http import parse_set_header
37from .urls import uri_to_iri
# ``ssl`` is optional: a Python runtime may have been compiled without
# SSL/TLS support. In that case a stand-in object replaces the module so
# that any use of it fails with a clear message.
try:
    import ssl
except ImportError:

    class _SslDummy:
        """Stand-in for the ``ssl`` module; every attribute access raises."""

        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore
    # Exceptions that are treated as "the client went away".
    connection_dropped_errors: tuple[type[Exception], ...] = (
        ConnectionError,
        socket.timeout,
    )
else:
    # With TLS available, an abrupt TLS EOF also counts as a dropped connection.
    connection_dropped_errors = (
        ConnectionError,
        socket.timeout,
        ssl.SSLEOFError,
    )
# Whether log lines get ANSI styling. On Windows this is only kept on when
# ``colorama`` can be imported.
_log_add_style = True

if os.name == "nt":
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False

# Forking servers are only possible where ``os.fork`` exists.
can_fork = hasattr(os, "fork")

if not can_fork:

    class ForkingMixIn:  # type: ignore
        pass

else:
    ForkingMixIn = socketserver.ForkingMixIn

# ``socket.AF_UNIX`` is missing on some platforms; ``None`` marks that case.
af_unix = getattr(socket, "AF_UNIX", None)  # type: ignore

# Value used for ``request_queue_size`` on the server classes.
LISTEN_QUEUE = 128

# Accepted forms of the ``ssl_context`` argument: a ready context, a
# ``(cert_file, key_file)`` tuple, the literal ``"adhoc"``, or ``None``.
_TSSLContextArg = t.Optional[
    t.Union[
        "ssl.SSLContext",
        tuple[str, t.Optional[str]],
        t.Literal["adhoc"],
    ]
]
88if t.TYPE_CHECKING:
89 from _typeshed.wsgi import WSGIApplication
90 from _typeshed.wsgi import WSGIEnvironment
91 from cryptography.hazmat.primitives.asymmetric.rsa import (
92 RSAPrivateKeyWithSerialization,
93 )
94 from cryptography.x509 import Certificate
class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps a binary stream positioned at the start of a chunked body and
    exposes only the chunk payloads through the raw IO interface.

    :param rfile: The underlying binary stream to read chunk headers and
        data from.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the zero-sized final chunk has been seen.
        self._done = False
        # Bytes of the current chunk that have not been handed out yet.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk header line and return the chunk size.

        :raises OSError: If the line is not a hexadecimal number or the
            number is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = int(line.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded body data and return the byte count.

        Reads across chunk boundaries until ``buf`` is full or the final
        chunk has been consumed.

        :raises OSError: If a chunk header is invalid, the stream ends in
            the middle of a chunk, or a chunk is not followed by a newline.
        """
        read = 0
        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the length of the next chunk.
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed.
                self._done = True

            if self._len > 0:
                # Only read as much as still fits into buf so that buf never
                # grows beyond its original size.
                n = min(len(buf) - read, self._len)
                data = self._rfile.read(n)

                # A short read means the stream ended inside a chunk. Fail
                # here instead of resizing buf or miscounting the remainder.
                if len(data) != n:
                    raise OSError("Unexpected end of chunked stream")

                buf[read : read + n] = data
                self._len -= n
                read += n

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk.
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read
class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching."""

    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the current request."""
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        # Normalize client_address to a (host, port) tuple.
        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names that already contain an underscore are dropped.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                # Repeated headers are joined with a comma.
                if key in environ:
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if "chunked" in parse_set_header(environ.get("HTTP_TRANSFER_ENCODING")):
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Run the WSGI application for the current request and write its
        response to the client.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                # First write: send the status line and headers.
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                try:
                    # Check for any remaining data in the read socket, and discard
                    # it. This will read past request.max_content_length, but lets
                    # the client see a 413 response instead of a connection reset
                    # failure. If we supported keep-alive connections, this naive
                    # approach would break by reading the next request line. Since
                    # we know that write (above) closes every connection we can
                    # read everything.
                    #
                    # The selector is a context manager so it is closed even if
                    # reading from the socket raises.
                    with selectors.DefaultSelector() as selector:
                        selector.register(self.connection, selectors.EVENT_READ)
                        total_size = 0
                        total_reads = 0

                        # A timeout of 0 tends to fail because a client needs a
                        # small amount of time to continue sending its data.
                        while selector.select(timeout=0.01):
                            # Only read 10MB into memory at a time.
                            data = self.rfile.read(10_000_000)
                            total_size += len(data)
                            total_reads += 1

                            # Stop reading on no data, >=10GB, or 1000 reads. If a
                            # client sends more than that, they'll get a connection
                            # reset failure.
                            if (
                                not data
                                or total_size >= 10_000_000_000
                                or total_reads > 1000
                            ):
                                break
                finally:
                    # The iterable's close() must run even if draining the
                    # socket failed.
                    if hasattr(application_iter, "close"):
                        application_iter.close()

        try:
            execute(self.server.app)
        except connection_dropped_errors as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address, preferring the value stored in the
        environ once it has been built.
        """
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the second item of ``client_address``."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log the request line, styled according to the status code."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        if code[0] == "1":  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 200 - OK is left unstyled
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Log ``message`` prefixed with the client address and a timestamp."""
        # an IPv6 scoped address contains "%" which breaks logging
        address_string = self.address_string().replace("%", "%%")
        _log(
            type,
            f"{address_string} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )
def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape codes for the given style names.

    Returns ``value`` unchanged when styling is disabled. Otherwise each
    style's escape code is put in front of the text (later styles end up
    outermost) and a reset code is appended.
    """
    if not _log_add_style:
        return value

    sgr_codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }
    # Prepending one code at a time is the same as joining them in reverse.
    prefix = "".join(f"\x1b[{sgr_codes[name]}m" for name in reversed(styles))
    return f"{prefix}{value}\x1b[0m"
def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Create a self-signed certificate and its RSA private key.

    :param cn: The common name to put in the certificate. ``"*"`` is used
        when not given.
    :return: A ``(certificate, private_key)`` tuple.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    # One backend is used for both key generation and signing.
    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # pretty damn sure that this is not actually accepted by anyone
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Take the time once so the validity window is exactly 365 days.
    now = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey
def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Create an SSL certificate and key on disk for development. This
    should be used instead of the ``'adhoc'`` key which generates a new
    cert on each server start. It accepts a path for where it should store
    the key and cert and either a host or CN. If a host is given it is
    used as the CN.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: the path to the certificate and key. The extension
                      ``.crt`` is added for the certificate, ``.key`` is
                      added for the key.
    :param host: the name of the host. This can be used as an alternative
                 for the `cn`; when given it takes precedence over `cn`.
    :param cn: the `CN` to use.
    :return: the ``(cert_file, pkey_file)`` paths that were written.
    """
    common_name = cn if host is None else host
    cert, pkey = generate_adhoc_ssl_pair(cn=common_name)

    from cryptography.hazmat.primitives import serialization

    cert_file, pkey_file = f"{base_path}.crt", f"{base_path}.key"

    with open(cert_file, "wb") as cert_out:
        cert_out.write(cert.public_bytes(serialization.Encoding.PEM))

    with open(pkey_file, "wb") as pkey_out:
        pem_key = pkey.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        pkey_out.write(pem_key)

    return cert_file, pkey_file
def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Generates an adhoc SSL context for the development server.

    A fresh self-signed certificate and key are written to two temporary
    files, which are registered for removal at interpreter exit, and then
    loaded into a new context.
    """
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    # File objects write the complete payload and close both descriptors
    # even if serializing or writing fails.
    with os.fdopen(cert_handle, "wb") as cert_out, os.fdopen(
        pkey_handle, "wb"
    ) as pkey_out:
        cert_out.write(cert.public_bytes(serialization.Encoding.PEM))
        pkey_out.write(
            pkey.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Create an SSL context from a certificate file, an optional private
    key file, and an optional protocol. Many parameters are directly taken
    from the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def is_ssl_error(error: Exception | None = None) -> bool:
    """Tell whether ``error`` is an :class:`ssl.SSLError`. Without an
    argument, the exception currently being handled is checked instead.
    """
    candidate = t.cast(Exception, sys.exc_info()[1]) if error is None else error
    return isinstance(candidate, ssl.SSLError)
def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Return ``AF_INET``, ``AF_INET6``, or ``AF_UNIX`` depending on
    the host. The port is accepted but not inspected."""
    if host.startswith("unix://"):
        return socket.AF_UNIX

    # A colon in the host is taken to mean an IPv6 address.
    wants_ipv6 = ":" in host and hasattr(socket, "AF_INET6")
    return socket.AF_INET6 if wants_ipv6 else socket.AF_INET
def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Return a fully qualified socket address that can be passed to
    :func:`socket.bind`.

    For a Unix socket this is an absolute file path. Otherwise it is the
    first address reported by :func:`socket.getaddrinfo`, or the plain
    ``(host, port)`` pair when the lookup fails."""
    if family == af_unix:
        # Absolute path avoids IDNA encoding error when path starts with dot.
        _scheme, _sep, socket_path = host.partition("://")
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port

    first_match = candidates[0]
    return first_match[4]  # type: ignore
def get_interface_ip(family: socket.AddressFamily) -> str:
    """Get the IP address of an external interface. Used when binding to
    0.0.0.0 or ::1 to show a more useful URL.

    A datagram socket is connected to an arbitrary private address and the
    local address of that socket is returned. If connecting fails, the
    loopback address for the family is returned instead.

    :meta private:
    """
    if family == socket.AF_INET6:
        # arbitrary private address
        probe_host, loopback = "fd31:f903:5ab5:1::1", "::1"
    else:
        # arbitrary private address
        probe_host, loopback = "10.253.155.219", "127.0.0.1"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return loopback

        return probe.getsockname()[0]  # type: ignore
class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Set up the listening socket, either by binding a new one or by
        reusing the descriptor ``fd``, and optionally wrap it for TLS.
        """
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        uses_workers = self.multithread or self.multiprocess

        if uses_workers and "protocol_version" not in vars(handler):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        address_family = select_address_family(host, port)
        self.address_family = address_family
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if address_family == af_unix and fd is None:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is not None:
            # TCPServer automatically opens a socket even if bind_and_activate is
            # False. Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()
        else:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try searching for and disabling"
                            " 'AirPlay Receiver' in System Settings.",
                            file=sys.stderr,
                        )

                sys.exit(1)
            except BaseException:
                self.server_close()
                raise

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is None:
            self.ssl_context: ssl.SSLContext | None = None
        else:
            # Turn the shorthand forms into a real context first.
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context = ssl_context

        import importlib.metadata

        installed_version = importlib.metadata.version("werkzeug")
        self._server_version = f"Werkzeug/{installed_version}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve until interrupted; the server is always closed afterwards."""
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        # With passthrough enabled, re-raise the exception being handled.
        if self.passthrough_errors:
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        dev_warning = _ansi_style(
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead.",
            "bold",
            "red",
        )
        messages = [dev_warning]

        if self.address_family == af_unix:
            messages.append(f" * Running on {self.host}")
            _log("info", "\n".join(messages))
            return

        scheme = "http" if self.ssl_context is None else "https"
        display_hostname = self.host

        if self.host in {"0.0.0.0", "::"}:
            messages.append(f" * Running on all addresses ({self.host})")

            # Show the loopback URL plus a URL for a real interface.
            if self.host == "0.0.0.0":
                localhost = "127.0.0.1"
                display_hostname = get_interface_ip(socket.AF_INET)
            else:
                localhost = "[::1]"
                display_hostname = get_interface_ip(socket.AF_INET6)

            messages.append(f" * Running on {scheme}://{localhost}:{self.port}")

        # IPv6 literals need brackets inside a URL.
        if ":" in display_hostname:
            display_hostname = f"[{display_hostname}]"

        messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")
        _log("info", "\n".join(messages))
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests, each in its own
    thread.

    Use :func:`make_server` to create a server instance.
    """

    # Reported to applications as ``wsgi.multithread``.
    multithread = True
    daemon_threads = True
class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests, each in its own
    forked process.

    Use :func:`make_server` to create a server instance.
    """

    # Reported to applications as ``wsgi.multiprocess``.
    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Create the server; ``processes`` is stored as ``max_children``.

        :raises ValueError: If the platform cannot fork.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        self.max_children = processes
def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Pick and create the WSGI server class that matches ``threaded``
    and ``processes``.

    This is called from :func:`run_simple`, but can be used separately
    to have access to the server object, such as to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: If both ``threaded`` and ``processes > 1`` are
        requested.
    """
    multi_process = processes > 1

    if threaded and multi_process:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    if threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

    if multi_process:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )

    # Neither threads nor processes: one request at a time.
    return BaseWSGIServer(
        host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
    )
def is_running_from_reloader() -> bool:
    """Tell whether this process is the subprocess started by the
    Werkzeug reloader, based on the ``WERKZEUG_RUN_MAIN`` environment
    variable.

    .. versionadded:: 0.10
    """
    marker = os.getenv("WERKZEUG_RUN_MAIN")
    return marker == "true"
def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Start a development server for a WSGI application. Various
    optional features can be enabled.

    .. warning::

        Do not use the development server when deploying to production.
        It is intended for use only during local development. It is not
        designed to be particularly efficient, stable, or secure.

    :param hostname: The host to bind to, for example ``'localhost'``.
        Can be a domain, IPv4 or IPv6 address, or file path starting
        with ``unix://`` for a Unix socket.
    :param port: The port to bind to, for example ``8080``. Using ``0``
        tells the OS to pick a random free port.
    :param application: The WSGI application to run.
    :param use_reloader: Use a reloader process to restart the server
        process when files are changed.
    :param use_debugger: Use Werkzeug's debugger, which will show
        formatted tracebacks on unhandled exceptions.
    :param use_evalex: Make the debugger interactive. A Python terminal
        can be opened for any frame in the traceback. Some protection is
        provided by requiring a PIN, but this should never be enabled
        on a publicly visible server.
    :param extra_files: The reloader will watch these files for changes
        in addition to Python modules. For example, watch a
        configuration file.
    :param exclude_patterns: The reloader will ignore changes to any
        files matching these :mod:`fnmatch` patterns. For example,
        ignore cache files.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: The reloader to use. The ``'stat'`` reloader
        is built in, but may require significant CPU to watch files. The
        ``'watchdog'`` reloader is much more efficient but requires
        installing the ``watchdog`` package first.
    :param threaded: Handle concurrent requests using threads. Cannot be
        used with ``processes``.
    :param processes: Handle concurrent requests using up to this number
        of processes. Cannot be used with ``threaded``.
    :param request_handler: Use a different
        :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to
        handle requests.
    :param static_files: A dict mapping URL prefixes to directories to
        serve static files from using
        :class:`~werkzeug.middleware.SharedDataMiddleware`.
    :param passthrough_errors: Don't catch unhandled exceptions at the
        server level, let the server crash instead. If ``use_debugger``
        is enabled, the debugger will still catch such errors.
    :param ssl_context: Configure TLS to serve over HTTPS. Can be an
        :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
        tuple to create a typical context, or the string ``'adhoc'`` to
        generate a temporary self-signed certificate.
    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, evalex=use_evalex)
        # Allow the specified hostname to use the debugger, in addition to
        # localhost domains.
        application.trusted_hosts.append(hostname)

    # Inside the reloader subprocess the listening socket's descriptor is
    # read from the environment; otherwise a new socket is created.
    if is_running_from_reloader():
        fd = int(os.environ["WERKZEUG_SERVER_FD"])
    else:
        fd = None

    srv = make_server(
        hostname,
        port,
        application,
        threaded,
        processes,
        request_handler,
        passthrough_errors,
        ssl_context,
        fd=fd,
    )
    # Make the descriptor inheritable and publish it through the environment.
    srv.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno())

    if not is_running_from_reloader():
        srv.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        srv.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            srv.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        srv.server_close()