Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/serving.py: 17%
462 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1"""A WSGI and HTTP server for use **during development only**. This
2server is convenient to use, but is not designed to be particularly
3stable, secure, or efficient. Use a dedicate WSGI server and HTTP
4server when deploying to production.
6It provides features like interactive debugging and code reloading. Use
7``run_simple`` to start the server. Put this in a ``run.py`` script:
9.. code-block:: python
11 from myapp import create_app
12 from werkzeug import run_simple
13"""
14import errno
15import io
16import os
17import socket
18import socketserver
19import sys
20import typing as t
21from datetime import datetime as dt
22from datetime import timedelta
23from datetime import timezone
24from http.server import BaseHTTPRequestHandler
25from http.server import HTTPServer
27from ._internal import _log
28from ._internal import _wsgi_encoding_dance
29from .exceptions import InternalServerError
30from .urls import uri_to_iri
31from .urls import url_parse
32from .urls import url_unquote
34try:
35 import ssl
36except ImportError:
38 class _SslDummy:
39 def __getattr__(self, name: str) -> t.Any:
40 raise RuntimeError( # noqa: B904
41 "SSL is unavailable because this Python runtime was not"
42 " compiled with SSL/TLS support."
43 )
45 ssl = _SslDummy() # type: ignore
47_log_add_style = True
49if os.name == "nt":
50 try:
51 __import__("colorama")
52 except ImportError:
53 _log_add_style = False
55can_fork = hasattr(os, "fork")
57if can_fork:
58 ForkingMixIn = socketserver.ForkingMixIn
59else:
61 class ForkingMixIn: # type: ignore
62 pass
65try:
66 af_unix = socket.AF_UNIX
67except AttributeError:
68 af_unix = None # type: ignore
70LISTEN_QUEUE = 128
72_TSSLContextArg = t.Optional[
73 t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], "te.Literal['adhoc']"]
74]
76if t.TYPE_CHECKING:
77 import typing_extensions as te # noqa: F401
78 from _typeshed.wsgi import WSGIApplication
79 from _typeshed.wsgi import WSGIEnvironment
80 from cryptography.hazmat.primitives.asymmetric.rsa import (
81 RSAPrivateKeyWithSerialization,
82 )
83 from cryptography.x509 import Certificate
86class DechunkedInput(io.RawIOBase):
87 """An input stream that handles Transfer-Encoding 'chunked'"""
89 def __init__(self, rfile: t.IO[bytes]) -> None:
90 self._rfile = rfile
91 self._done = False
92 self._len = 0
94 def readable(self) -> bool:
95 return True
97 def read_chunk_len(self) -> int:
98 try:
99 line = self._rfile.readline().decode("latin1")
100 _len = int(line.strip(), 16)
101 except ValueError as e:
102 raise OSError("Invalid chunk header") from e
103 if _len < 0:
104 raise OSError("Negative chunk length not allowed")
105 return _len
107 def readinto(self, buf: bytearray) -> int: # type: ignore
108 read = 0
109 while not self._done and read < len(buf):
110 if self._len == 0:
111 # This is the first chunk or we fully consumed the previous
112 # one. Read the next length of the next chunk
113 self._len = self.read_chunk_len()
115 if self._len == 0:
116 # Found the final chunk of size 0. The stream is now exhausted,
117 # but there is still a final newline that should be consumed
118 self._done = True
120 if self._len > 0:
121 # There is data (left) in this chunk, so append it to the
122 # buffer. If this operation fully consumes the chunk, this will
123 # reset self._len to 0.
124 n = min(len(buf), self._len)
126 # If (read + chunk size) becomes more than len(buf), buf will
127 # grow beyond the original size and read more data than
128 # required. So only read as much data as can fit in buf.
129 if read + n > len(buf):
130 buf[read:] = self._rfile.read(len(buf) - read)
131 self._len -= len(buf) - read
132 read = len(buf)
133 else:
134 buf[read : read + n] = self._rfile.read(n)
135 self._len -= n
136 read += n
138 if self._len == 0:
139 # Skip the terminating newline of a chunk that has been fully
140 # consumed. This also applies to the 0-sized final chunk
141 terminator = self._rfile.readline()
142 if terminator not in (b"\n", b"\r\n", b"\r"):
143 raise OSError("Missing chunk terminating newline")
145 return read
148class WSGIRequestHandler(BaseHTTPRequestHandler):
149 """A request handler that implements WSGI dispatching."""
151 server: "BaseWSGIServer"
153 @property
154 def server_version(self) -> str: # type: ignore
155 from . import __version__
157 return f"Werkzeug/{__version__}"
159 def make_environ(self) -> "WSGIEnvironment":
160 request_url = url_parse(self.path)
161 url_scheme = "http" if self.server.ssl_context is None else "https"
163 if not self.client_address:
164 self.client_address = ("<local>", 0)
165 elif isinstance(self.client_address, str):
166 self.client_address = (self.client_address, 0)
168 # If there was no scheme but the path started with two slashes,
169 # the first segment may have been incorrectly parsed as the
170 # netloc, prepend it to the path again.
171 if not request_url.scheme and request_url.netloc:
172 path_info = f"/{request_url.netloc}{request_url.path}"
173 else:
174 path_info = request_url.path
176 path_info = url_unquote(path_info)
178 environ: "WSGIEnvironment" = {
179 "wsgi.version": (1, 0),
180 "wsgi.url_scheme": url_scheme,
181 "wsgi.input": self.rfile,
182 "wsgi.errors": sys.stderr,
183 "wsgi.multithread": self.server.multithread,
184 "wsgi.multiprocess": self.server.multiprocess,
185 "wsgi.run_once": False,
186 "werkzeug.socket": self.connection,
187 "SERVER_SOFTWARE": self.server_version,
188 "REQUEST_METHOD": self.command,
189 "SCRIPT_NAME": "",
190 "PATH_INFO": _wsgi_encoding_dance(path_info),
191 "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
192 # Non-standard, added by mod_wsgi, uWSGI
193 "REQUEST_URI": _wsgi_encoding_dance(self.path),
194 # Non-standard, added by gunicorn
195 "RAW_URI": _wsgi_encoding_dance(self.path),
196 "REMOTE_ADDR": self.address_string(),
197 "REMOTE_PORT": self.port_integer(),
198 "SERVER_NAME": self.server.server_address[0],
199 "SERVER_PORT": str(self.server.server_address[1]),
200 "SERVER_PROTOCOL": self.request_version,
201 }
203 for key, value in self.headers.items():
204 key = key.upper().replace("-", "_")
205 value = value.replace("\r\n", "")
206 if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
207 key = f"HTTP_{key}"
208 if key in environ:
209 value = f"{environ[key]},{value}"
210 environ[key] = value
212 if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
213 environ["wsgi.input_terminated"] = True
214 environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])
216 # Per RFC 2616, if the URL is absolute, use that as the host.
217 # We're using "has a scheme" to indicate an absolute URL.
218 if request_url.scheme and request_url.netloc:
219 environ["HTTP_HOST"] = request_url.netloc
221 try:
222 # binary_form=False gives nicer information, but wouldn't be compatible with
223 # what Nginx or Apache could return.
224 peer_cert = self.connection.getpeercert(binary_form=True)
225 if peer_cert is not None:
226 # Nginx and Apache use PEM format.
227 environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
228 except ValueError:
229 # SSL handshake hasn't finished.
230 self.server.log("error", "Cannot fetch SSL peer certificate info")
231 except AttributeError:
232 # Not using TLS, the socket will not have getpeercert().
233 pass
235 return environ
237 def run_wsgi(self) -> None:
238 if self.headers.get("Expect", "").lower().strip() == "100-continue":
239 self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")
241 self.environ = environ = self.make_environ()
242 status_set: t.Optional[str] = None
243 headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None
244 status_sent: t.Optional[str] = None
245 headers_sent: t.Optional[t.List[t.Tuple[str, str]]] = None
246 chunk_response: bool = False
248 def write(data: bytes) -> None:
249 nonlocal status_sent, headers_sent, chunk_response
250 assert status_set is not None, "write() before start_response"
251 assert headers_set is not None, "write() before start_response"
252 if status_sent is None:
253 status_sent = status_set
254 headers_sent = headers_set
255 try:
256 code_str, msg = status_sent.split(None, 1)
257 except ValueError:
258 code_str, msg = status_sent, ""
259 code = int(code_str)
260 self.send_response(code, msg)
261 header_keys = set()
262 for key, value in headers_sent:
263 self.send_header(key, value)
264 header_keys.add(key.lower())
266 # Use chunked transfer encoding if there is no content
267 # length. Do not use for 1xx and 204 responses. 304
268 # responses and HEAD requests are also excluded, which
269 # is the more conservative behavior and matches other
270 # parts of the code.
271 # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
272 if (
273 not (
274 "content-length" in header_keys
275 or environ["REQUEST_METHOD"] == "HEAD"
276 or (100 <= code < 200)
277 or code in {204, 304}
278 )
279 and self.protocol_version >= "HTTP/1.1"
280 ):
281 chunk_response = True
282 self.send_header("Transfer-Encoding", "chunked")
284 # Always close the connection. This disables HTTP/1.1
285 # keep-alive connections. They aren't handled well by
286 # Python's http.server because it doesn't know how to
287 # drain the stream before the next request line.
288 self.send_header("Connection", "close")
289 self.end_headers()
291 assert isinstance(data, bytes), "applications must write bytes"
293 if data:
294 if chunk_response:
295 self.wfile.write(hex(len(data))[2:].encode())
296 self.wfile.write(b"\r\n")
298 self.wfile.write(data)
300 if chunk_response:
301 self.wfile.write(b"\r\n")
303 self.wfile.flush()
305 def start_response(status, headers, exc_info=None): # type: ignore
306 nonlocal status_set, headers_set
307 if exc_info:
308 try:
309 if headers_sent:
310 raise exc_info[1].with_traceback(exc_info[2])
311 finally:
312 exc_info = None
313 elif headers_set:
314 raise AssertionError("Headers already set")
315 status_set = status
316 headers_set = headers
317 return write
319 def execute(app: "WSGIApplication") -> None:
320 application_iter = app(environ, start_response)
321 try:
322 for data in application_iter:
323 write(data)
324 if not headers_sent:
325 write(b"")
326 if chunk_response:
327 self.wfile.write(b"0\r\n\r\n")
328 finally:
329 if hasattr(application_iter, "close"):
330 application_iter.close()
332 try:
333 execute(self.server.app)
334 except (ConnectionError, socket.timeout) as e:
335 self.connection_dropped(e, environ)
336 except Exception as e:
337 if self.server.passthrough_errors:
338 raise
340 if status_sent is not None and chunk_response:
341 self.close_connection = True
343 try:
344 # if we haven't yet sent the headers but they are set
345 # we roll back to be able to set them again.
346 if status_sent is None:
347 status_set = None
348 headers_set = None
349 execute(InternalServerError())
350 except Exception:
351 pass
353 from .debug.tbtools import DebugTraceback
355 msg = DebugTraceback(e).render_traceback_text()
356 self.server.log("error", f"Error on request:\n{msg}")
358 def handle(self) -> None:
359 """Handles a request ignoring dropped connections."""
360 try:
361 super().handle()
362 except (ConnectionError, socket.timeout) as e:
363 self.connection_dropped(e)
364 except Exception as e:
365 if self.server.ssl_context is not None and is_ssl_error(e):
366 self.log_error("SSL error occurred: %s", e)
367 else:
368 raise
370 def connection_dropped(
371 self, error: BaseException, environ: t.Optional["WSGIEnvironment"] = None
372 ) -> None:
373 """Called if the connection was closed by the client. By default
374 nothing happens.
375 """
377 def __getattr__(self, name: str) -> t.Any:
378 # All HTTP methods are handled by run_wsgi.
379 if name.startswith("do_"):
380 return self.run_wsgi
382 # All other attributes are forwarded to the base class.
383 return getattr(super(), name)
385 def address_string(self) -> str:
386 if getattr(self, "environ", None):
387 return self.environ["REMOTE_ADDR"] # type: ignore
389 if not self.client_address:
390 return "<local>"
392 return self.client_address[0]
394 def port_integer(self) -> int:
395 return self.client_address[1]
397 def log_request(
398 self, code: t.Union[int, str] = "-", size: t.Union[int, str] = "-"
399 ) -> None:
400 try:
401 path = uri_to_iri(self.path)
402 msg = f"{self.command} {path} {self.request_version}"
403 except AttributeError:
404 # path isn't set if the requestline was bad
405 msg = self.requestline
407 code = str(code)
409 if code[0] == "1": # 1xx - Informational
410 msg = _ansi_style(msg, "bold")
411 elif code == "200": # 2xx - Success
412 pass
413 elif code == "304": # 304 - Resource Not Modified
414 msg = _ansi_style(msg, "cyan")
415 elif code[0] == "3": # 3xx - Redirection
416 msg = _ansi_style(msg, "green")
417 elif code == "404": # 404 - Resource Not Found
418 msg = _ansi_style(msg, "yellow")
419 elif code[0] == "4": # 4xx - Client Error
420 msg = _ansi_style(msg, "bold", "red")
421 else: # 5xx, or any other response
422 msg = _ansi_style(msg, "bold", "magenta")
424 self.log("info", '"%s" %s %s', msg, code, size)
426 def log_error(self, format: str, *args: t.Any) -> None:
427 self.log("error", format, *args)
429 def log_message(self, format: str, *args: t.Any) -> None:
430 self.log("info", format, *args)
432 def log(self, type: str, message: str, *args: t.Any) -> None:
433 _log(
434 type,
435 f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
436 *args,
437 )
440def _ansi_style(value: str, *styles: str) -> str:
441 if not _log_add_style:
442 return value
444 codes = {
445 "bold": 1,
446 "red": 31,
447 "green": 32,
448 "yellow": 33,
449 "magenta": 35,
450 "cyan": 36,
451 }
453 for style in styles:
454 value = f"\x1b[{codes[style]}m{value}"
456 return f"{value}\x1b[0m"
459def generate_adhoc_ssl_pair(
460 cn: t.Optional[str] = None,
461) -> t.Tuple["Certificate", "RSAPrivateKeyWithSerialization"]:
462 try:
463 from cryptography import x509
464 from cryptography.x509.oid import NameOID
465 from cryptography.hazmat.backends import default_backend
466 from cryptography.hazmat.primitives import hashes
467 from cryptography.hazmat.primitives.asymmetric import rsa
468 except ImportError:
469 raise TypeError(
470 "Using ad-hoc certificates requires the cryptography library."
471 ) from None
473 backend = default_backend()
474 pkey = rsa.generate_private_key(
475 public_exponent=65537, key_size=2048, backend=backend
476 )
478 # pretty damn sure that this is not actually accepted by anyone
479 if cn is None:
480 cn = "*"
482 subject = x509.Name(
483 [
484 x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
485 x509.NameAttribute(NameOID.COMMON_NAME, cn),
486 ]
487 )
489 backend = default_backend()
490 cert = (
491 x509.CertificateBuilder()
492 .subject_name(subject)
493 .issuer_name(subject)
494 .public_key(pkey.public_key())
495 .serial_number(x509.random_serial_number())
496 .not_valid_before(dt.now(timezone.utc))
497 .not_valid_after(dt.now(timezone.utc) + timedelta(days=365))
498 .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
499 .add_extension(x509.SubjectAlternativeName([x509.DNSName(cn)]), critical=False)
500 .sign(pkey, hashes.SHA256(), backend)
501 )
502 return cert, pkey
505def make_ssl_devcert(
506 base_path: str, host: t.Optional[str] = None, cn: t.Optional[str] = None
507) -> t.Tuple[str, str]:
508 """Creates an SSL key for development. This should be used instead of
509 the ``'adhoc'`` key which generates a new cert on each server start.
510 It accepts a path for where it should store the key and cert and
511 either a host or CN. If a host is given it will use the CN
512 ``*.host/CN=host``.
514 For more information see :func:`run_simple`.
516 .. versionadded:: 0.9
518 :param base_path: the path to the certificate and key. The extension
519 ``.crt`` is added for the certificate, ``.key`` is
520 added for the key.
521 :param host: the name of the host. This can be used as an alternative
522 for the `cn`.
523 :param cn: the `CN` to use.
524 """
526 if host is not None:
527 cn = f"*.{host}/CN={host}"
528 cert, pkey = generate_adhoc_ssl_pair(cn=cn)
530 from cryptography.hazmat.primitives import serialization
532 cert_file = f"{base_path}.crt"
533 pkey_file = f"{base_path}.key"
535 with open(cert_file, "wb") as f:
536 f.write(cert.public_bytes(serialization.Encoding.PEM))
537 with open(pkey_file, "wb") as f:
538 f.write(
539 pkey.private_bytes(
540 encoding=serialization.Encoding.PEM,
541 format=serialization.PrivateFormat.TraditionalOpenSSL,
542 encryption_algorithm=serialization.NoEncryption(),
543 )
544 )
546 return cert_file, pkey_file
549def generate_adhoc_ssl_context() -> "ssl.SSLContext":
550 """Generates an adhoc SSL context for the development server."""
551 import tempfile
552 import atexit
554 cert, pkey = generate_adhoc_ssl_pair()
556 from cryptography.hazmat.primitives import serialization
558 cert_handle, cert_file = tempfile.mkstemp()
559 pkey_handle, pkey_file = tempfile.mkstemp()
560 atexit.register(os.remove, pkey_file)
561 atexit.register(os.remove, cert_file)
563 os.write(cert_handle, cert.public_bytes(serialization.Encoding.PEM))
564 os.write(
565 pkey_handle,
566 pkey.private_bytes(
567 encoding=serialization.Encoding.PEM,
568 format=serialization.PrivateFormat.TraditionalOpenSSL,
569 encryption_algorithm=serialization.NoEncryption(),
570 ),
571 )
573 os.close(cert_handle)
574 os.close(pkey_handle)
575 ctx = load_ssl_context(cert_file, pkey_file)
576 return ctx
579def load_ssl_context(
580 cert_file: str, pkey_file: t.Optional[str] = None, protocol: t.Optional[int] = None
581) -> "ssl.SSLContext":
582 """Loads SSL context from cert/private key files and optional protocol.
583 Many parameters are directly taken from the API of
584 :py:class:`ssl.SSLContext`.
586 :param cert_file: Path of the certificate to use.
587 :param pkey_file: Path of the private key to use. If not given, the key
588 will be obtained from the certificate file.
589 :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
590 Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
591 """
592 if protocol is None:
593 protocol = ssl.PROTOCOL_TLS_SERVER
595 ctx = ssl.SSLContext(protocol)
596 ctx.load_cert_chain(cert_file, pkey_file)
597 return ctx
600def is_ssl_error(error: t.Optional[Exception] = None) -> bool:
601 """Checks if the given error (or the current one) is an SSL error."""
602 if error is None:
603 error = t.cast(Exception, sys.exc_info()[1])
604 return isinstance(error, ssl.SSLError)
607def select_address_family(host: str, port: int) -> socket.AddressFamily:
608 """Return ``AF_INET4``, ``AF_INET6``, or ``AF_UNIX`` depending on
609 the host and port."""
610 if host.startswith("unix://"):
611 return socket.AF_UNIX
612 elif ":" in host and hasattr(socket, "AF_INET6"):
613 return socket.AF_INET6
614 return socket.AF_INET
617def get_sockaddr(
618 host: str, port: int, family: socket.AddressFamily
619) -> t.Union[t.Tuple[str, int], str]:
620 """Return a fully qualified socket address that can be passed to
621 :func:`socket.bind`."""
622 if family == af_unix:
623 return host.split("://", 1)[1]
624 try:
625 res = socket.getaddrinfo(
626 host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
627 )
628 except socket.gaierror:
629 return host, port
630 return res[0][4] # type: ignore
633def get_interface_ip(family: socket.AddressFamily) -> str:
634 """Get the IP address of an external interface. Used when binding to
635 0.0.0.0 or ::1 to show a more useful URL.
637 :meta private:
638 """
639 # arbitrary private address
640 host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219"
642 with socket.socket(family, socket.SOCK_DGRAM) as s:
643 try:
644 s.connect((host, 58162))
645 except OSError:
646 return "::1" if family == socket.AF_INET6 else "127.0.0.1"
648 return s.getsockname()[0] # type: ignore
651class BaseWSGIServer(HTTPServer):
652 """A WSGI server that that handles one request at a time.
654 Use :func:`make_server` to create a server instance.
655 """
657 multithread = False
658 multiprocess = False
659 request_queue_size = LISTEN_QUEUE
660 allow_reuse_address = True
662 def __init__(
663 self,
664 host: str,
665 port: int,
666 app: "WSGIApplication",
667 handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
668 passthrough_errors: bool = False,
669 ssl_context: t.Optional[_TSSLContextArg] = None,
670 fd: t.Optional[int] = None,
671 ) -> None:
672 if handler is None:
673 handler = WSGIRequestHandler
675 # If the handler doesn't directly set a protocol version and
676 # thread or process workers are used, then allow chunked
677 # responses and keep-alive connections by enabling HTTP/1.1.
678 if "protocol_version" not in vars(handler) and (
679 self.multithread or self.multiprocess
680 ):
681 handler.protocol_version = "HTTP/1.1"
683 self.host = host
684 self.port = port
685 self.app = app
686 self.passthrough_errors = passthrough_errors
688 self.address_family = address_family = select_address_family(host, port)
689 server_address = get_sockaddr(host, int(port), address_family)
691 # Remove a leftover Unix socket file from a previous run. Don't
692 # remove a file that was set up by run_simple.
693 if address_family == af_unix and fd is None:
694 server_address = t.cast(str, server_address)
696 if os.path.exists(server_address):
697 os.unlink(server_address)
699 # Bind and activate will be handled manually, it should only
700 # happen if we're not using a socket that was already set up.
701 super().__init__(
702 server_address, # type: ignore[arg-type]
703 handler,
704 bind_and_activate=False,
705 )
707 if fd is None:
708 # No existing socket descriptor, do bind_and_activate=True.
709 try:
710 self.server_bind()
711 self.server_activate()
712 except OSError as e:
713 # Catch connection issues and show them without the traceback. Show
714 # extra instructions for address not found, and for macOS.
715 self.server_close()
716 print(e.strerror, file=sys.stderr)
718 if e.errno == errno.EADDRINUSE:
719 print(
720 f"Port {port} is in use by another program. Either identify and"
721 " stop that program, or start the server with a different"
722 " port.",
723 file=sys.stderr,
724 )
726 if sys.platform == "darwin" and port == 5000:
727 print(
728 "On macOS, try disabling the 'AirPlay Receiver' service"
729 " from System Preferences -> Sharing.",
730 file=sys.stderr,
731 )
733 sys.exit(1)
734 except BaseException:
735 self.server_close()
736 raise
737 else:
738 # TCPServer automatically opens a socket even if bind_and_activate is False.
739 # Close it to silence a ResourceWarning.
740 self.server_close()
742 # Use the passed in socket directly.
743 self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
744 self.server_address = self.socket.getsockname()
746 if address_family != af_unix:
747 # If port was 0, this will record the bound port.
748 self.port = self.server_address[1]
750 if ssl_context is not None:
751 if isinstance(ssl_context, tuple):
752 ssl_context = load_ssl_context(*ssl_context)
753 elif ssl_context == "adhoc":
754 ssl_context = generate_adhoc_ssl_context()
756 self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
757 self.ssl_context: t.Optional["ssl.SSLContext"] = ssl_context
758 else:
759 self.ssl_context = None
761 def log(self, type: str, message: str, *args: t.Any) -> None:
762 _log(type, message, *args)
764 def serve_forever(self, poll_interval: float = 0.5) -> None:
765 try:
766 super().serve_forever(poll_interval=poll_interval)
767 except KeyboardInterrupt:
768 pass
769 finally:
770 self.server_close()
772 def handle_error(
773 self, request: t.Any, client_address: t.Union[t.Tuple[str, int], str]
774 ) -> None:
775 if self.passthrough_errors:
776 raise
778 return super().handle_error(request, client_address)
780 def log_startup(self) -> None:
781 """Show information about the address when starting the server."""
782 dev_warning = (
783 "WARNING: This is a development server. Do not use it in a production"
784 " deployment. Use a production WSGI server instead."
785 )
786 dev_warning = _ansi_style(dev_warning, "bold", "red")
787 messages = [dev_warning]
789 if self.address_family == af_unix:
790 messages.append(f" * Running on {self.host}")
791 else:
792 scheme = "http" if self.ssl_context is None else "https"
793 display_hostname = self.host
795 if self.host in {"0.0.0.0", "::"}:
796 messages.append(f" * Running on all addresses ({self.host})")
798 if self.host == "0.0.0.0":
799 localhost = "127.0.0.1"
800 display_hostname = get_interface_ip(socket.AF_INET)
801 else:
802 localhost = "[::1]"
803 display_hostname = get_interface_ip(socket.AF_INET6)
805 messages.append(f" * Running on {scheme}://{localhost}:{self.port}")
807 if ":" in display_hostname:
808 display_hostname = f"[{display_hostname}]"
810 messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")
812 _log("info", "\n".join(messages))
815class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
816 """A WSGI server that handles concurrent requests in separate
817 threads.
819 Use :func:`make_server` to create a server instance.
820 """
822 multithread = True
823 daemon_threads = True
826class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
827 """A WSGI server that handles concurrent requests in separate forked
828 processes.
830 Use :func:`make_server` to create a server instance.
831 """
833 multiprocess = True
835 def __init__(
836 self,
837 host: str,
838 port: int,
839 app: "WSGIApplication",
840 processes: int = 40,
841 handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
842 passthrough_errors: bool = False,
843 ssl_context: t.Optional[_TSSLContextArg] = None,
844 fd: t.Optional[int] = None,
845 ) -> None:
846 if not can_fork:
847 raise ValueError("Your platform does not support forking.")
849 super().__init__(host, port, app, handler, passthrough_errors, ssl_context, fd)
850 self.max_children = processes
853def make_server(
854 host: str,
855 port: int,
856 app: "WSGIApplication",
857 threaded: bool = False,
858 processes: int = 1,
859 request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
860 passthrough_errors: bool = False,
861 ssl_context: t.Optional[_TSSLContextArg] = None,
862 fd: t.Optional[int] = None,
863) -> BaseWSGIServer:
864 """Create an appropriate WSGI server instance based on the value of
865 ``threaded`` and ``processes``.
867 This is called from :func:`run_simple`, but can be used separately
868 to have access to the server object, such as to run it in a separate
869 thread.
871 See :func:`run_simple` for parameter docs.
872 """
873 if threaded and processes > 1:
874 raise ValueError("Cannot have a multi-thread and multi-process server.")
876 if threaded:
877 return ThreadedWSGIServer(
878 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
879 )
881 if processes > 1:
882 return ForkingWSGIServer(
883 host,
884 port,
885 app,
886 processes,
887 request_handler,
888 passthrough_errors,
889 ssl_context,
890 fd=fd,
891 )
893 return BaseWSGIServer(
894 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
895 )
898def is_running_from_reloader() -> bool:
899 """Check if the server is running as a subprocess within the
900 Werkzeug reloader.
902 .. versionadded:: 0.10
903 """
904 return os.environ.get("WERKZEUG_RUN_MAIN") == "true"
907def run_simple(
908 hostname: str,
909 port: int,
910 application: "WSGIApplication",
911 use_reloader: bool = False,
912 use_debugger: bool = False,
913 use_evalex: bool = True,
914 extra_files: t.Optional[t.Iterable[str]] = None,
915 exclude_patterns: t.Optional[t.Iterable[str]] = None,
916 reloader_interval: int = 1,
917 reloader_type: str = "auto",
918 threaded: bool = False,
919 processes: int = 1,
920 request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
921 static_files: t.Optional[t.Dict[str, t.Union[str, t.Tuple[str, str]]]] = None,
922 passthrough_errors: bool = False,
923 ssl_context: t.Optional[_TSSLContextArg] = None,
924) -> None:
925 """Start a development server for a WSGI application. Various
926 optional features can be enabled.
928 .. warning::
930 Do not use the development server when deploying to production.
931 It is intended for use only during local development. It is not
932 designed to be particularly efficient, stable, or secure.
934 :param hostname: The host to bind to, for example ``'localhost'``.
935 Can be a domain, IPv4 or IPv6 address, or file path starting
936 with ``unix://`` for a Unix socket.
937 :param port: The port to bind to, for example ``8080``. Using ``0``
938 tells the OS to pick a random free port.
939 :param application: The WSGI application to run.
940 :param use_reloader: Use a reloader process to restart the server
941 process when files are changed.
942 :param use_debugger: Use Werkzeug's debugger, which will show
943 formatted tracebacks on unhandled exceptions.
944 :param use_evalex: Make the debugger interactive. A Python terminal
945 can be opened for any frame in the traceback. Some protection is
946 provided by requiring a PIN, but this should never be enabled
947 on a publicly visible server.
948 :param extra_files: The reloader will watch these files for changes
949 in addition to Python modules. For example, watch a
950 configuration file.
951 :param exclude_patterns: The reloader will ignore changes to any
952 files matching these :mod:`fnmatch` patterns. For example,
953 ignore cache files.
954 :param reloader_interval: How often the reloader tries to check for
955 changes.
956 :param reloader_type: The reloader to use. The ``'stat'`` reloader
957 is built in, but may require significant CPU to watch files. The
958 ``'watchdog'`` reloader is much more efficient but requires
959 installing the ``watchdog`` package first.
960 :param threaded: Handle concurrent requests using threads. Cannot be
961 used with ``processes``.
962 :param processes: Handle concurrent requests using up to this number
963 of processes. Cannot be used with ``threaded``.
964 :param request_handler: Use a different
965 :class:`~BaseHTTPServer.BaseHTTPRequestHandler` subclass to
966 handle requests.
967 :param static_files: A dict mapping URL prefixes to directories to
968 serve static files from using
969 :class:`~werkzeug.middleware.SharedDataMiddleware`.
970 :param passthrough_errors: Don't catch unhandled exceptions at the
971 server level, let the serve crash instead. If ``use_debugger``
972 is enabled, the debugger will still catch such errors.
973 :param ssl_context: Configure TLS to serve over HTTPS. Can be an
974 :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
975 tuple to create a typical context, or the string ``'adhoc'`` to
976 generate a temporary self-signed certificate.
978 .. versionchanged:: 2.1
979 Instructions are shown for dealing with an "address already in
980 use" error.
982 .. versionchanged:: 2.1
983 Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
984 addition to a real IP.
986 .. versionchanged:: 2.1
987 The command-line interface was removed.
989 .. versionchanged:: 2.0
990 Running on ``0.0.0.0`` or ``::`` shows a real IP address that
991 was bound as well as a warning not to run the development server
992 in production.
994 .. versionchanged:: 2.0
995 The ``exclude_patterns`` parameter was added.
997 .. versionchanged:: 0.15
998 Bind to a Unix socket by passing a ``hostname`` that starts with
999 ``unix://``.
1001 .. versionchanged:: 0.10
1002 Improved the reloader and added support for changing the backend
1003 through the ``reloader_type`` parameter.
1005 .. versionchanged:: 0.9
1006 A command-line interface was added.
1008 .. versionchanged:: 0.8
1009 ``ssl_context`` can be a tuple of paths to the certificate and
1010 private key files.
1012 .. versionchanged:: 0.6
1013 The ``ssl_context`` parameter was added.
1015 .. versionchanged:: 0.5
1016 The ``static_files`` and ``passthrough_errors`` parameters were
1017 added.
1018 """
1019 if not isinstance(port, int):
1020 raise TypeError("port must be an integer")
1022 if static_files:
1023 from .middleware.shared_data import SharedDataMiddleware
1025 application = SharedDataMiddleware(application, static_files)
1027 if use_debugger:
1028 from .debug import DebuggedApplication
1030 application = DebuggedApplication(application, evalex=use_evalex)
1032 if not is_running_from_reloader():
1033 fd = None
1034 else:
1035 fd = int(os.environ["WERKZEUG_SERVER_FD"])
1037 srv = make_server(
1038 hostname,
1039 port,
1040 application,
1041 threaded,
1042 processes,
1043 request_handler,
1044 passthrough_errors,
1045 ssl_context,
1046 fd=fd,
1047 )
1048 srv.socket.set_inheritable(True)
1049 os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno())
1051 if not is_running_from_reloader():
1052 srv.log_startup()
1053 _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))
1055 if use_reloader:
1056 from ._reloader import run_with_reloader
1058 try:
1059 run_with_reloader(
1060 srv.serve_forever,
1061 extra_files=extra_files,
1062 exclude_patterns=exclude_patterns,
1063 interval=reloader_interval,
1064 reloader_type=reloader_type,
1065 )
1066 finally:
1067 srv.server_close()
1068 else:
1069 srv.serve_forever()