Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/serving.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""A WSGI and HTTP server for use **during development only**. This
2server is convenient to use, but is not designed to be particularly
3stable, secure, or efficient. Use a dedicated WSGI server and HTTP
4server when deploying to production.
6It provides features like interactive debugging and code reloading. Use
7``run_simple`` to start the server. Put this in a ``run.py`` script:
9.. code-block:: python
11 from myapp import create_app
12 from werkzeug import run_simple
13"""
15from __future__ import annotations
17import errno
18import io
19import os
20import selectors
21import socket
22import socketserver
23import sys
24import typing as t
25from datetime import datetime as dt
26from datetime import timedelta
27from datetime import timezone
28from http.server import BaseHTTPRequestHandler
29from http.server import HTTPServer
30from urllib.parse import unquote
31from urllib.parse import urlsplit
33from ._internal import _log
34from ._internal import _plain_int
35from ._internal import _wsgi_encoding_dance
36from .datastructures import HeaderSet
37from .exceptions import InternalServerError
38from .urls import uri_to_iri
# Import ``ssl`` when it is available; otherwise install a placeholder object
# so that this module can still be imported.
try:
    import ssl

    # Exception types caught around the WSGI call in
    # ``WSGIRequestHandler.run_wsgi`` and routed to ``connection_dropped``.
    connection_dropped_errors: tuple[type[Exception], ...] = (
        ConnectionError,
        socket.timeout,
        ssl.SSLEOFError,
    )
except ImportError:

    # Stand-in for the ``ssl`` module: attribute lookups that fall through to
    # ``__getattr__`` raise a RuntimeError explaining that SSL is unavailable.
    class _SslDummy:
        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore
    # Same tuple as above, minus the SSL-specific error type.
    connection_dropped_errors = (ConnectionError, socket.timeout)
# Whether ``_ansi_style`` wraps log text in ANSI escape sequences.
_log_add_style = True

if os.name == "nt":
    try:
        __import__("colorama")
    except ImportError:
        # On Windows, styling is switched off when colorama cannot be imported.
        _log_add_style = False

# True when this platform provides ``os.fork``.
can_fork = hasattr(os, "fork")

if can_fork:
    ForkingMixIn = socketserver.ForkingMixIn
else:

    # Empty placeholder so ``ForkingWSGIServer`` can still be defined; its
    # ``__init__`` raises ValueError when ``can_fork`` is false.
    class ForkingMixIn:  # type: ignore
        pass


try:
    af_unix = socket.AF_UNIX
except AttributeError:
    # ``socket.AF_UNIX`` does not exist on this platform.
    af_unix = None  # type: ignore

# Used as ``BaseWSGIServer.request_queue_size``.
LISTEN_QUEUE = 128

# Accepted forms of the ``ssl_context`` argument: an ``ssl.SSLContext``, a
# ``(cert_file, key_file_or_None)`` tuple, the literal ``"adhoc"``, or None.
_TSSLContextArg = t.Union[
    "ssl.SSLContext", tuple[str, str | None], t.Literal["adhoc"], None
]

# Names imported only for type annotations.
if t.TYPE_CHECKING:
    from _typeshed.wsgi import WSGIApplication
    from _typeshed.wsgi import WSGIEnvironment
    from cryptography.hazmat.primitives.asymmetric.rsa import (
        RSAPrivateKeyWithSerialization,
    )
    from cryptography.x509 import Certificate
class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps the underlying request stream and exposes only the payload bytes
    of each chunk; chunk-size lines and chunk terminators are consumed
    internally.

    :param rfile: The binary stream to read the chunked body from.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the zero-length final chunk has been read.
        self._done = False
        # Number of payload bytes of the current chunk not yet consumed.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk-size line and return the size it declares.

        The line is decoded as latin1 and parsed as a base-16 integer.

        :raises OSError: If the line cannot be parsed or the parsed size
            is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = _plain_int(line, 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded payload bytes.

        Reading stops when ``buf`` is full or the final chunk was seen.
        ``buf`` is never resized.

        :param buf: Writable buffer that receives the payload.
        :return: The number of bytes written to ``buf``.
        :raises OSError: If a chunk header is invalid, a chunk terminator
            is missing, or the underlying stream ends in the middle of a
            chunk.
        """
        read = 0
        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the next length of the next chunk
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed
                self._done = True

            if self._len > 0:
                # There is data (left) in this chunk. Never request more than
                # the space remaining in buf, so buf keeps its size.
                n = min(len(buf) - read, self._len)
                data = self._rfile.read(n)

                if not data:
                    # The stream ended before the declared chunk size was
                    # reached. Fail instead of reporting bytes never read.
                    raise OSError("Unexpected end of chunked data")

                # Account for what was actually returned, which may be less
                # than requested; the loop asks again for the remainder.
                got = len(data)
                buf[read : read + got] = data
                self._len -= got
                read += got

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read
class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching.

    Every ``do_<METHOD>`` attribute resolves to :meth:`run_wsgi`, which
    builds a WSGI environ for the request, calls the server's application
    and writes the response to the client.
    """

    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        """The version string stored on the server as ``_server_version``."""
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the current request.

        Also normalizes ``self.client_address`` to a ``(host, port)`` tuple
        when it is empty or a plain string.
        """
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names containing an underscore are skipped entirely.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                # Repeated headers are joined into one comma-separated value.
                if key in environ:
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if "chunked" in HeaderSet.from_header(environ.get("HTTP_TRANSFER_ENCODING")):
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Run the server's WSGI application for the current request.

        Errors listed in ``connection_dropped_errors`` are passed to
        :meth:`connection_dropped`. Any other exception is re-raised when
        the server has ``passthrough_errors`` set; otherwise an
        ``InternalServerError`` response is attempted and the traceback is
        logged.
        """
        self.environ = environ = self.make_environ()
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            # Sends the status line and headers on the first call, then
            # writes ``data`` (chunk-framed when chunk_response is set).
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            # Records the status and headers; nothing is sent until write().
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                try:
                    # Check for any remaining data in the read socket, and discard
                    # it. This will read past request.max_content_length, but lets
                    # the client see a 413 response instead of a connection reset
                    # failure. If we supported keep-alive connections, this naive
                    # approach would break by reading the next request line. Since
                    # we know that write (above) closes every connection we can
                    # read everything.
                    #
                    # The selector is a context manager so it is closed even
                    # when reading from the socket raises.
                    with selectors.DefaultSelector() as selector:
                        selector.register(self.connection, selectors.EVENT_READ)
                        total_size = 0
                        total_reads = 0

                        # A timeout of 0 tends to fail because a client needs a
                        # small amount of time to continue sending its data.
                        while selector.select(timeout=0.01):
                            # Only read 10MB into memory at a time.
                            data = self.rfile.read(10_000_000)
                            total_size += len(data)
                            total_reads += 1

                            # Stop reading on no data, >=10GB, or 1000 reads. If a
                            # client sends more than that, they'll get a connection
                            # reset failure.
                            if (
                                not data
                                or total_size >= 10_000_000_000
                                or total_reads > 1000
                            ):
                                break
                finally:
                    # Always give the application iterable the chance to
                    # release its resources, even if draining failed.
                    if hasattr(application_iter, "close"):
                        application_iter.close()

        try:
            execute(self.server.app)
        except connection_dropped_errors as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, TimeoutError) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address used in the environ and in log lines.

        Once ``self.environ`` is set, its ``REMOTE_ADDR`` value is returned.
        """
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the second item of ``self.client_address``."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log the request line, styled according to the status code."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        if code[0] == "1":  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 2xx - Success
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Pass a message to ``_log``, prefixed with address and timestamp."""
        # an IPv6 scoped address contains "%" which breaks logging
        address_string = self.address_string().replace("%", "%%")
        _log(
            type,
            f"{address_string} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )
def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape sequences for the given style names.

    Returns ``value`` unchanged when ``_log_add_style`` is false. Otherwise
    one escape sequence per style is placed in front of the text (the last
    style given ends up first) and a reset sequence is appended.
    """
    if _log_add_style:
        codes = {
            "bold": 1,
            "red": 31,
            "green": 32,
            "yellow": 33,
            "magenta": 35,
            "cyan": 36,
        }
        prefix = "".join(f"\x1b[{codes[name]}m" for name in reversed(styles))
        return f"{prefix}{value}\x1b[0m"

    return value
