Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""A WSGI and HTTP server for use **during development only**. This
2server is convenient to use, but is not designed to be particularly
3stable, secure, or efficient. Use a dedicated WSGI server and HTTP
4server when deploying to production.
6It provides features like interactive debugging and code reloading. Use
7``run_simple`` to start the server. Put this in a ``run.py`` script:
9.. code-block:: python
11 from myapp import create_app
12 from werkzeug import run_simple
13"""
15from __future__ import annotations
17import errno
18import io
19import os
20import selectors
21import socket
22import socketserver
23import sys
24import typing as t
25from datetime import datetime as dt
26from datetime import timedelta
27from datetime import timezone
28from http.server import BaseHTTPRequestHandler
29from http.server import HTTPServer
30from urllib.parse import unquote
31from urllib.parse import urlsplit
33from ._internal import _log
34from ._internal import _wsgi_encoding_dance
35from .exceptions import InternalServerError
36from .urls import uri_to_iri
try:
    import ssl
except ImportError:
    # The ``ssl`` module could not be imported. Bind the name ``ssl`` to a
    # placeholder instead so that this module can still be imported; any
    # attribute access on the placeholder fails with an explanatory error.

    class _SslDummy:
        """Stand-in for ``ssl`` that raises ``RuntimeError`` on attribute lookup."""

        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore
# Whether ``_ansi_style`` is allowed to wrap log text in ANSI escape sequences.
_log_add_style = True

if os.name == "nt":
    # On Windows, styling stays enabled only when ``colorama`` is importable.
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False
# True when ``os.fork`` exists on this platform.
can_fork = hasattr(os, "fork")

if can_fork:
    ForkingMixIn = socketserver.ForkingMixIn
else:
    # Without fork support, define an empty placeholder so that classes which
    # list ``ForkingMixIn`` as a base can still be created at import time.

    class ForkingMixIn:  # type: ignore
        pass
# ``socket.AF_UNIX`` when the socket module defines it, otherwise ``None``.
try:
    af_unix = socket.AF_UNIX
except AttributeError:
    af_unix = None  # type: ignore

# Used as ``request_queue_size`` by ``BaseWSGIServer``.
LISTEN_QUEUE = 128

# Accepted shapes of an ``ssl_context`` argument: an ``ssl.SSLContext``, a
# ``(cert_file, pkey_file)`` tuple whose second item may be ``None``, the
# literal string ``"adhoc"``, or ``None``.
_TSSLContextArg = t.Optional[
    t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], t.Literal["adhoc"]]
]
80if t.TYPE_CHECKING:
81 from _typeshed.wsgi import WSGIApplication
82 from _typeshed.wsgi import WSGIEnvironment
83 from cryptography.hazmat.primitives.asymmetric.rsa import (
84 RSAPrivateKeyWithSerialization,
85 )
86 from cryptography.x509 import Certificate
class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps a binary stream positioned at the start of a chunked body and
    exposes only the decoded payload. Reading stops after the zero-sized
    final chunk.

    :param rfile: The underlying binary stream to read the chunked body from.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the zero-sized final chunk has been seen.
        self._done = False
        # Number of payload bytes still unread in the current chunk.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read a chunk-size line and return the announced chunk length.

        :raises OSError: If the size is not valid hexadecimal or is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            # Anything after ";" is a chunk extension, which recipients
            # must ignore when they don't understand it.
            _len = int(line.partition(";")[0].strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded payload bytes.

        :param buf: Writable buffer to fill. Its size is never changed.
        :return: The number of bytes stored, ``0`` once the body is exhausted.
        :raises OSError: If a chunk header is invalid, a chunk terminator is
            missing, or the underlying stream ends in the middle of a chunk.
        """
        read = 0
        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the length of the next chunk.
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed.
                self._done = True

            if self._len > 0:
                # Never request more than what is left in the chunk or what
                # still fits into the caller's buffer.
                wanted = min(len(buf) - read, self._len)
                data = self._rfile.read(wanted)

                if not data:
                    raise OSError("Unexpected end of chunked stream")

                # Account for the bytes actually received. Assigning a slice
                # of exactly that size keeps the buffer length unchanged even
                # if the underlying stream returned fewer bytes than asked.
                received = len(data)
                buf[read : read + received] = data
                self._len -= received
                read += received

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk.
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read
151class WSGIRequestHandler(BaseHTTPRequestHandler):
152 """A request handler that implements WSGI dispatching."""
154 server: BaseWSGIServer
156 @property
157 def server_version(self) -> str: # type: ignore
158 return self.server._server_version
160 def make_environ(self) -> WSGIEnvironment:
161 request_url = urlsplit(self.path)
162 url_scheme = "http" if self.server.ssl_context is None else "https"
164 if not self.client_address:
165 self.client_address = ("<local>", 0)
166 elif isinstance(self.client_address, str):
167 self.client_address = (self.client_address, 0)
169 # If there was no scheme but the path started with two slashes,
170 # the first segment may have been incorrectly parsed as the
171 # netloc, prepend it to the path again.
172 if not request_url.scheme and request_url.netloc:
173 path_info = f"/{request_url.netloc}{request_url.path}"
174 else:
175 path_info = request_url.path
177 path_info = unquote(path_info)
179 environ: WSGIEnvironment = {
180 "wsgi.version": (1, 0),
181 "wsgi.url_scheme": url_scheme,
182 "wsgi.input": self.rfile,
183 "wsgi.errors": sys.stderr,
184 "wsgi.multithread": self.server.multithread,
185 "wsgi.multiprocess": self.server.multiprocess,
186 "wsgi.run_once": False,
187 "werkzeug.socket": self.connection,
188 "SERVER_SOFTWARE": self.server_version,
189 "REQUEST_METHOD": self.command,
190 "SCRIPT_NAME": "",
191 "PATH_INFO": _wsgi_encoding_dance(path_info),
192 "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
193 # Non-standard, added by mod_wsgi, uWSGI
194 "REQUEST_URI": _wsgi_encoding_dance(self.path),
195 # Non-standard, added by gunicorn
196 "RAW_URI": _wsgi_encoding_dance(self.path),
197 "REMOTE_ADDR": self.address_string(),
198 "REMOTE_PORT": self.port_integer(),
199 "SERVER_NAME": self.server.server_address[0],
200 "SERVER_PORT": str(self.server.server_address[1]),
201 "SERVER_PROTOCOL": self.request_version,
202 }
204 for key, value in self.headers.items():
205 if "_" in key:
206 continue
208 key = key.upper().replace("-", "_")
209 value = value.replace("\r\n", "")
210 if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
211 key = f"HTTP_{key}"
212 if key in environ:
213 value = f"{environ[key]},{value}"
214 environ[key] = value
216 if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
217 environ["wsgi.input_terminated"] = True
218 environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])
220 # Per RFC 2616, if the URL is absolute, use that as the host.
221 # We're using "has a scheme" to indicate an absolute URL.
222 if request_url.scheme and request_url.netloc:
223 environ["HTTP_HOST"] = request_url.netloc
225 try:
226 # binary_form=False gives nicer information, but wouldn't be compatible with
227 # what Nginx or Apache could return.
228 peer_cert = self.connection.getpeercert(binary_form=True)
229 if peer_cert is not None:
230 # Nginx and Apache use PEM format.
231 environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
232 except ValueError:
233 # SSL handshake hasn't finished.
234 self.server.log("error", "Cannot fetch SSL peer certificate info")
235 except AttributeError:
236 # Not using TLS, the socket will not have getpeercert().
237 pass
239 return environ
241 def run_wsgi(self) -> None:
242 if self.headers.get("Expect", "").lower().strip() == "100-continue":
243 self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")
245 self.environ = environ = self.make_environ()
246 status_set: str | None = None
247 headers_set: list[tuple[str, str]] | None = None
248 status_sent: str | None = None
249 headers_sent: list[tuple[str, str]] | None = None
250 chunk_response: bool = False
252 def write(data: bytes) -> None:
253 nonlocal status_sent, headers_sent, chunk_response
254 assert status_set is not None, "write() before start_response"
255 assert headers_set is not None, "write() before start_response"
256 if status_sent is None:
257 status_sent = status_set
258 headers_sent = headers_set
259 try:
260 code_str, msg = status_sent.split(None, 1)
261 except ValueError:
262 code_str, msg = status_sent, ""
263 code = int(code_str)
264 self.send_response(code, msg)
265 header_keys = set()
266 for key, value in headers_sent:
267 self.send_header(key, value)
268 header_keys.add(key.lower())
270 # Use chunked transfer encoding if there is no content
271 # length. Do not use for 1xx and 204 responses. 304
272 # responses and HEAD requests are also excluded, which
273 # is the more conservative behavior and matches other
274 # parts of the code.
275 # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
276 if (
277 not (
278 "content-length" in header_keys
279 or environ["REQUEST_METHOD"] == "HEAD"
280 or (100 <= code < 200)
281 or code in {204, 304}
282 )
283 and self.protocol_version >= "HTTP/1.1"
284 ):
285 chunk_response = True
286 self.send_header("Transfer-Encoding", "chunked")
288 # Always close the connection. This disables HTTP/1.1
289 # keep-alive connections. They aren't handled well by
290 # Python's http.server because it doesn't know how to
291 # drain the stream before the next request line.
292 self.send_header("Connection", "close")
293 self.end_headers()
295 assert isinstance(data, bytes), "applications must write bytes"
297 if data:
298 if chunk_response:
299 self.wfile.write(hex(len(data))[2:].encode())
300 self.wfile.write(b"\r\n")
302 self.wfile.write(data)
304 if chunk_response:
305 self.wfile.write(b"\r\n")
307 self.wfile.flush()
309 def start_response(status, headers, exc_info=None): # type: ignore
310 nonlocal status_set, headers_set
311 if exc_info:
312 try:
313 if headers_sent:
314 raise exc_info[1].with_traceback(exc_info[2])
315 finally:
316 exc_info = None
317 elif headers_set:
318 raise AssertionError("Headers already set")
319 status_set = status
320 headers_set = headers
321 return write
323 def execute(app: WSGIApplication) -> None:
324 application_iter = app(environ, start_response)
325 try:
326 for data in application_iter:
327 write(data)
328 if not headers_sent:
329 write(b"")
330 if chunk_response:
331 self.wfile.write(b"0\r\n\r\n")
332 finally:
333 # Check for any remaining data in the read socket, and discard it. This
334 # will read past request.max_content_length, but lets the client see a
335 # 413 response instead of a connection reset failure. If we supported
336 # keep-alive connections, this naive approach would break by reading the
337 # next request line. Since we know that write (above) closes every
338 # connection we can read everything.
339 selector = selectors.DefaultSelector()
340 selector.register(self.connection, selectors.EVENT_READ)
341 total_size = 0
342 total_reads = 0
344 # A timeout of 0 tends to fail because a client needs a small amount of
345 # time to continue sending its data.
346 while selector.select(timeout=0.01):
347 # Only read 10MB into memory at a time.
348 data = self.rfile.read(10_000_000)
349 total_size += len(data)
350 total_reads += 1
352 # Stop reading on no data, >=10GB, or 1000 reads. If a client sends
353 # more than that, they'll get a connection reset failure.
354 if not data or total_size >= 10_000_000_000 or total_reads > 1000:
355 break
357 selector.close()
359 if hasattr(application_iter, "close"):
360 application_iter.close()
362 try:
363 execute(self.server.app)
364 except (ConnectionError, socket.timeout) as e:
365 self.connection_dropped(e, environ)
366 except Exception as e:
367 if self.server.passthrough_errors:
368 raise
370 if status_sent is not None and chunk_response:
371 self.close_connection = True
373 try:
374 # if we haven't yet sent the headers but they are set
375 # we roll back to be able to set them again.
376 if status_sent is None:
377 status_set = None
378 headers_set = None
379 execute(InternalServerError())
380 except Exception:
381 pass
383 from .debug.tbtools import DebugTraceback
385 msg = DebugTraceback(e).render_traceback_text()
386 self.server.log("error", f"Error on request:\n{msg}")
388 def handle(self) -> None:
389 """Handles a request ignoring dropped connections."""
390 try:
391 super().handle()
392 except (ConnectionError, socket.timeout) as e:
393 self.connection_dropped(e)
394 except Exception as e:
395 if self.server.ssl_context is not None and is_ssl_error(e):
396 self.log_error("SSL error occurred: %s", e)
397 else:
398 raise
400 def connection_dropped(
401 self, error: BaseException, environ: WSGIEnvironment | None = None
402 ) -> None:
403 """Called if the connection was closed by the client. By default
404 nothing happens.
405 """
407 def __getattr__(self, name: str) -> t.Any:
408 # All HTTP methods are handled by run_wsgi.
409 if name.startswith("do_"):
410 return self.run_wsgi
412 # All other attributes are forwarded to the base class.
413 return getattr(super(), name)
415 def address_string(self) -> str:
416 if getattr(self, "environ", None):
417 return self.environ["REMOTE_ADDR"] # type: ignore
419 if not self.client_address:
420 return "<local>"
422 return self.client_address[0]
424 def port_integer(self) -> int:
425 return self.client_address[1]
427 # Escape control characters. This is defined (but private) in Python 3.12.
428 _control_char_table = str.maketrans(
429 {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
430 )
431 _control_char_table[ord("\\")] = r"\\"
433 def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
434 try:
435 path = uri_to_iri(self.path)
436 msg = f"{self.command} {path} {self.request_version}"
437 except AttributeError:
438 # path isn't set if the requestline was bad
439 msg = self.requestline
441 # Escape control characters that may be in the decoded path.
442 msg = msg.translate(self._control_char_table)
443 code = str(code)
445 if code[0] == "1": # 1xx - Informational
446 msg = _ansi_style(msg, "bold")
447 elif code == "200": # 2xx - Success
448 pass
449 elif code == "304": # 304 - Resource Not Modified
450 msg = _ansi_style(msg, "cyan")
451 elif code[0] == "3": # 3xx - Redirection
452 msg = _ansi_style(msg, "green")
453 elif code == "404": # 404 - Resource Not Found
454 msg = _ansi_style(msg, "yellow")
455 elif code[0] == "4": # 4xx - Client Error
456 msg = _ansi_style(msg, "bold", "red")
457 else: # 5xx, or any other response
458 msg = _ansi_style(msg, "bold", "magenta")
460 self.log("info", '"%s" %s %s', msg, code, size)
462 def log_error(self, format: str, *args: t.Any) -> None:
463 self.log("error", format, *args)
465 def log_message(self, format: str, *args: t.Any) -> None:
466 self.log("info", format, *args)
468 def log(self, type: str, message: str, *args: t.Any) -> None:
469 _log(
470 type,
471 f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
472 *args,
473 )
def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape sequences for the given style names.

    The text is returned unchanged when styling is disabled through
    ``_log_add_style``. Otherwise one escape sequence per style is put in
    front of the text, with later styles ending up further left, and a
    reset sequence is appended.

    :param value: The text to decorate.
    :param styles: Any of ``bold``, ``red``, ``green``, ``yellow``,
        ``magenta``, ``cyan``.
    """
    if not _log_add_style:
        return value

    codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }

    # Look the styles up in the order given, then reverse, because every
    # additional style is prepended to what has been built so far.
    sequences = [f"\x1b[{codes[name]}m" for name in styles]
    prefix = "".join(reversed(sequences))
    return f"{prefix}{value}\x1b[0m"
def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Create a self-signed certificate and a fresh 2048 bit RSA key.

    The certificate is valid for 365 days starting at the time of the call.

    :param cn: The common name to put into the certificate. Defaults to
        ``"*"``.
    :return: A ``(certificate, private_key)`` tuple of ``cryptography``
        objects.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    # One backend instance is shared by key generation and signing.
    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # A bare wildcard is most likely not accepted by any client.
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Sample the clock once so both ends of the validity window derive from
    # the same instant.
    valid_from = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        # Self-signed: the issuer is the subject.
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(valid_from)
        .not_valid_after(valid_from + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey
def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Write a development certificate and its private key to disk.

    Use this instead of the ``'adhoc'`` key, which generates a new cert on
    each server start. The pair is created by
    :func:`generate_adhoc_ssl_pair`; when ``host`` is given it is used as
    the common name and ``cn`` is ignored.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: the path to the certificate and key. The extension
                      ``.crt`` is added for the certificate, ``.key`` is
                      added for the key.
    :param host: the name of the host. This can be used as an alternative
                 for the `cn`.
    :param cn: the `CN` to use.
    :return: the ``(cert_file, pkey_file)`` paths that were written.
    """
    common_name = cn if host is None else host
    cert, pkey = generate_adhoc_ssl_pair(cn=common_name)

    from cryptography.hazmat.primitives import serialization

    pem = serialization.Encoding.PEM
    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"

    with open(cert_file, "wb") as cert_out:
        cert_out.write(cert.public_bytes(pem))

    with open(pkey_file, "wb") as pkey_out:
        # The key is stored unencrypted in the traditional OpenSSL format.
        key_bytes = pkey.private_bytes(
            encoding=pem,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        pkey_out.write(key_bytes)

    return cert_file, pkey_file
def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Generates an adhoc SSL context for the development server.

    A throwaway certificate and key are created with
    :func:`generate_adhoc_ssl_pair`, written to temporary files, and loaded
    with :func:`load_ssl_context`. The temporary files are removed before
    this function returns, also when writing or loading fails, so the
    unencrypted private key does not stay on disk.

    :return: A context that has the generated certificate chain loaded.
    """
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    pkey_pem = pkey.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # Certificate path first, key path second: the order in which
    # ``load_ssl_context`` takes them.
    paths: list[str] = []

    try:
        for payload in (cert_pem, pkey_pem):
            handle, path = tempfile.mkstemp()
            paths.append(path)

            # The file object owns the descriptor and closes it even if the
            # write fails.
            with os.fdopen(handle, "wb") as f:
                f.write(payload)

        # The files are read while the chain is loaded, so they can be
        # deleted as soon as this call returns.
        return load_ssl_context(*paths)
    finally:
        for path in paths:
            try:
                os.remove(path)
            except OSError:
                # Best effort cleanup, the context is usable either way.
                pass
def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Create an :py:class:`ssl.SSLContext` and load a certificate chain
    into it. The parameters mirror the API of that class.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def is_ssl_error(error: Exception | None = None) -> bool:
    """Tell whether ``error`` is an :class:`ssl.SSLError`. Without an
    argument, the exception currently being handled is inspected.
    """
    candidate = (
        error if error is not None else t.cast(Exception, sys.exc_info()[1])
    )
    return isinstance(candidate, ssl.SSLError)
def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Pick the socket address family for ``host``.

    A host starting with ``unix://`` selects ``AF_UNIX``. A host containing
    a colon selects ``AF_INET6`` if the socket module provides it. Anything
    else selects ``AF_INET``. ``port`` is accepted but not looked at.
    """
    if host.startswith("unix://"):
        return socket.AF_UNIX

    looks_like_ipv6 = ":" in host and hasattr(socket, "AF_INET6")
    return socket.AF_INET6 if looks_like_ipv6 else socket.AF_INET
def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Turn ``host`` and ``port`` into an address that can be handed to
    :func:`socket.bind`.

    For the Unix family the result is the absolute form of the path after
    ``://``. Otherwise the first :func:`socket.getaddrinfo` result is used,
    falling back to ``(host, port)`` unchanged when the lookup fails.
    """
    if family == af_unix:
        # Absolute path avoids IDNA encoding error when path starts with dot.
        _, _, socket_path = host.partition("://")
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port

    # Each entry ends with the socket address; take it from the first entry.
    first_candidate = candidates[0]
    return first_candidate[4]  # type: ignore
def get_interface_ip(family: socket.AddressFamily) -> str:
    """Find the local IP address a datagram socket uses to reach an
    arbitrary private address. Used to show a more useful URL when the
    server is bound to all addresses.

    Falls back to the loopback address of ``family`` when connecting the
    socket fails.

    :meta private:
    """
    wants_ipv6 = family == socket.AF_INET6
    # arbitrary private address
    probe_host = "fd31:f903:5ab5:1::1" if wants_ipv6 else "10.253.155.219"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return "::1" if wants_ipv6 else "127.0.0.1"

        local_address = probe.getsockname()

    return local_address[0]  # type: ignore
class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    # Copied into the WSGI environ by the request handler; the threaded and
    # forking subclasses override them.
    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Set up the listening socket and, if requested, TLS.

        :param host: Host name, IP address, or ``unix://`` path to bind to.
        :param port: Port to bind to.
        :param app: The WSGI application to serve.
        :param handler: Request handler class, :class:`WSGIRequestHandler`
            when omitted.
        :param passthrough_errors: Stored on the server; when true,
            ``handle_error`` re-raises instead of delegating to the base
            class.
        :param ssl_context: An ``ssl.SSLContext``, a ``(cert_file,
            pkey_file)`` tuple, ``"adhoc"``, or ``None`` for plain HTTP.
        :param fd: File descriptor of an already set up socket to use
            instead of binding a new one.
        """
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        # NOTE(review): this assigns to the handler class itself, so the
        # change is visible to every later user of that class.
        if "protocol_version" not in vars(handler) and (
            self.multithread or self.multiprocess
        ):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        self.address_family = address_family = select_address_family(host, port)
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if address_family == af_unix and fd is None:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is None:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try disabling the 'AirPlay Receiver' service"
                            " from System Preferences -> General -> AirDrop & Handoff.",
                            file=sys.stderr,
                        )

                # Exit the process with a failure status instead of raising.
                sys.exit(1)
            except BaseException:
                # Any other failure: release the socket, then propagate.
                self.server_close()
                raise
        else:
            # TCPServer automatically opens a socket even if bind_and_activate is False.
            # Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is not None:
            # Turn the tuple and "adhoc" forms into a real context first.
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context: ssl.SSLContext | None = ssl_context
        else:
            self.ssl_context = None

        import importlib.metadata

        # Read by ``WSGIRequestHandler.server_version``.
        self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Forward ``message`` and ``args`` to ``_log`` under level ``type``."""
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve requests until interrupted.

        A ``KeyboardInterrupt`` ends the loop silently. The server is closed
        in every case.
        """
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        """Re-raise when ``passthrough_errors`` is set, otherwise defer to
        the base class.
        """
        if self.passthrough_errors:
            # Bare ``raise``: relies on being called while an exception is
            # being handled.
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        dev_warning = (
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead."
        )
        dev_warning = _ansi_style(dev_warning, "bold", "red")
        messages = [dev_warning]

        if self.address_family == af_unix:
            messages.append(f" * Running on {self.host}")
        else:
            scheme = "http" if self.ssl_context is None else "https"
            display_hostname = self.host

            if self.host in {"0.0.0.0", "::"}:
                # Bound to all addresses: list the loopback URL and the URL
                # of the address found by ``get_interface_ip``.
                messages.append(f" * Running on all addresses ({self.host})")

                if self.host == "0.0.0.0":
                    localhost = "127.0.0.1"
                    display_hostname = get_interface_ip(socket.AF_INET)
                else:
                    localhost = "[::1]"
                    display_hostname = get_interface_ip(socket.AF_INET6)

                messages.append(f" * Running on {scheme}://{localhost}:{self.port}")

            # A host containing a colon is put in brackets inside the URL.
            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")

        _log("info", "\n".join(messages))
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests in separate
    threads.

    Use :func:`make_server` to create a server instance.
    """

    # Copied into the environ as ``wsgi.multithread`` by the request handler.
    multithread = True
    # ``socketserver.ThreadingMixIn`` setting for the worker threads.
    daemon_threads = True
class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests in separate forked
    processes.

    Use :func:`make_server` to create a server instance.
    """

    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Create the server and store ``processes`` as ``max_children``.

        :raises ValueError: If the platform cannot fork.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        self.max_children = processes
def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Build the WSGI server class that matches ``threaded`` and
    ``processes``: threaded, forking (more than one process), or the
    basic single-request server.

    :func:`run_simple` calls this, but it can be used on its own to get
    hold of the server object, for example to run it in a separate thread.

    See :func:`run_simple` for parameter docs.
    """
    if threaded and processes > 1:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    # Options every server class takes under the same names.
    shared_options = {
        "handler": request_handler,
        "passthrough_errors": passthrough_errors,
        "ssl_context": ssl_context,
        "fd": fd,
    }

    if threaded:
        return ThreadedWSGIServer(host, port, app, **shared_options)

    if processes > 1:
        return ForkingWSGIServer(host, port, app, processes, **shared_options)

    return BaseWSGIServer(host, port, app, **shared_options)
def is_running_from_reloader() -> bool:
    """Tell whether this process is the subprocess started by the
    Werkzeug reloader, as signalled by the ``WERKZEUG_RUN_MAIN``
    environment variable being ``"true"``.

    .. versionadded:: 0.10
    """
    run_main_flag = os.environ.get("WERKZEUG_RUN_MAIN")
    return run_main_flag == "true"
def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Run a development server for a WSGI application, with a number of
    optional features.

    .. warning::

        Do not use the development server when deploying to production.
        It is intended for use only during local development. It is not
        designed to be particularly efficient, stable, or secure.

    :param hostname: The host to bind to, for example ``'localhost'``.
        Can be a domain, IPv4 or IPv6 address, or file path starting
        with ``unix://`` for a Unix socket.
    :param port: The port to bind to, for example ``8080``. Using ``0``
        tells the OS to pick a random free port.
    :param application: The WSGI application to run.
    :param use_reloader: Use a reloader process to restart the server
        process when files are changed.
    :param use_debugger: Use Werkzeug's debugger, which will show
        formatted tracebacks on unhandled exceptions.
    :param use_evalex: Make the debugger interactive. A Python terminal
        can be opened for any frame in the traceback. Some protection is
        provided by requiring a PIN, but this should never be enabled
        on a publicly visible server.
    :param extra_files: The reloader will watch these files for changes
        in addition to Python modules. For example, watch a
        configuration file.
    :param exclude_patterns: The reloader will ignore changes to any
        files matching these :mod:`fnmatch` patterns. For example,
        ignore cache files.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: The reloader to use. The ``'stat'`` reloader
        is built in, but may require significant CPU to watch files. The
        ``'watchdog'`` reloader is much more efficient but requires
        installing the ``watchdog`` package first.
    :param threaded: Handle concurrent requests using threads. Cannot be
        used with ``processes``.
    :param processes: Handle concurrent requests using up to this number
        of processes. Cannot be used with ``threaded``.
    :param request_handler: Use a different
        :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to
        handle requests.
    :param static_files: A dict mapping URL prefixes to directories to
        serve static files from using
        :class:`~werkzeug.middleware.SharedDataMiddleware`.
    :param passthrough_errors: Don't catch unhandled exceptions at the
        server level, let the server crash instead. If ``use_debugger``
        is enabled, the debugger will still catch such errors.
    :param ssl_context: Configure TLS to serve over HTTPS. Can be an
        :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
        tuple to create a typical context, or the string ``'adhoc'`` to
        generate a temporary self-signed certificate.
    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    # Wrap the application: static files innermost, the debugger around it.
    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, evalex=use_evalex)
        # Allow the specified hostname to use the debugger, in addition to
        # localhost domains.
        application.trusted_hosts.append(hostname)

    # Inside the reloader subprocess, the socket descriptor is taken from
    # the environment instead of binding a new socket.
    inherited_fd = (
        int(os.environ["WERKZEUG_SERVER_FD"]) if is_running_from_reloader() else None
    )

    srv = make_server(
        hostname,
        port,
        application,
        threaded,
        processes,
        request_handler,
        passthrough_errors,
        ssl_context,
        fd=inherited_fd,
    )
    # Make the descriptor inheritable and record it in the environment.
    srv.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno())

    # The startup banner is only shown outside the reloader subprocess.
    if not is_running_from_reloader():
        srv.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        srv.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            srv.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        srv.server_close()