Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%
472 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1"""A WSGI and HTTP server for use **during development only**. This
2server is convenient to use, but is not designed to be particularly
3stable, secure, or efficient. Use a dedicate WSGI server and HTTP
4server when deploying to production.
6It provides features like interactive debugging and code reloading. Use
7``run_simple`` to start the server. Put this in a ``run.py`` script:
9.. code-block:: python
11 from myapp import create_app
12 from werkzeug import run_simple
13"""
14import errno
15import io
16import os
17import socket
18import socketserver
19import sys
20import typing as t
21from datetime import datetime as dt
22from datetime import timedelta
23from datetime import timezone
24from http.server import BaseHTTPRequestHandler
25from http.server import HTTPServer
27from ._internal import _log
28from ._internal import _wsgi_encoding_dance
29from .exceptions import InternalServerError
30from .urls import uri_to_iri
31from .urls import url_parse
32from .urls import url_unquote
34try:
35 import ssl
36except ImportError:
38 class _SslDummy:
39 def __getattr__(self, name: str) -> t.Any:
40 raise RuntimeError( # noqa: B904
41 "SSL is unavailable because this Python runtime was not"
42 " compiled with SSL/TLS support."
43 )
45 ssl = _SslDummy() # type: ignore
47_log_add_style = True
49if os.name == "nt":
50 try:
51 __import__("colorama")
52 except ImportError:
53 _log_add_style = False
55can_fork = hasattr(os, "fork")
57if can_fork:
58 ForkingMixIn = socketserver.ForkingMixIn
59else:
61 class ForkingMixIn: # type: ignore
62 pass
65try:
66 af_unix = socket.AF_UNIX
67except AttributeError:
68 af_unix = None # type: ignore
70LISTEN_QUEUE = 128
72_TSSLContextArg = t.Optional[
73 t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], "te.Literal['adhoc']"]
74]
76if t.TYPE_CHECKING:
77 import typing_extensions as te # noqa: F401
78 from _typeshed.wsgi import WSGIApplication
79 from _typeshed.wsgi import WSGIEnvironment
80 from cryptography.hazmat.primitives.asymmetric.rsa import (
81 RSAPrivateKeyWithSerialization,
82 )
83 from cryptography.x509 import Certificate
86class DechunkedInput(io.RawIOBase):
87 """An input stream that handles Transfer-Encoding 'chunked'"""
89 def __init__(self, rfile: t.IO[bytes]) -> None:
90 self._rfile = rfile
91 self._done = False
92 self._len = 0
94 def readable(self) -> bool:
95 return True
97 def read_chunk_len(self) -> int:
98 try:
99 line = self._rfile.readline().decode("latin1")
100 _len = int(line.strip(), 16)
101 except ValueError as e:
102 raise OSError("Invalid chunk header") from e
103 if _len < 0:
104 raise OSError("Negative chunk length not allowed")
105 return _len
107 def readinto(self, buf: bytearray) -> int: # type: ignore
108 read = 0
109 while not self._done and read < len(buf):
110 if self._len == 0:
111 # This is the first chunk or we fully consumed the previous
112 # one. Read the next length of the next chunk
113 self._len = self.read_chunk_len()
115 if self._len == 0:
116 # Found the final chunk of size 0. The stream is now exhausted,
117 # but there is still a final newline that should be consumed
118 self._done = True
120 if self._len > 0:
121 # There is data (left) in this chunk, so append it to the
122 # buffer. If this operation fully consumes the chunk, this will
123 # reset self._len to 0.
124 n = min(len(buf), self._len)
126 # If (read + chunk size) becomes more than len(buf), buf will
127 # grow beyond the original size and read more data than
128 # required. So only read as much data as can fit in buf.
129 if read + n > len(buf):
130 buf[read:] = self._rfile.read(len(buf) - read)
131 self._len -= len(buf) - read
132 read = len(buf)
133 else:
134 buf[read : read + n] = self._rfile.read(n)
135 self._len -= n
136 read += n
138 if self._len == 0:
139 # Skip the terminating newline of a chunk that has been fully
140 # consumed. This also applies to the 0-sized final chunk
141 terminator = self._rfile.readline()
142 if terminator not in (b"\n", b"\r\n", b"\r"):
143 raise OSError("Missing chunk terminating newline")
145 return read
148class WSGIRequestHandler(BaseHTTPRequestHandler):
149 """A request handler that implements WSGI dispatching."""
151 server: "BaseWSGIServer"
153 @property
154 def server_version(self) -> str: # type: ignore
155 from . import __version__
157 return f"Werkzeug/{__version__}"
159 def make_environ(self) -> "WSGIEnvironment":
160 request_url = url_parse(self.path)
161 url_scheme = "http" if self.server.ssl_context is None else "https"
163 if not self.client_address:
164 self.client_address = ("<local>", 0)
165 elif isinstance(self.client_address, str):
166 self.client_address = (self.client_address, 0)
168 # If there was no scheme but the path started with two slashes,
169 # the first segment may have been incorrectly parsed as the
170 # netloc, prepend it to the path again.
171 if not request_url.scheme and request_url.netloc:
172 path_info = f"/{request_url.netloc}{request_url.path}"
173 else:
174 path_info = request_url.path
176 path_info = url_unquote(path_info)
178 environ: "WSGIEnvironment" = {
179 "wsgi.version": (1, 0),
180 "wsgi.url_scheme": url_scheme,
181 "wsgi.input": self.rfile,
182 "wsgi.errors": sys.stderr,
183 "wsgi.multithread": self.server.multithread,
184 "wsgi.multiprocess": self.server.multiprocess,
185 "wsgi.run_once": False,
186 "werkzeug.socket": self.connection,
187 "SERVER_SOFTWARE": self.server_version,
188 "REQUEST_METHOD": self.command,
189 "SCRIPT_NAME": "",
190 "PATH_INFO": _wsgi_encoding_dance(path_info),
191 "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
192 # Non-standard, added by mod_wsgi, uWSGI
193 "REQUEST_URI": _wsgi_encoding_dance(self.path),
194 # Non-standard, added by gunicorn
195 "RAW_URI": _wsgi_encoding_dance(self.path),
196 "REMOTE_ADDR": self.address_string(),
197 "REMOTE_PORT": self.port_integer(),
198 "SERVER_NAME": self.server.server_address[0],
199 "SERVER_PORT": str(self.server.server_address[1]),
200 "SERVER_PROTOCOL": self.request_version,
201 }
203 for key, value in self.headers.items():
204 key = key.upper().replace("-", "_")
205 value = value.replace("\r\n", "")
206 if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
207 key = f"HTTP_{key}"
208 if key in environ:
209 value = f"{environ[key]},{value}"
210 environ[key] = value
212 if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
213 environ["wsgi.input_terminated"] = True
214 environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])
216 # Per RFC 2616, if the URL is absolute, use that as the host.
217 # We're using "has a scheme" to indicate an absolute URL.
218 if request_url.scheme and request_url.netloc:
219 environ["HTTP_HOST"] = request_url.netloc
221 try:
222 # binary_form=False gives nicer information, but wouldn't be compatible with
223 # what Nginx or Apache could return.
224 peer_cert = self.connection.getpeercert( # type: ignore[attr-defined]
225 binary_form=True
226 )
227 if peer_cert is not None:
228 # Nginx and Apache use PEM format.
229 environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
230 except ValueError:
231 # SSL handshake hasn't finished.
232 self.server.log("error", "Cannot fetch SSL peer certificate info")
233 except AttributeError:
234 # Not using TLS, the socket will not have getpeercert().
235 pass
237 return environ
239 def run_wsgi(self) -> None:
240 if self.headers.get("Expect", "").lower().strip() == "100-continue":
241 self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")
243 self.environ = environ = self.make_environ()
244 status_set: t.Optional[str] = None
245 headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None
246 status_sent: t.Optional[str] = None
247 headers_sent: t.Optional[t.List[t.Tuple[str, str]]] = None
248 chunk_response: bool = False
250 def write(data: bytes) -> None:
251 nonlocal status_sent, headers_sent, chunk_response
252 assert status_set is not None, "write() before start_response"
253 assert headers_set is not None, "write() before start_response"
254 if status_sent is None:
255 status_sent = status_set
256 headers_sent = headers_set
257 try:
258 code_str, msg = status_sent.split(None, 1)
259 except ValueError:
260 code_str, msg = status_sent, ""
261 code = int(code_str)
262 self.send_response(code, msg)
263 header_keys = set()
264 for key, value in headers_sent:
265 self.send_header(key, value)
266 header_keys.add(key.lower())
268 # Use chunked transfer encoding if there is no content
269 # length. Do not use for 1xx and 204 responses. 304
270 # responses and HEAD requests are also excluded, which
271 # is the more conservative behavior and matches other
272 # parts of the code.
273 # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
274 if (
275 not (
276 "content-length" in header_keys
277 or environ["REQUEST_METHOD"] == "HEAD"
278 or (100 <= code < 200)
279 or code in {204, 304}
280 )
281 and self.protocol_version >= "HTTP/1.1"
282 ):
283 chunk_response = True
284 self.send_header("Transfer-Encoding", "chunked")
286 # Always close the connection. This disables HTTP/1.1
287 # keep-alive connections. They aren't handled well by
288 # Python's http.server because it doesn't know how to
289 # drain the stream before the next request line.
290 self.send_header("Connection", "close")
291 self.end_headers()
293 assert isinstance(data, bytes), "applications must write bytes"
295 if data:
296 if chunk_response:
297 self.wfile.write(hex(len(data))[2:].encode())
298 self.wfile.write(b"\r\n")
300 self.wfile.write(data)
302 if chunk_response:
303 self.wfile.write(b"\r\n")
305 self.wfile.flush()
307 def start_response(status, headers, exc_info=None): # type: ignore
308 nonlocal status_set, headers_set
309 if exc_info:
310 try:
311 if headers_sent:
312 raise exc_info[1].with_traceback(exc_info[2])
313 finally:
314 exc_info = None
315 elif headers_set:
316 raise AssertionError("Headers already set")
317 status_set = status
318 headers_set = headers
319 return write
321 def execute(app: "WSGIApplication") -> None:
322 application_iter = app(environ, start_response)
323 try:
324 for data in application_iter:
325 write(data)
326 if not headers_sent:
327 write(b"")
328 if chunk_response:
329 self.wfile.write(b"0\r\n\r\n")
330 finally:
331 if hasattr(application_iter, "close"):
332 application_iter.close() # type: ignore
334 try:
335 execute(self.server.app)
336 except (ConnectionError, socket.timeout) as e:
337 self.connection_dropped(e, environ)
338 except Exception as e:
339 if self.server.passthrough_errors:
340 raise
342 if status_sent is not None and chunk_response:
343 self.close_connection = True
345 try:
346 # if we haven't yet sent the headers but they are set
347 # we roll back to be able to set them again.
348 if status_sent is None:
349 status_set = None
350 headers_set = None
351 execute(InternalServerError())
352 except Exception:
353 pass
355 from .debug.tbtools import DebugTraceback
357 msg = DebugTraceback(e).render_traceback_text()
358 self.server.log("error", f"Error on request:\n{msg}")
360 def handle(self) -> None:
361 """Handles a request ignoring dropped connections."""
362 try:
363 super().handle()
364 except (ConnectionError, socket.timeout) as e:
365 self.connection_dropped(e)
366 except Exception as e:
367 if self.server.ssl_context is not None and is_ssl_error(e):
368 self.log_error("SSL error occurred: %s", e)
369 else:
370 raise
372 def connection_dropped(
373 self, error: BaseException, environ: t.Optional["WSGIEnvironment"] = None
374 ) -> None:
375 """Called if the connection was closed by the client. By default
376 nothing happens.
377 """
379 def __getattr__(self, name: str) -> t.Any:
380 # All HTTP methods are handled by run_wsgi.
381 if name.startswith("do_"):
382 return self.run_wsgi
384 # All other attributes are forwarded to the base class.
385 return getattr(super(), name)
387 def address_string(self) -> str:
388 if getattr(self, "environ", None):
389 return self.environ["REMOTE_ADDR"] # type: ignore
391 if not self.client_address:
392 return "<local>"
394 return self.client_address[0]
396 def port_integer(self) -> int:
397 return self.client_address[1]
399 def log_request(
400 self, code: t.Union[int, str] = "-", size: t.Union[int, str] = "-"
401 ) -> None:
402 try:
403 path = uri_to_iri(self.path)
404 msg = f"{self.command} {path} {self.request_version}"
405 except AttributeError:
406 # path isn't set if the requestline was bad
407 msg = self.requestline
409 code = str(code)
411 if code[0] == "1": # 1xx - Informational
412 msg = _ansi_style(msg, "bold")
413 elif code == "200": # 2xx - Success
414 pass
415 elif code == "304": # 304 - Resource Not Modified
416 msg = _ansi_style(msg, "cyan")
417 elif code[0] == "3": # 3xx - Redirection
418 msg = _ansi_style(msg, "green")
419 elif code == "404": # 404 - Resource Not Found
420 msg = _ansi_style(msg, "yellow")
421 elif code[0] == "4": # 4xx - Client Error
422 msg = _ansi_style(msg, "bold", "red")
423 else: # 5xx, or any other response
424 msg = _ansi_style(msg, "bold", "magenta")
426 self.log("info", '"%s" %s %s', msg, code, size)
428 def log_error(self, format: str, *args: t.Any) -> None:
429 self.log("error", format, *args)
431 def log_message(self, format: str, *args: t.Any) -> None:
432 self.log("info", format, *args)
434 def log(self, type: str, message: str, *args: t.Any) -> None:
435 _log(
436 type,
437 f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
438 *args,
439 )
442def _ansi_style(value: str, *styles: str) -> str:
443 if not _log_add_style:
444 return value
446 codes = {
447 "bold": 1,
448 "red": 31,
449 "green": 32,
450 "yellow": 33,
451 "magenta": 35,
452 "cyan": 36,
453 }
455 for style in styles:
456 value = f"\x1b[{codes[style]}m{value}"
458 return f"{value}\x1b[0m"
461def generate_adhoc_ssl_pair(
462 cn: t.Optional[str] = None,
463) -> t.Tuple["Certificate", "RSAPrivateKeyWithSerialization"]:
464 try:
465 from cryptography import x509
466 from cryptography.x509.oid import NameOID
467 from cryptography.hazmat.backends import default_backend
468 from cryptography.hazmat.primitives import hashes
469 from cryptography.hazmat.primitives.asymmetric import rsa
470 except ImportError:
471 raise TypeError(
472 "Using ad-hoc certificates requires the cryptography library."
473 ) from None
475 backend = default_backend()
476 pkey = rsa.generate_private_key(
477 public_exponent=65537, key_size=2048, backend=backend
478 )
480 # pretty damn sure that this is not actually accepted by anyone
481 if cn is None:
482 cn = "*"
484 subject = x509.Name(
485 [
486 x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
487 x509.NameAttribute(NameOID.COMMON_NAME, cn),
488 ]
489 )
491 backend = default_backend()
492 cert = (
493 x509.CertificateBuilder()
494 .subject_name(subject)
495 .issuer_name(subject)
496 .public_key(pkey.public_key())
497 .serial_number(x509.random_serial_number())
498 .not_valid_before(dt.now(timezone.utc))
499 .not_valid_after(dt.now(timezone.utc) + timedelta(days=365))
500 .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
501 .add_extension(x509.SubjectAlternativeName([x509.DNSName(cn)]), critical=False)
502 .sign(pkey, hashes.SHA256(), backend)
503 )
504 return cert, pkey
507def make_ssl_devcert(
508 base_path: str, host: t.Optional[str] = None, cn: t.Optional[str] = None
509) -> t.Tuple[str, str]:
510 """Creates an SSL key for development. This should be used instead of
511 the ``'adhoc'`` key which generates a new cert on each server start.
512 It accepts a path for where it should store the key and cert and
513 either a host or CN. If a host is given it will use the CN
514 ``*.host/CN=host``.
516 For more information see :func:`run_simple`.
518 .. versionadded:: 0.9
520 :param base_path: the path to the certificate and key. The extension
521 ``.crt`` is added for the certificate, ``.key`` is
522 added for the key.
523 :param host: the name of the host. This can be used as an alternative
524 for the `cn`.
525 :param cn: the `CN` to use.
526 """
528 if host is not None:
529 cn = f"*.{host}/CN={host}"
530 cert, pkey = generate_adhoc_ssl_pair(cn=cn)
532 from cryptography.hazmat.primitives import serialization
534 cert_file = f"{base_path}.crt"
535 pkey_file = f"{base_path}.key"
537 with open(cert_file, "wb") as f:
538 f.write(cert.public_bytes(serialization.Encoding.PEM))
539 with open(pkey_file, "wb") as f:
540 f.write(
541 pkey.private_bytes(
542 encoding=serialization.Encoding.PEM,
543 format=serialization.PrivateFormat.TraditionalOpenSSL,
544 encryption_algorithm=serialization.NoEncryption(),
545 )
546 )
548 return cert_file, pkey_file
551def generate_adhoc_ssl_context() -> "ssl.SSLContext":
552 """Generates an adhoc SSL context for the development server."""
553 import tempfile
554 import atexit
556 cert, pkey = generate_adhoc_ssl_pair()
558 from cryptography.hazmat.primitives import serialization
560 cert_handle, cert_file = tempfile.mkstemp()
561 pkey_handle, pkey_file = tempfile.mkstemp()
562 atexit.register(os.remove, pkey_file)
563 atexit.register(os.remove, cert_file)
565 os.write(cert_handle, cert.public_bytes(serialization.Encoding.PEM))
566 os.write(
567 pkey_handle,
568 pkey.private_bytes(
569 encoding=serialization.Encoding.PEM,
570 format=serialization.PrivateFormat.TraditionalOpenSSL,
571 encryption_algorithm=serialization.NoEncryption(),
572 ),
573 )
575 os.close(cert_handle)
576 os.close(pkey_handle)
577 ctx = load_ssl_context(cert_file, pkey_file)
578 return ctx
581def load_ssl_context(
582 cert_file: str, pkey_file: t.Optional[str] = None, protocol: t.Optional[int] = None
583) -> "ssl.SSLContext":
584 """Loads SSL context from cert/private key files and optional protocol.
585 Many parameters are directly taken from the API of
586 :py:class:`ssl.SSLContext`.
588 :param cert_file: Path of the certificate to use.
589 :param pkey_file: Path of the private key to use. If not given, the key
590 will be obtained from the certificate file.
591 :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
592 Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
593 """
594 if protocol is None:
595 protocol = ssl.PROTOCOL_TLS_SERVER
597 ctx = ssl.SSLContext(protocol)
598 ctx.load_cert_chain(cert_file, pkey_file)
599 return ctx
602def is_ssl_error(error: t.Optional[Exception] = None) -> bool:
603 """Checks if the given error (or the current one) is an SSL error."""
604 if error is None:
605 error = t.cast(Exception, sys.exc_info()[1])
606 return isinstance(error, ssl.SSLError)
609def select_address_family(host: str, port: int) -> socket.AddressFamily:
610 """Return ``AF_INET4``, ``AF_INET6``, or ``AF_UNIX`` depending on
611 the host and port."""
612 if host.startswith("unix://"):
613 return socket.AF_UNIX
614 elif ":" in host and hasattr(socket, "AF_INET6"):
615 return socket.AF_INET6
616 return socket.AF_INET
619def get_sockaddr(
620 host: str, port: int, family: socket.AddressFamily
621) -> t.Union[t.Tuple[str, int], str]:
622 """Return a fully qualified socket address that can be passed to
623 :func:`socket.bind`."""
624 if family == af_unix:
625 return host.split("://", 1)[1]
626 try:
627 res = socket.getaddrinfo(
628 host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
629 )
630 except socket.gaierror:
631 return host, port
632 return res[0][4] # type: ignore
635def get_interface_ip(family: socket.AddressFamily) -> str:
636 """Get the IP address of an external interface. Used when binding to
637 0.0.0.0 or ::1 to show a more useful URL.
639 :meta private:
640 """
641 # arbitrary private address
642 host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219"
644 with socket.socket(family, socket.SOCK_DGRAM) as s:
645 try:
646 s.connect((host, 58162))
647 except OSError:
648 return "::1" if family == socket.AF_INET6 else "127.0.0.1"
650 return s.getsockname()[0] # type: ignore
653class BaseWSGIServer(HTTPServer):
654 """A WSGI server that that handles one request at a time.
656 Use :func:`make_server` to create a server instance.
657 """
659 multithread = False
660 multiprocess = False
661 request_queue_size = LISTEN_QUEUE
663 def __init__(
664 self,
665 host: str,
666 port: int,
667 app: "WSGIApplication",
668 handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
669 passthrough_errors: bool = False,
670 ssl_context: t.Optional[_TSSLContextArg] = None,
671 fd: t.Optional[int] = None,
672 ) -> None:
673 if handler is None:
674 handler = WSGIRequestHandler
676 # If the handler doesn't directly set a protocol version and
677 # thread or process workers are used, then allow chunked
678 # responses and keep-alive connections by enabling HTTP/1.1.
679 if "protocol_version" not in vars(handler) and (
680 self.multithread or self.multiprocess
681 ):
682 handler.protocol_version = "HTTP/1.1"
684 self.host = host
685 self.port = port
686 self.app = app
687 self.passthrough_errors = passthrough_errors
689 self.address_family = address_family = select_address_family(host, port)
690 server_address = get_sockaddr(host, int(port), address_family)
692 # Remove a leftover Unix socket file from a previous run. Don't
693 # remove a file that was set up by run_simple.
694 if address_family == af_unix and fd is None:
695 server_address = t.cast(str, server_address)
697 if os.path.exists(server_address):
698 os.unlink(server_address)
700 # Bind and activate will be handled manually, it should only
701 # happen if we're not using a socket that was already set up.
702 super().__init__(
703 server_address, # type: ignore[arg-type]
704 handler,
705 bind_and_activate=False,
706 )
708 if fd is None:
709 # No existing socket descriptor, do bind_and_activate=True.
710 try:
711 self.server_bind()
712 self.server_activate()
713 except BaseException:
714 self.server_close()
715 raise
716 else:
717 # Use the passed in socket directly.
718 self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
719 self.server_address = self.socket.getsockname()
721 if address_family != af_unix:
722 # If port was 0, this will record the bound port.
723 self.port = self.server_address[1]
725 if ssl_context is not None:
726 if isinstance(ssl_context, tuple):
727 ssl_context = load_ssl_context(*ssl_context)
728 elif ssl_context == "adhoc":
729 ssl_context = generate_adhoc_ssl_context()
731 self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
732 self.ssl_context: t.Optional["ssl.SSLContext"] = ssl_context
733 else:
734 self.ssl_context = None
736 def log(self, type: str, message: str, *args: t.Any) -> None:
737 _log(type, message, *args)
739 def serve_forever(self, poll_interval: float = 0.5) -> None:
740 try:
741 super().serve_forever(poll_interval=poll_interval)
742 except KeyboardInterrupt:
743 pass
744 finally:
745 self.server_close()
747 def handle_error(
748 self, request: t.Any, client_address: t.Union[t.Tuple[str, int], str]
749 ) -> None:
750 if self.passthrough_errors:
751 raise
753 return super().handle_error(request, client_address)
755 def log_startup(self) -> None:
756 """Show information about the address when starting the server."""
757 dev_warning = (
758 "WARNING: This is a development server. Do not use it in a production"
759 " deployment. Use a production WSGI server instead."
760 )
761 dev_warning = _ansi_style(dev_warning, "bold", "red")
762 messages = [dev_warning]
764 if self.address_family == af_unix:
765 messages.append(f" * Running on {self.host}")
766 else:
767 scheme = "http" if self.ssl_context is None else "https"
768 display_hostname = self.host
770 if self.host in {"0.0.0.0", "::"}:
771 messages.append(f" * Running on all addresses ({self.host})")
773 if self.host == "0.0.0.0":
774 localhost = "127.0.0.1"
775 display_hostname = get_interface_ip(socket.AF_INET)
776 else:
777 localhost = "[::1]"
778 display_hostname = get_interface_ip(socket.AF_INET6)
780 messages.append(f" * Running on {scheme}://{localhost}:{self.port}")
782 if ":" in display_hostname:
783 display_hostname = f"[{display_hostname}]"
785 messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")
787 _log("info", "\n".join(messages))
790class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
791 """A WSGI server that handles concurrent requests in separate
792 threads.
794 Use :func:`make_server` to create a server instance.
795 """
797 multithread = True
798 daemon_threads = True
801class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
802 """A WSGI server that handles concurrent requests in separate forked
803 processes.
805 Use :func:`make_server` to create a server instance.
806 """
808 multiprocess = True
810 def __init__(
811 self,
812 host: str,
813 port: int,
814 app: "WSGIApplication",
815 processes: int = 40,
816 handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
817 passthrough_errors: bool = False,
818 ssl_context: t.Optional[_TSSLContextArg] = None,
819 fd: t.Optional[int] = None,
820 ) -> None:
821 if not can_fork:
822 raise ValueError("Your platform does not support forking.")
824 super().__init__(host, port, app, handler, passthrough_errors, ssl_context, fd)
825 self.max_children = processes
828def make_server(
829 host: str,
830 port: int,
831 app: "WSGIApplication",
832 threaded: bool = False,
833 processes: int = 1,
834 request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
835 passthrough_errors: bool = False,
836 ssl_context: t.Optional[_TSSLContextArg] = None,
837 fd: t.Optional[int] = None,
838) -> BaseWSGIServer:
839 """Create an appropriate WSGI server instance based on the value of
840 ``threaded`` and ``processes``.
842 This is called from :func:`run_simple`, but can be used separately
843 to have access to the server object, such as to run it in a separate
844 thread.
846 See :func:`run_simple` for parameter docs.
847 """
848 if threaded and processes > 1:
849 raise ValueError("Cannot have a multi-thread and multi-process server.")
851 if threaded:
852 return ThreadedWSGIServer(
853 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
854 )
856 if processes > 1:
857 return ForkingWSGIServer(
858 host,
859 port,
860 app,
861 processes,
862 request_handler,
863 passthrough_errors,
864 ssl_context,
865 fd=fd,
866 )
868 return BaseWSGIServer(
869 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
870 )
873def is_running_from_reloader() -> bool:
874 """Check if the server is running as a subprocess within the
875 Werkzeug reloader.
877 .. versionadded:: 0.10
878 """
879 return os.environ.get("WERKZEUG_RUN_MAIN") == "true"
882def prepare_socket(hostname: str, port: int) -> socket.socket:
883 """Prepare a socket for use by the WSGI server and reloader.
885 The socket is marked inheritable so that it can be kept across
886 reloads instead of breaking connections.
888 Catch errors during bind and show simpler error messages. For
889 "address already in use", show instructions for resolving the issue,
890 with special instructions for macOS.
892 This is called from :func:`run_simple`, but can be used separately
893 to control server creation with :func:`make_server`.
894 """
895 address_family = select_address_family(hostname, port)
896 server_address = get_sockaddr(hostname, port, address_family)
897 s = socket.socket(address_family, socket.SOCK_STREAM)
898 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
899 s.set_inheritable(True)
901 # Remove the socket file if it already exists.
902 if address_family == af_unix:
903 server_address = t.cast(str, server_address)
905 if os.path.exists(server_address):
906 os.unlink(server_address)
908 # Catch connection issues and show them without the traceback. Show
909 # extra instructions for address not found, and for macOS.
910 try:
911 s.bind(server_address)
912 except OSError as e:
913 print(e.strerror, file=sys.stderr)
915 if e.errno == errno.EADDRINUSE:
916 print(
917 f"Port {port} is in use by another program. Either"
918 " identify and stop that program, or start the"
919 " server with a different port.",
920 file=sys.stderr,
921 )
923 if sys.platform == "darwin" and port == 5000:
924 print(
925 "On macOS, try disabling the 'AirPlay Receiver'"
926 " service from System Preferences -> Sharing.",
927 file=sys.stderr,
928 )
930 sys.exit(1)
932 s.listen(LISTEN_QUEUE)
933 return s
936def run_simple(
937 hostname: str,
938 port: int,
939 application: "WSGIApplication",
940 use_reloader: bool = False,
941 use_debugger: bool = False,
942 use_evalex: bool = True,
943 extra_files: t.Optional[t.Iterable[str]] = None,
944 exclude_patterns: t.Optional[t.Iterable[str]] = None,
945 reloader_interval: int = 1,
946 reloader_type: str = "auto",
947 threaded: bool = False,
948 processes: int = 1,
949 request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
950 static_files: t.Optional[t.Dict[str, t.Union[str, t.Tuple[str, str]]]] = None,
951 passthrough_errors: bool = False,
952 ssl_context: t.Optional[_TSSLContextArg] = None,
953) -> None:
954 """Start a development server for a WSGI application. Various
955 optional features can be enabled.
957 .. warning::
959 Do not use the development server when deploying to production.
960 It is intended for use only during local development. It is not
961 designed to be particularly efficient, stable, or secure.
963 :param hostname: The host to bind to, for example ``'localhost'``.
964 Can be a domain, IPv4 or IPv6 address, or file path starting
965 with ``unix://`` for a Unix socket.
966 :param port: The port to bind to, for example ``8080``. Using ``0``
967 tells the OS to pick a random free port.
968 :param application: The WSGI application to run.
969 :param use_reloader: Use a reloader process to restart the server
970 process when files are changed.
971 :param use_debugger: Use Werkzeug's debugger, which will show
972 formatted tracebacks on unhandled exceptions.
973 :param use_evalex: Make the debugger interactive. A Python terminal
974 can be opened for any frame in the traceback. Some protection is
975 provided by requiring a PIN, but this should never be enabled
976 on a publicly visible server.
977 :param extra_files: The reloader will watch these files for changes
978 in addition to Python modules. For example, watch a
979 configuration file.
980 :param exclude_patterns: The reloader will ignore changes to any
981 files matching these :mod:`fnmatch` patterns. For example,
982 ignore cache files.
983 :param reloader_interval: How often the reloader tries to check for
984 changes.
985 :param reloader_type: The reloader to use. The ``'stat'`` reloader
986 is built in, but may require significant CPU to watch files. The
987 ``'watchdog'`` reloader is much more efficient but requires
988 installing the ``watchdog`` package first.
989 :param threaded: Handle concurrent requests using threads. Cannot be
990 used with ``processes``.
991 :param processes: Handle concurrent requests using up to this number
992 of processes. Cannot be used with ``threaded``.
993 :param request_handler: Use a different
994 :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to
995 handle requests.
996 :param static_files: A dict mapping URL prefixes to directories to
997 serve static files from using
998 :class:`~werkzeug.middleware.SharedDataMiddleware`.
999 :param passthrough_errors: Don't catch unhandled exceptions at the
1000 server level, let the serve crash instead. If ``use_debugger``
1001 is enabled, the debugger will still catch such errors.
1002 :param ssl_context: Configure TLS to serve over HTTPS. Can be an
1003 :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
1004 tuple to create a typical context, or the string ``'adhoc'`` to
1005 generate a temporary self-signed certificate.
1007 .. versionchanged:: 2.1
1008 Instructions are shown for dealing with an "address already in
1009 use" error.
1011 .. versionchanged:: 2.1
1012 Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
1013 addition to a real IP.
1015 .. versionchanged:: 2.1
1016 The command-line interface was removed.
1018 .. versionchanged:: 2.0
1019 Running on ``0.0.0.0`` or ``::`` shows a real IP address that
1020 was bound as well as a warning not to run the development server
1021 in production.
1023 .. versionchanged:: 2.0
1024 The ``exclude_patterns`` parameter was added.
1026 .. versionchanged:: 0.15
1027 Bind to a Unix socket by passing a ``hostname`` that starts with
1028 ``unix://``.
1030 .. versionchanged:: 0.10
1031 Improved the reloader and added support for changing the backend
1032 through the ``reloader_type`` parameter.
1034 .. versionchanged:: 0.9
1035 A command-line interface was added.
1037 .. versionchanged:: 0.8
1038 ``ssl_context`` can be a tuple of paths to the certificate and
1039 private key files.
1041 .. versionchanged:: 0.6
1042 The ``ssl_context`` parameter was added.
1044 .. versionchanged:: 0.5
1045 The ``static_files`` and ``passthrough_errors`` parameters were
1046 added.
1047 """
1048 if not isinstance(port, int):
1049 raise TypeError("port must be an integer")
1051 if static_files:
1052 from .middleware.shared_data import SharedDataMiddleware
1054 application = SharedDataMiddleware(application, static_files)
1056 if use_debugger:
1057 from .debug import DebuggedApplication
1059 application = DebuggedApplication(application, evalex=use_evalex)
1061 if not is_running_from_reloader():
1062 s = prepare_socket(hostname, port)
1063 fd = s.fileno()
1064 # Silence a ResourceWarning about an unclosed socket. This object is no longer
1065 # used, the server will create another with fromfd.
1066 s.detach()
1067 os.environ["WERKZEUG_SERVER_FD"] = str(fd)
1068 else:
1069 fd = int(os.environ["WERKZEUG_SERVER_FD"])
1071 srv = make_server(
1072 hostname,
1073 port,
1074 application,
1075 threaded,
1076 processes,
1077 request_handler,
1078 passthrough_errors,
1079 ssl_context,
1080 fd=fd,
1081 )
1083 if not is_running_from_reloader():
1084 srv.log_startup()
1085 _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))
1087 if use_reloader:
1088 from ._reloader import run_with_reloader
1090 run_with_reloader(
1091 srv.serve_forever,
1092 extra_files=extra_files,
1093 exclude_patterns=exclude_patterns,
1094 interval=reloader_interval,
1095 reloader_type=reloader_type,
1096 )
1097 else:
1098 srv.serve_forever()