def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Create a self-signed certificate and a new 2048-bit RSA key.

    The certificate is valid for 365 days starting at the time of the
    call, and lists ``cn`` and ``*.cn`` as DNS subject alternative names.

    :param cn: The common name for the certificate. ``"*"`` is used when
        this is ``None``.
    :return: A ``(certificate, private_key)`` tuple.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    # A single backend object serves both key generation and signing.
    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # pretty damn sure that this is not actually accepted by anyone
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Take the timestamp once so both validity bounds derive from the same
    # instant.
    now = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey
def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Create an SSL certificate and key on disk for development.

    Use this instead of the ``'adhoc'`` key, which generates a new cert on
    each server start. When ``host`` is given it is used as the CN and
    replaces any ``cn`` value.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: the path to the certificate and key. The extension
                      ``.crt`` is added for the certificate, ``.key`` is
                      added for the key.
    :param host: the name of the host. This can be used as an alternative
                 for the `cn`.
    :param cn: the `CN` to use.
    :return: A ``(cert_file, pkey_file)`` tuple of the paths written.
    """
    common_name = cn if host is None else host
    cert, pkey = generate_adhoc_ssl_pair(cn=common_name)

    from cryptography.hazmat.primitives import serialization

    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"

    with open(cert_file, "wb") as cert_out:
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)
        cert_out.write(cert_pem)

    with open(pkey_file, "wb") as key_out:
        key_pem = pkey.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        key_out.write(key_pem)

    return cert_file, pkey_file
def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Build an SSL context from a freshly generated ad-hoc certificate.

    The certificate and key are written to two temporary files, both of
    which are registered for removal at interpreter exit, and then loaded
    with :func:`load_ssl_context`.
    """
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    with tempfile.NamedTemporaryFile("wb", delete=False) as cert_file:
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)
        cert_file.write(cert_pem)

    with tempfile.NamedTemporaryFile("wb", delete=False) as pkey_file:
        key_pem = pkey.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        pkey_file.write(key_pem)

    cert_path = cert_file.name
    key_path = pkey_file.name

    for path in (cert_path, key_path):
        atexit.register(os.remove, path)

    return load_ssl_context(cert_path, key_path)
