Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%
468 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
1"""A WSGI and HTTP server for use **during development only**. This
2server is convenient to use, but is not designed to be particularly
3stable, secure, or efficient. Use a dedicate WSGI server and HTTP
4server when deploying to production.
6It provides features like interactive debugging and code reloading. Use
7``run_simple`` to start the server. Put this in a ``run.py`` script:
9.. code-block:: python
11 from myapp import create_app
12 from werkzeug import run_simple
13"""
14import errno
15import io
16import os
17import socket
18import socketserver
19import sys
20import typing as t
21from datetime import datetime as dt
22from datetime import timedelta
23from datetime import timezone
24from http.server import BaseHTTPRequestHandler
25from http.server import HTTPServer
27from ._internal import _log
28from ._internal import _wsgi_encoding_dance
29from .exceptions import InternalServerError
30from .urls import uri_to_iri
31from .urls import url_parse
32from .urls import url_unquote
34try:
35 import ssl
36except ImportError:
38 class _SslDummy:
39 def __getattr__(self, name: str) -> t.Any:
40 raise RuntimeError( # noqa: B904
41 "SSL is unavailable because this Python runtime was not"
42 " compiled with SSL/TLS support."
43 )
45 ssl = _SslDummy() # type: ignore
47_log_add_style = True
49if os.name == "nt":
50 try:
51 __import__("colorama")
52 except ImportError:
53 _log_add_style = False
55can_fork = hasattr(os, "fork")
57if can_fork:
58 ForkingMixIn = socketserver.ForkingMixIn
59else:
61 class ForkingMixIn: # type: ignore
62 pass
65try:
66 af_unix = socket.AF_UNIX
67except AttributeError:
68 af_unix = None # type: ignore
70LISTEN_QUEUE = 128
72_TSSLContextArg = t.Optional[
73 t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], "te.Literal['adhoc']"]
74]
76if t.TYPE_CHECKING:
77 import typing_extensions as te # noqa: F401
78 from _typeshed.wsgi import WSGIApplication
79 from _typeshed.wsgi import WSGIEnvironment
80 from cryptography.hazmat.primitives.asymmetric.rsa import (
81 RSAPrivateKeyWithSerialization,
82 )
83 from cryptography.x509 import Certificate
86class DechunkedInput(io.RawIOBase):
87 """An input stream that handles Transfer-Encoding 'chunked'"""
89 def __init__(self, rfile: t.IO[bytes]) -> None:
90 self._rfile = rfile
91 self._done = False
92 self._len = 0
94 def readable(self) -> bool:
95 return True
97 def read_chunk_len(self) -> int:
98 try:
99 line = self._rfile.readline().decode("latin1")
100 _len = int(line.strip(), 16)
101 except ValueError as e:
102 raise OSError("Invalid chunk header") from e
103 if _len < 0:
104 raise OSError("Negative chunk length not allowed")
105 return _len
107 def readinto(self, buf: bytearray) -> int: # type: ignore
108 read = 0
109 while not self._done and read < len(buf):
110 if self._len == 0:
111 # This is the first chunk or we fully consumed the previous
112 # one. Read the next length of the next chunk
113 self._len = self.read_chunk_len()
115 if self._len == 0:
116 # Found the final chunk of size 0. The stream is now exhausted,
117 # but there is still a final newline that should be consumed
118 self._done = True
120 if self._len > 0:
121 # There is data (left) in this chunk, so append it to the
122 # buffer. If this operation fully consumes the chunk, this will
123 # reset self._len to 0.
124 n = min(len(buf), self._len)
126 # If (read + chunk size) becomes more than len(buf), buf will
127 # grow beyond the original size and read more data than
128 # required. So only read as much data as can fit in buf.
129 if read + n > len(buf):
130 buf[read:] = self._rfile.read(len(buf) - read)
131 self._len -= len(buf) - read
132 read = len(buf)
133 else:
134 buf[read : read + n] = self._rfile.read(n)
135 self._len -= n
136 read += n
138 if self._len == 0:
139 # Skip the terminating newline of a chunk that has been fully
140 # consumed. This also applies to the 0-sized final chunk
141 terminator = self._rfile.readline()
142 if terminator not in (b"\n", b"\r\n", b"\r"):
143 raise OSError("Missing chunk terminating newline")
145 return read
148class WSGIRequestHandler(BaseHTTPRequestHandler):
149 """A request handler that implements WSGI dispatching."""
151 server: "BaseWSGIServer"
153 @property
154 def server_version(self) -> str: # type: ignore
155 from . import __version__
157 return f"Werkzeug/{__version__}"
159 def make_environ(self) -> "WSGIEnvironment":
160 request_url = url_parse(self.path)
161 url_scheme = "http" if self.server.ssl_context is None else "https"
163 if not self.client_address:
164 self.client_address = ("<local>", 0)
165 elif isinstance(self.client_address, str):
166 self.client_address = (self.client_address, 0)
168 # If there was no scheme but the path started with two slashes,
169 # the first segment may have been incorrectly parsed as the
170 # netloc, prepend it to the path again.
171 if not request_url.scheme and request_url.netloc:
172 path_info = f"/{request_url.netloc}{request_url.path}"
173 else:
174 path_info = request_url.path
176 path_info = url_unquote(path_info)
178 environ: "WSGIEnvironment" = {
179 "wsgi.version": (1, 0),
180 "wsgi.url_scheme": url_scheme,
181 "wsgi.input": self.rfile,
182 "wsgi.errors": sys.stderr,
183 "wsgi.multithread": self.server.multithread,
184 "wsgi.multiprocess": self.server.multiprocess,
185 "wsgi.run_once": False,
186 "werkzeug.socket": self.connection,
187 "SERVER_SOFTWARE": self.server_version,
188 "REQUEST_METHOD": self.command,
189 "SCRIPT_NAME": "",
190 "PATH_INFO": _wsgi_encoding_dance(path_info),
191 "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
192 # Non-standard, added by mod_wsgi, uWSGI
193 "REQUEST_URI": _wsgi_encoding_dance(self.path),
194 # Non-standard, added by gunicorn
195 "RAW_URI": _wsgi_encoding_dance(self.path),
196 "REMOTE_ADDR": self.address_string(),
197 "REMOTE_PORT": self.port_integer(),
198 "SERVER_NAME": self.server.server_address[0],
199 "SERVER_PORT": str(self.server.server_address[1]),
200 "SERVER_PROTOCOL": self.request_version,
201 }
203 for key, value in self.headers.items():
204 key = key.upper().replace("-", "_")
205 value = value.replace("\r\n", "")
206 if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
207 key = f"HTTP_{key}"
208 if key in environ:
209 value = f"{environ[key]},{value}"
210 environ[key] = value
212 if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
213 environ["wsgi.input_terminated"] = True
214 environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])
216 # Per RFC 2616, if the URL is absolute, use that as the host.
217 # We're using "has a scheme" to indicate an absolute URL.
218 if request_url.scheme and request_url.netloc:
219 environ["HTTP_HOST"] = request_url.netloc
221 try:
222 # binary_form=False gives nicer information, but wouldn't be compatible with
223 # what Nginx or Apache could return.
224 peer_cert = self.connection.getpeercert( # type: ignore[attr-defined]
225 binary_form=True
226 )
227 if peer_cert is not None:
228 # Nginx and Apache use PEM format.
229 environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
230 except ValueError:
231 # SSL handshake hasn't finished.
232 self.server.log("error", "Cannot fetch SSL peer certificate info")
233 except AttributeError:
234 # Not using TLS, the socket will not have getpeercert().
235 pass
237 return environ
239 def run_wsgi(self) -> None:
240 if self.headers.get("Expect", "").lower().strip() == "100-continue":
241 self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")
243 self.environ = environ = self.make_environ()
244 status_set: t.Optional[str] = None
245 headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None
246 status_sent: t.Optional[str] = None
247 headers_sent: t.Optional[t.List[t.Tuple[str, str]]] = None
248 chunk_response: bool = False
250 def write(data: bytes) -> None:
251 nonlocal status_sent, headers_sent, chunk_response
252 assert status_set is not None, "write() before start_response"
253 assert headers_set is not None, "write() before start_response"
254 if status_sent is None:
255 status_sent = status_set
256 headers_sent = headers_set
257 try:
258 code_str, msg = status_sent.split(None, 1)
259 except ValueError:
260 code_str, msg = status_sent, ""
261 code = int(code_str)
262 self.send_response(code, msg)
263 header_keys = set()
264 for key, value in headers_sent:
265 self.send_header(key, value)
266 header_keys.add(key.lower())
268 # Use chunked transfer encoding if there is no content
269 # length. Do not use for 1xx and 204 responses. 304
270 # responses and HEAD requests are also excluded, which
271 # is the more conservative behavior and matches other
272 # parts of the code.
273 # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
274 if (
275 not (
276 "content-length" in header_keys
277 or environ["REQUEST_METHOD"] == "HEAD"
278 or (100 <= code < 200)
279 or code in {204, 304}
280 )
281 and self.protocol_version >= "HTTP/1.1"
282 ):
283 chunk_response = True
284 self.send_header("Transfer-Encoding", "chunked")
286 # Always close the connection. This disables HTTP/1.1
287 # keep-alive connections. They aren't handled well by
288 # Python's http.server because it doesn't know how to
289 # drain the stream before the next request line.
290 self.send_header("Connection", "close")
291 self.end_headers()
293 assert isinstance(data, bytes), "applications must write bytes"
295 if data:
296 if chunk_response:
297 self.wfile.write(hex(len(data))[2:].encode())
298 self.wfile.write(b"\r\n")
300 self.wfile.write(data)
302 if chunk_response:
303 self.wfile.write(b"\r\n")
305 self.wfile.flush()
307 def start_response(status, headers, exc_info=None): # type: ignore
308 nonlocal status_set, headers_set
309 if exc_info:
310 try:
311 if headers_sent:
312 raise exc_info[1].with_traceback(exc_info[2])
313 finally:
314 exc_info = None
315 elif headers_set:
316 raise AssertionError("Headers already set")
317 status_set = status
318 headers_set = headers
319 return write
321 def execute(app: "WSGIApplication") -> None:
322 application_iter = app(environ, start_response)
323 try:
324 for data in application_iter:
325 write(data)
326 if not headers_sent:
327 write(b"")
328 if chunk_response:
329 self.wfile.write(b"0\r\n\r\n")
330 finally:
331 if hasattr(application_iter, "close"):
332 application_iter.close() # type: ignore
334 try:
335 execute(self.server.app)
336 except (ConnectionError, socket.timeout) as e:
337 self.connection_dropped(e, environ)
338 except Exception as e:
339 if self.server.passthrough_errors:
340 raise
342 if status_sent is not None and chunk_response:
343 self.close_connection = True
345 try:
346 # if we haven't yet sent the headers but they are set
347 # we roll back to be able to set them again.
348 if status_sent is None:
349 status_set = None
350 headers_set = None
351 execute(InternalServerError())
352 except Exception:
353 pass
355 from .debug.tbtools import DebugTraceback
357 msg = DebugTraceback(e).render_traceback_text()
358 self.server.log("error", f"Error on request:\n{msg}")
360 def handle(self) -> None:
361 """Handles a request ignoring dropped connections."""
362 try:
363 super().handle()
364 except (ConnectionError, socket.timeout) as e:
365 self.connection_dropped(e)
366 except Exception as e:
367 if self.server.ssl_context is not None and is_ssl_error(e):
368 self.log_error("SSL error occurred: %s", e)
369 else:
370 raise
372 def connection_dropped(
373 self, error: BaseException, environ: t.Optional["WSGIEnvironment"] = None
374 ) -> None:
375 """Called if the connection was closed by the client. By default
376 nothing happens.
377 """
379 def __getattr__(self, name: str) -> t.Any:
380 # All HTTP methods are handled by run_wsgi.
381 if name.startswith("do_"):
382 return self.run_wsgi
384 # All other attributes are forwarded to the base class.
385 return getattr(super(), name)
387 def address_string(self) -> str:
388 if getattr(self, "environ", None):
389 return self.environ["REMOTE_ADDR"] # type: ignore
391 if not self.client_address:
392 return "<local>"
394 return self.client_address[0]
396 def port_integer(self) -> int:
397 return self.client_address[1]
399 def log_request(
400 self, code: t.Union[int, str] = "-", size: t.Union[int, str] = "-"
401 ) -> None:
402 try:
403 path = uri_to_iri(self.path)
404 msg = f"{self.command} {path} {self.request_version}"
405 except AttributeError:
406 # path isn't set if the requestline was bad
407 msg = self.requestline
409 code = str(code)
411 if _log_add_style:
412 if code[0] == "1": # 1xx - Informational
413 msg = _ansi_style(msg, "bold")
414 elif code == "200": # 2xx - Success
415 pass
416 elif code == "304": # 304 - Resource Not Modified
417 msg = _ansi_style(msg, "cyan")
418 elif code[0] == "3": # 3xx - Redirection
419 msg = _ansi_style(msg, "green")
420 elif code == "404": # 404 - Resource Not Found
421 msg = _ansi_style(msg, "yellow")
422 elif code[0] == "4": # 4xx - Client Error
423 msg = _ansi_style(msg, "bold", "red")
424 else: # 5xx, or any other response
425 msg = _ansi_style(msg, "bold", "magenta")
427 self.log("info", '"%s" %s %s', msg, code, size)
429 def log_error(self, format: str, *args: t.Any) -> None:
430 self.log("error", format, *args)
432 def log_message(self, format: str, *args: t.Any) -> None:
433 self.log("info", format, *args)
435 def log(self, type: str, message: str, *args: t.Any) -> None:
436 _log(
437 type,
438 f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
439 *args,
440 )
443def _ansi_style(value: str, *styles: str) -> str:
444 codes = {
445 "bold": 1,
446 "red": 31,
447 "green": 32,
448 "yellow": 33,
449 "magenta": 35,
450 "cyan": 36,
451 }
453 for style in styles:
454 value = f"\x1b[{codes[style]}m{value}"
456 return f"{value}\x1b[0m"
459def generate_adhoc_ssl_pair(
460 cn: t.Optional[str] = None,
461) -> t.Tuple["Certificate", "RSAPrivateKeyWithSerialization"]:
462 try:
463 from cryptography import x509
464 from cryptography.x509.oid import NameOID
465 from cryptography.hazmat.backends import default_backend
466 from cryptography.hazmat.primitives import hashes
467 from cryptography.hazmat.primitives.asymmetric import rsa
468 except ImportError:
469 raise TypeError(
470 "Using ad-hoc certificates requires the cryptography library."
471 ) from None
473 backend = default_backend()
474 pkey = rsa.generate_private_key(
475 public_exponent=65537, key_size=2048, backend=backend
476 )
478 # pretty damn sure that this is not actually accepted by anyone
479 if cn is None:
480 cn = "*"
482 subject = x509.Name(
483 [
484 x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
485 x509.NameAttribute(NameOID.COMMON_NAME, cn),
486 ]
487 )
489 backend = default_backend()
490 cert = (
491 x509.CertificateBuilder()
492 .subject_name(subject)
493 .issuer_name(subject)
494 .public_key(pkey.public_key())
495 .serial_number(x509.random_serial_number())
496 .not_valid_before(dt.now(timezone.utc))
497 .not_valid_after(dt.now(timezone.utc) + timedelta(days=365))
498 .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
499 .add_extension(x509.SubjectAlternativeName([x509.DNSName(cn)]), critical=False)
500 .sign(pkey, hashes.SHA256(), backend)
501 )
502 return cert, pkey
505def make_ssl_devcert(
506 base_path: str, host: t.Optional[str] = None, cn: t.Optional[str] = None
507) -> t.Tuple[str, str]:
508 """Creates an SSL key for development. This should be used instead of
509 the ``'adhoc'`` key which generates a new cert on each server start.
510 It accepts a path for where it should store the key and cert and
511 either a host or CN. If a host is given it will use the CN
512 ``*.host/CN=host``.
514 For more information see :func:`run_simple`.
516 .. versionadded:: 0.9
518 :param base_path: the path to the certificate and key. The extension
519 ``.crt`` is added for the certificate, ``.key`` is
520 added for the key.
521 :param host: the name of the host. This can be used as an alternative
522 for the `cn`.
523 :param cn: the `CN` to use.
524 """
526 if host is not None:
527 cn = f"*.{host}/CN={host}"
528 cert, pkey = generate_adhoc_ssl_pair(cn=cn)
530 from cryptography.hazmat.primitives import serialization
532 cert_file = f"{base_path}.crt"
533 pkey_file = f"{base_path}.key"
535 with open(cert_file, "wb") as f:
536 f.write(cert.public_bytes(serialization.Encoding.PEM))
537 with open(pkey_file, "wb") as f:
538 f.write(
539 pkey.private_bytes(
540 encoding=serialization.Encoding.PEM,
541 format=serialization.PrivateFormat.TraditionalOpenSSL,
542 encryption_algorithm=serialization.NoEncryption(),
543 )
544 )
546 return cert_file, pkey_file
549def generate_adhoc_ssl_context() -> "ssl.SSLContext":
550 """Generates an adhoc SSL context for the development server."""
551 import tempfile
552 import atexit
554 cert, pkey = generate_adhoc_ssl_pair()
556 from cryptography.hazmat.primitives import serialization
558 cert_handle, cert_file = tempfile.mkstemp()
559 pkey_handle, pkey_file = tempfile.mkstemp()
560 atexit.register(os.remove, pkey_file)
561 atexit.register(os.remove, cert_file)
563 os.write(cert_handle, cert.public_bytes(serialization.Encoding.PEM))
564 os.write(
565 pkey_handle,
566 pkey.private_bytes(
567 encoding=serialization.Encoding.PEM,
568 format=serialization.PrivateFormat.TraditionalOpenSSL,
569 encryption_algorithm=serialization.NoEncryption(),
570 ),
571 )
573 os.close(cert_handle)
574 os.close(pkey_handle)
575 ctx = load_ssl_context(cert_file, pkey_file)
576 return ctx
579def load_ssl_context(
580 cert_file: str, pkey_file: t.Optional[str] = None, protocol: t.Optional[int] = None
581) -> "ssl.SSLContext":
582 """Loads SSL context from cert/private key files and optional protocol.
583 Many parameters are directly taken from the API of
584 :py:class:`ssl.SSLContext`.
586 :param cert_file: Path of the certificate to use.
587 :param pkey_file: Path of the private key to use. If not given, the key
588 will be obtained from the certificate file.
589 :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
590 Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
591 """
592 if protocol is None:
593 protocol = ssl.PROTOCOL_TLS_SERVER
595 ctx = ssl.SSLContext(protocol)
596 ctx.load_cert_chain(cert_file, pkey_file)
597 return ctx
600def is_ssl_error(error: t.Optional[Exception] = None) -> bool:
601 """Checks if the given error (or the current one) is an SSL error."""
602 if error is None:
603 error = t.cast(Exception, sys.exc_info()[1])
604 return isinstance(error, ssl.SSLError)
607def select_address_family(host: str, port: int) -> socket.AddressFamily:
608 """Return ``AF_INET4``, ``AF_INET6``, or ``AF_UNIX`` depending on
609 the host and port."""
610 if host.startswith("unix://"):
611 return socket.AF_UNIX
612 elif ":" in host and hasattr(socket, "AF_INET6"):
613 return socket.AF_INET6
614 return socket.AF_INET
617def get_sockaddr(
618 host: str, port: int, family: socket.AddressFamily
619) -> t.Union[t.Tuple[str, int], str]:
620 """Return a fully qualified socket address that can be passed to
621 :func:`socket.bind`."""
622 if family == af_unix:
623 return host.split("://", 1)[1]
624 try:
625 res = socket.getaddrinfo(
626 host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
627 )
628 except socket.gaierror:
629 return host, port
630 return res[0][4] # type: ignore
633def get_interface_ip(family: socket.AddressFamily) -> str:
634 """Get the IP address of an external interface. Used when binding to
635 0.0.0.0 or ::1 to show a more useful URL.
637 :meta private:
638 """
639 # arbitrary private address
640 host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219"
642 with socket.socket(family, socket.SOCK_DGRAM) as s:
643 try:
644 s.connect((host, 58162))
645 except OSError:
646 return "::1" if family == socket.AF_INET6 else "127.0.0.1"
648 return s.getsockname()[0] # type: ignore
651class BaseWSGIServer(HTTPServer):
652 """A WSGI server that that handles one request at a time.
654 Use :func:`make_server` to create a server instance.
655 """
657 multithread = False
658 multiprocess = False
659 request_queue_size = LISTEN_QUEUE
661 def __init__(
662 self,
663 host: str,
664 port: int,
665 app: "WSGIApplication",
666 handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
667 passthrough_errors: bool = False,
668 ssl_context: t.Optional[_TSSLContextArg] = None,
669 fd: t.Optional[int] = None,
670 ) -> None:
671 if handler is None:
672 handler = WSGIRequestHandler
674 # If the handler doesn't directly set a protocol version and
675 # thread or process workers are used, then allow chunked
676 # responses and keep-alive connections by enabling HTTP/1.1.
677 if "protocol_version" not in vars(handler) and (
678 self.multithread or self.multiprocess
679 ):
680 handler.protocol_version = "HTTP/1.1"
682 self.host = host
683 self.port = port
684 self.app = app
685 self.passthrough_errors = passthrough_errors
687 self.address_family = address_family = select_address_family(host, port)
688 server_address = get_sockaddr(host, int(port), address_family)
690 # Remove a leftover Unix socket file from a previous run. Don't
691 # remove a file that was set up by run_simple.
692 if address_family == af_unix and fd is None:
693 server_address = t.cast(str, server_address)
695 if os.path.exists(server_address):
696 os.unlink(server_address)
698 # Bind and activate will be handled manually, it should only
699 # happen if we're not using a socket that was already set up.
700 super().__init__(
701 server_address, # type: ignore[arg-type]
702 handler,
703 bind_and_activate=False,
704 )
706 if fd is None:
707 # No existing socket descriptor, do bind_and_activate=True.
708 try:
709 self.server_bind()
710 self.server_activate()
711 except BaseException:
712 self.server_close()
713 raise
714 else:
715 # Use the passed in socket directly.
716 self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
717 self.server_address = self.socket.getsockname()
719 if address_family != af_unix:
720 # If port was 0, this will record the bound port.
721 self.port = self.server_address[1]
723 if ssl_context is not None:
724 if isinstance(ssl_context, tuple):
725 ssl_context = load_ssl_context(*ssl_context)
726 elif ssl_context == "adhoc":
727 ssl_context = generate_adhoc_ssl_context()
729 self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
730 self.ssl_context: t.Optional["ssl.SSLContext"] = ssl_context
731 else:
732 self.ssl_context = None
734 def log(self, type: str, message: str, *args: t.Any) -> None:
735 _log(type, message, *args)
737 def serve_forever(self, poll_interval: float = 0.5) -> None:
738 try:
739 super().serve_forever(poll_interval=poll_interval)
740 except KeyboardInterrupt:
741 pass
742 finally:
743 self.server_close()
745 def handle_error(
746 self, request: t.Any, client_address: t.Union[t.Tuple[str, int], str]
747 ) -> None:
748 if self.passthrough_errors:
749 raise
751 return super().handle_error(request, client_address)
753 def log_startup(self) -> None:
754 """Show information about the address when starting the server."""
755 if self.address_family == af_unix:
756 _log("info", f" * Running on {self.host} (Press CTRL+C to quit)")
757 else:
758 scheme = "http" if self.ssl_context is None else "https"
759 messages = []
760 all_addresses_message = (
761 f" * Running on all addresses ({self.host})\n"
762 " WARNING: This is a development server. Do not use it in"
763 " a production deployment."
764 )
766 if self.host == "0.0.0.0":
767 messages.append(all_addresses_message)
768 messages.append(f" * Running on {scheme}://127.0.0.1:{self.port}")
769 display_hostname = get_interface_ip(socket.AF_INET)
770 elif self.host == "::":
771 messages.append(all_addresses_message)
772 messages.append(f" * Running on {scheme}://[::1]:{self.port}")
773 display_hostname = get_interface_ip(socket.AF_INET6)
774 else:
775 display_hostname = self.host
777 if ":" in display_hostname:
778 display_hostname = f"[{display_hostname}]"
780 messages.append(
781 f" * Running on {scheme}://{display_hostname}:{self.port}"
782 " (Press CTRL+C to quit)"
783 )
784 _log("info", "\n".join(messages))
787class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
788 """A WSGI server that handles concurrent requests in separate
789 threads.
791 Use :func:`make_server` to create a server instance.
792 """
794 multithread = True
795 daemon_threads = True
798class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
799 """A WSGI server that handles concurrent requests in separate forked
800 processes.
802 Use :func:`make_server` to create a server instance.
803 """
805 multiprocess = True
807 def __init__(
808 self,
809 host: str,
810 port: int,
811 app: "WSGIApplication",
812 processes: int = 40,
813 handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
814 passthrough_errors: bool = False,
815 ssl_context: t.Optional[_TSSLContextArg] = None,
816 fd: t.Optional[int] = None,
817 ) -> None:
818 if not can_fork:
819 raise ValueError("Your platform does not support forking.")
821 super().__init__(host, port, app, handler, passthrough_errors, ssl_context, fd)
822 self.max_children = processes
825def make_server(
826 host: str,
827 port: int,
828 app: "WSGIApplication",
829 threaded: bool = False,
830 processes: int = 1,
831 request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
832 passthrough_errors: bool = False,
833 ssl_context: t.Optional[_TSSLContextArg] = None,
834 fd: t.Optional[int] = None,
835) -> BaseWSGIServer:
836 """Create an appropriate WSGI server instance based on the value of
837 ``threaded`` and ``processes``.
839 This is called from :func:`run_simple`, but can be used separately
840 to have access to the server object, such as to run it in a separate
841 thread.
843 See :func:`run_simple` for parameter docs.
844 """
845 if threaded and processes > 1:
846 raise ValueError("Cannot have a multi-thread and multi-process server.")
848 if threaded:
849 return ThreadedWSGIServer(
850 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
851 )
853 if processes > 1:
854 return ForkingWSGIServer(
855 host,
856 port,
857 app,
858 processes,
859 request_handler,
860 passthrough_errors,
861 ssl_context,
862 fd=fd,
863 )
865 return BaseWSGIServer(
866 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
867 )
870def is_running_from_reloader() -> bool:
871 """Check if the server is running as a subprocess within the
872 Werkzeug reloader.
874 .. versionadded:: 0.10
875 """
876 return os.environ.get("WERKZEUG_RUN_MAIN") == "true"
879def prepare_socket(hostname: str, port: int) -> socket.socket:
880 """Prepare a socket for use by the WSGI server and reloader.
882 The socket is marked inheritable so that it can be kept across
883 reloads instead of breaking connections.
885 Catch errors during bind and show simpler error messages. For
886 "address already in use", show instructions for resolving the issue,
887 with special instructions for macOS.
889 This is called from :func:`run_simple`, but can be used separately
890 to control server creation with :func:`make_server`.
891 """
892 address_family = select_address_family(hostname, port)
893 server_address = get_sockaddr(hostname, port, address_family)
894 s = socket.socket(address_family, socket.SOCK_STREAM)
895 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
896 s.set_inheritable(True)
898 # Remove the socket file if it already exists.
899 if address_family == af_unix:
900 server_address = t.cast(str, server_address)
902 if os.path.exists(server_address):
903 os.unlink(server_address)
905 # Catch connection issues and show them without the traceback. Show
906 # extra instructions for address not found, and for macOS.
907 try:
908 s.bind(server_address)
909 except OSError as e:
910 print(e.strerror, file=sys.stderr)
912 if e.errno == errno.EADDRINUSE:
913 print(
914 f"Port {port} is in use by another program. Either"
915 " identify and stop that program, or start the"
916 " server with a different port.",
917 file=sys.stderr,
918 )
920 if sys.platform == "darwin" and port == 5000:
921 print(
922 "On macOS, try disabling the 'AirPlay Receiver'"
923 " service from System Preferences -> Sharing.",
924 file=sys.stderr,
925 )
927 sys.exit(1)
929 s.listen(LISTEN_QUEUE)
930 return s
933def run_simple(
934 hostname: str,
935 port: int,
936 application: "WSGIApplication",
937 use_reloader: bool = False,
938 use_debugger: bool = False,
939 use_evalex: bool = True,
940 extra_files: t.Optional[t.Iterable[str]] = None,
941 exclude_patterns: t.Optional[t.Iterable[str]] = None,
942 reloader_interval: int = 1,
943 reloader_type: str = "auto",
944 threaded: bool = False,
945 processes: int = 1,
946 request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
947 static_files: t.Optional[t.Dict[str, t.Union[str, t.Tuple[str, str]]]] = None,
948 passthrough_errors: bool = False,
949 ssl_context: t.Optional[_TSSLContextArg] = None,
950) -> None:
951 """Start a development server for a WSGI application. Various
952 optional features can be enabled.
954 .. warning::
956 Do not use the development server when deploying to production.
957 It is intended for use only during local development. It is not
958 designed to be particularly efficient, stable, or secure.
960 :param hostname: The host to bind to, for example ``'localhost'``.
961 Can be a domain, IPv4 or IPv6 address, or file path starting
962 with ``unix://`` for a Unix socket.
963 :param port: The port to bind to, for example ``8080``. Using ``0``
964 tells the OS to pick a random free port.
965 :param application: The WSGI application to run.
966 :param use_reloader: Use a reloader process to restart the server
967 process when files are changed.
968 :param use_debugger: Use Werkzeug's debugger, which will show
969 formatted tracebacks on unhandled exceptions.
970 :param use_evalex: Make the debugger interactive. A Python terminal
971 can be opened for any frame in the traceback. Some protection is
972 provided by requiring a PIN, but this should never be enabled
973 on a publicly visible server.
974 :param extra_files: The reloader will watch these files for changes
975 in addition to Python modules. For example, watch a
976 configuration file.
977 :param exclude_patterns: The reloader will ignore changes to any
978 files matching these :mod:`fnmatch` patterns. For example,
979 ignore cache files.
980 :param reloader_interval: How often the reloader tries to check for
981 changes.
982 :param reloader_type: The reloader to use. The ``'stat'`` reloader
983 is built in, but may require significant CPU to watch files. The
984 ``'watchdog'`` reloader is much more efficient but requires
985 installing the ``watchdog`` package first.
986 :param threaded: Handle concurrent requests using threads. Cannot be
987 used with ``processes``.
988 :param processes: Handle concurrent requests using up to this number
989 of processes. Cannot be used with ``threaded``.
990 :param request_handler: Use a different
991 :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to
992 handle requests.
993 :param static_files: A dict mapping URL prefixes to directories to
994 serve static files from using
995 :class:`~werkzeug.middleware.SharedDataMiddleware`.
996 :param passthrough_errors: Don't catch unhandled exceptions at the
997 server level, let the serve crash instead. If ``use_debugger``
998 is enabled, the debugger will still catch such errors.
999 :param ssl_context: Configure TLS to serve over HTTPS. Can be an
1000 :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
1001 tuple to create a typical context, or the string ``'adhoc'`` to
1002 generate a temporary self-signed certificate.
1004 .. versionchanged:: 2.1
1005 Instructions are shown for dealing with an "address already in
1006 use" error.
1008 .. versionchanged:: 2.1
1009 Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
1010 addition to a real IP.
1012 .. versionchanged:: 2.1
1013 The command-line interface was removed.
1015 .. versionchanged:: 2.0
1016 Running on ``0.0.0.0`` or ``::`` shows a real IP address that
1017 was bound as well as a warning not to run the development server
1018 in production.
1020 .. versionchanged:: 2.0
1021 The ``exclude_patterns`` parameter was added.
1023 .. versionchanged:: 0.15
1024 Bind to a Unix socket by passing a ``hostname`` that starts with
1025 ``unix://``.
1027 .. versionchanged:: 0.10
1028 Improved the reloader and added support for changing the backend
1029 through the ``reloader_type`` parameter.
1031 .. versionchanged:: 0.9
1032 A command-line interface was added.
1034 .. versionchanged:: 0.8
1035 ``ssl_context`` can be a tuple of paths to the certificate and
1036 private key files.
1038 .. versionchanged:: 0.6
1039 The ``ssl_context`` parameter was added.
1041 .. versionchanged:: 0.5
1042 The ``static_files`` and ``passthrough_errors`` parameters were
1043 added.
1044 """
1045 if not isinstance(port, int):
1046 raise TypeError("port must be an integer")
1048 if static_files:
1049 from .middleware.shared_data import SharedDataMiddleware
1051 application = SharedDataMiddleware(application, static_files)
1053 if use_debugger:
1054 from .debug import DebuggedApplication
1056 application = DebuggedApplication(application, evalex=use_evalex)
1058 if not is_running_from_reloader():
1059 s = prepare_socket(hostname, port)
1060 fd = s.fileno()
1061 os.environ["WERKZEUG_SERVER_FD"] = str(fd)
1062 else:
1063 fd = int(os.environ["WERKZEUG_SERVER_FD"])
1065 srv = make_server(
1066 hostname,
1067 port,
1068 application,
1069 threaded,
1070 processes,
1071 request_handler,
1072 passthrough_errors,
1073 ssl_context,
1074 fd=fd,
1075 )
1077 if not is_running_from_reloader():
1078 srv.log_startup()
1080 if use_reloader:
1081 from ._reloader import run_with_reloader
1083 run_with_reloader(
1084 srv.serve_forever,
1085 extra_files=extra_files,
1086 exclude_patterns=exclude_patterns,
1087 interval=reloader_interval,
1088 reloader_type=reloader_type,
1089 )
1090 else:
1091 srv.serve_forever()