def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Create an :py:class:`ssl.SSLContext` and load a certificate chain.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
        will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def is_ssl_error(error: Exception | None = None) -> bool:
    """Tell whether ``error`` is an :class:`ssl.SSLError`.

    When ``error`` is ``None`` the exception currently being handled, as
    reported by :func:`sys.exc_info`, is checked instead.
    """
    candidate = error if error is not None else t.cast(Exception, sys.exc_info()[1])
    return isinstance(candidate, ssl.SSLError)
def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Return ``AF_INET``, ``AF_INET6``, or ``AF_UNIX`` depending on
    the host.

    A host starting with ``unix://`` selects ``AF_UNIX``. A host containing
    ``:`` selects ``AF_INET6`` when the socket module provides it. Anything
    else selects ``AF_INET``. ``port`` is not inspected.
    """
    if host.startswith("unix://"):
        return socket.AF_UNIX

    looks_like_ipv6 = ":" in host and hasattr(socket, "AF_INET6")
    return socket.AF_INET6 if looks_like_ipv6 else socket.AF_INET
def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Return a fully qualified socket address that can be passed to
    :func:`socket.bind`.

    For the Unix family this is the absolute form of the path that follows
    ``://`` in ``host``. Otherwise it is the address of the first
    :func:`socket.getaddrinfo` result, or ``(host, port)`` unchanged when
    the lookup fails.
    """
    if family == af_unix:
        # Absolute path avoids IDNA encoding error when path starts with dot.
        _scheme, _sep, socket_path = host.partition("://")
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port
    else:
        first = candidates[0]
        return first[4]  # type: ignore
def get_interface_ip(family: socket.AddressFamily) -> str:
    """Get the IP address of an external interface. Used when binding to
    0.0.0.0 or :: to show a more useful URL.

    A datagram socket is connected to an arbitrary private address and the
    socket's own local address is returned. If connecting fails, the
    loopback address for ``family`` is returned instead.

    :meta private:
    """
    is_ipv6 = family == socket.AF_INET6
    # arbitrary private address
    probe_host = "fd31:f903:5ab5:1::1" if is_ipv6 else "10.253.155.219"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return "::1" if is_ipv6 else "127.0.0.1"
        else:
            local_address = probe.getsockname()
            return local_address[0]  # type: ignore
class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    # Class-level flags; the threaded and forking subclasses override them.
    # ``WSGIRequestHandler.make_environ`` copies them into the environ.
    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg = None,
        fd: int | None = None,
    ) -> None:
        """Set up the listening socket and optional TLS wrapping.

        :param host: Host to bind to; a value starting with ``unix://``
            selects a Unix socket (see :func:`select_address_family`).
        :param port: Port to bind to.
        :param app: The WSGI application to call for each request.
        :param handler: Request handler class. Defaults to
            :class:`WSGIRequestHandler`.
        :param passthrough_errors: Stored on the server; read by
            :meth:`handle_error` and by the request handler.
        :param ssl_context: An ``ssl.SSLContext``, a tuple passed to
            :func:`load_ssl_context`, ``"adhoc"``, or ``None`` for no TLS.
        :param fd: File descriptor of an existing socket to use instead of
            binding a new one.
        """
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        # NOTE: this assigns the attribute on the handler class itself.
        if "protocol_version" not in vars(handler) and (
            self.multithread or self.multiprocess
        ):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        self.address_family = address_family = select_address_family(host, port)
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if address_family == af_unix and fd is None:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is None:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try searching for and disabling"
                            " 'AirPlay Receiver' in System Settings.",
                            file=sys.stderr,
                        )

                # Exit the process with status 1 instead of re-raising.
                sys.exit(1)
            except BaseException:
                self.server_close()
                raise
        else:
            # TCPServer automatically opens a socket even if bind_and_activate is False.
            # Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is not None:
            # Turn the tuple and "adhoc" forms into a context object first.
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context: ssl.SSLContext | None = ssl_context
        else:
            self.ssl_context = None

        import importlib.metadata

        # Returned by the ``WSGIRequestHandler.server_version`` property.
        self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Forward a message to ``_log`` unchanged."""
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve requests until interrupted.

        ``KeyboardInterrupt`` is swallowed, and the server is closed on
        every exit path.
        """
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        """Re-raise the active exception when ``passthrough_errors`` is
        set; otherwise defer to the base class implementation.
        """
        if self.passthrough_errors:
            # Bare raise: re-raises the exception currently being handled.
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        dev_warning = (
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead."
        )
        dev_warning = _ansi_style(dev_warning, "bold", "red")
        messages = [dev_warning]

        if self.address_family == af_unix:
            messages.append(f" * Running on {self.host}")
        else:
            scheme = "http" if self.ssl_context is None else "https"
            display_hostname = self.host

            # For the wildcard addresses, show a loopback URL as well as a
            # URL using the address from get_interface_ip.
            if self.host in {"0.0.0.0", "::"}:
                messages.append(f" * Running on all addresses ({self.host})")

                if self.host == "0.0.0.0":
                    localhost = "127.0.0.1"
                    display_hostname = get_interface_ip(socket.AF_INET)
                else:
                    localhost = "[::1]"
                    display_hostname = get_interface_ip(socket.AF_INET6)

                messages.append(f" * Running on {scheme}://{localhost}:{self.port}")

            # A host containing ":" is wrapped in brackets for the URL.
            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")

        _log("info", "\n".join(messages))
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that serves concurrent requests, each in its own
    thread.

    Use :func:`make_server` to create a server instance.
    """

    # Setting consumed by ``socketserver.ThreadingMixIn``.
    daemon_threads = True
    # Copied into the environ as ``wsgi.multithread``.
    multithread = True
class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that serves concurrent requests, each in its own
    forked process.

    Use :func:`make_server` to create a server instance.
    """

    # Copied into the environ as ``wsgi.multiprocess``.
    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg = None,
        fd: int | None = None,
    ) -> None:
        """Create the server and store ``processes`` as ``max_children``.

        :raises ValueError: If the platform has no ``os.fork``.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        self.max_children = processes
def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Create the WSGI server instance that matches ``threaded`` and
    ``processes``.

    This is called from :func:`run_simple`, but can be used separately
    to have access to the server object, such as to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: If ``threaded`` is set and ``processes`` is
        greater than 1.
    """
    if threaded and processes > 1:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    # Arguments every server class receives in the same way.
    shared = {
        "handler": request_handler,
        "passthrough_errors": passthrough_errors,
        "ssl_context": ssl_context,
        "fd": fd,
    }

    if threaded:
        return ThreadedWSGIServer(host, port, app, **shared)

    if processes > 1:
        return ForkingWSGIServer(host, port, app, processes=processes, **shared)

    return BaseWSGIServer(host, port, app, **shared)
def is_running_from_reloader() -> bool:
    """Tell whether this process is the subprocess started by the
    Werkzeug reloader, i.e. ``WERKZEUG_RUN_MAIN`` is set to ``"true"``.

    .. versionadded:: 0.10
    """
    marker = os.getenv("WERKZEUG_RUN_MAIN")
    return marker == "true"
def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg = None,
) -> None:
    """Start a development server for a WSGI application. Various
    optional features can be enabled.

    .. warning::

        Do not use the development server when deploying to production.
        It is intended for use only during local development. It is not
        designed to be particularly efficient, stable, or secure.

    :param hostname: The host to bind to, for example ``'localhost'``.
        Can be a domain, IPv4 or IPv6 address, or file path starting
        with ``unix://`` for a Unix socket.
    :param port: The port to bind to, for example ``8080``. Using ``0``
        tells the OS to pick a random free port.
    :param application: The WSGI application to run.
    :param use_reloader: Use a reloader process to restart the server
        process when files are changed.
    :param use_debugger: Use Werkzeug's debugger, which will show
        formatted tracebacks on unhandled exceptions.
    :param use_evalex: Make the debugger interactive. A Python terminal
        can be opened for any frame in the traceback. Some protection is
        provided by requiring a PIN, but this should never be enabled
        on a publicly visible server.
    :param extra_files: The reloader will watch these files for changes
        in addition to Python modules. For example, watch a
        configuration file.
    :param exclude_patterns: The reloader will ignore changes to any
        files matching these :mod:`fnmatch` patterns. For example,
        ignore cache files.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: The reloader to use. The ``'stat'`` reloader
        is built in, but may require significant CPU to watch files. The
        ``'watchdog'`` reloader is much more efficient but requires
        installing the ``watchdog`` package first.
    :param threaded: Handle concurrent requests using threads. Cannot be
        used with ``processes``.
    :param processes: Handle concurrent requests using up to this number
        of processes. Cannot be used with ``threaded``.
    :param request_handler: Use a different
        :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to
        handle requests.
    :param static_files: A dict mapping URL prefixes to directories to
        serve static files from using
        :class:`~werkzeug.middleware.SharedDataMiddleware`.
    :param passthrough_errors: Don't catch unhandled exceptions at the
        server level, let the server crash instead. If ``use_debugger``
        is enabled, the debugger will still catch such errors.
    :param ssl_context: Configure TLS to serve over HTTPS. Can be an
        :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
        tuple to create a typical context, or the string ``'adhoc'`` to
        generate a temporary self-signed certificate.
    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, evalex=use_evalex)

        # Allow the specified hostname to use the debugger, in addition to
        # localhost domains. The wildcard bind addresses are skipped: "::"
        # is the spelling the server itself recognizes as "all addresses"
        # (see BaseWSGIServer.log_startup), and "[::]" is kept so the
        # previous exclusion still applies.
        if hostname not in {"0.0.0.0", "::", "[::]"}:
            application.trusted_hosts.append(hostname)

    # In the reloader subprocess, reuse the socket opened by the parent.
    if not is_running_from_reloader():
        fd = None
    else:
        fd = int(os.environ["WERKZEUG_SERVER_FD"])

    srv = make_server(
        hostname,
        port,
        application,
        threaded,
        processes,
        request_handler,
        passthrough_errors,
        ssl_context,
        fd=fd,
    )
    # Publish the listening socket's descriptor through the environment
    # and mark it inheritable so a child process can pick it up.
    srv.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno())

    if not is_running_from_reloader():
        srv.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if use_reloader:
        from ._reloader import run_with_reloader

        try:
            run_with_reloader(
                srv.serve_forever,
                extra_files=extra_files,
                exclude_patterns=exclude_patterns,
                interval=reloader_interval,
                reloader_type=reloader_type,
            )
        finally:
            srv.server_close()
    else:
        srv.serve_forever()