1"""A WSGI and HTTP server for use **during development only**. This
2server is convenient to use, but is not designed to be particularly
stable, secure, or efficient. Use a dedicated WSGI server and HTTP
4server when deploying to production.
5
6It provides features like interactive debugging and code reloading. Use
7``run_simple`` to start the server. Put this in a ``run.py`` script:
8
9.. code-block:: python
10
11 from myapp import create_app
12 from werkzeug import run_simple
13"""
14
15from __future__ import annotations
16
17import errno
18import io
19import os
20import selectors
21import socket
22import socketserver
23import sys
24import typing as t
25from datetime import datetime as dt
26from datetime import timedelta
27from datetime import timezone
28from http.server import BaseHTTPRequestHandler
29from http.server import HTTPServer
30from urllib.parse import unquote
31from urllib.parse import urlsplit
32
33from ._internal import _log
34from ._internal import _wsgi_encoding_dance
35from .exceptions import InternalServerError
36from .urls import uri_to_iri
37
# Import ``ssl`` if this interpreter provides it, and define the tuple of
# exception types that are treated as "the client dropped the connection".
try:
    import ssl

    connection_dropped_errors: tuple[type[Exception], ...] = (
        ConnectionError,
        socket.timeout,
        ssl.SSLEOFError,
    )
except ImportError:

    class _SslDummy:
        """Stand-in for the ``ssl`` module when it cannot be imported."""

        def __getattr__(self, name: str) -> t.Any:
            # Any attribute access on the placeholder fails with an
            # explanatory error instead of an AttributeError.
            raise RuntimeError(  # noqa: B904
                "SSL is unavailable because this Python runtime was not"
                " compiled with SSL/TLS support."
            )

    ssl = _SslDummy()  # type: ignore
    # Without ssl there is no SSLEOFError to include.
    connection_dropped_errors = (ConnectionError, socket.timeout)
57
# Whether ``_ansi_style`` wraps log text in ANSI escape sequences.
_log_add_style = True

if os.name == "nt":
    # On Windows, styling is only kept if ``colorama`` can be imported.
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False
65
# True when this platform exposes ``os.fork``.
can_fork = hasattr(os, "fork")

if can_fork:
    ForkingMixIn = socketserver.ForkingMixIn
else:

    # Empty placeholder so ``ForkingWSGIServer`` can still be defined on
    # platforms without fork; its ``__init__`` rejects use via ``can_fork``.
    class ForkingMixIn:  # type: ignore
        pass
74
75
# The Unix domain socket address family, or ``None`` on platforms where
# the ``socket`` module does not define ``AF_UNIX``.
af_unix = getattr(socket, "AF_UNIX", None)
80
# Used as ``request_queue_size`` by ``BaseWSGIServer``.
LISTEN_QUEUE = 128

# Accepted forms of the ``ssl_context`` argument: an ``ssl.SSLContext``, a
# ``(cert_file, key_file_or_None)`` tuple, the string ``"adhoc"``, or ``None``.
_TSSLContextArg = t.Optional[
    t.Union["ssl.SSLContext", tuple[str, t.Optional[str]], t.Literal["adhoc"]]
]
86
87if t.TYPE_CHECKING:
88 from _typeshed.wsgi import WSGIApplication
89 from _typeshed.wsgi import WSGIEnvironment
90 from cryptography.hazmat.primitives.asymmetric.rsa import (
91 RSAPrivateKeyWithSerialization,
92 )
93 from cryptography.x509 import Certificate
94
95
class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps a binary stream positioned at the start of a chunked body and
    exposes the decoded payload through the raw I/O ``readinto`` protocol.

    :param rfile: The underlying binary stream to read chunk headers and
        chunk data from.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the zero-length final chunk has been seen.
        self._done = False
        # Number of bytes still unread in the current chunk.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk header line and return the chunk size.

        :raises OSError: If the line is not a hexadecimal number or the
            number is negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            _len = int(line.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded body bytes and return the count.

        Reads across chunk boundaries until ``buf`` is full or the final
        chunk has been consumed. Returns ``0`` once the body is exhausted.

        :raises OSError: If a chunk header is invalid, a chunk terminator
            is missing, or the underlying stream ends inside a chunk.
        """
        read = 0
        size = len(buf)

        while not self._done and read < size:
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the length of the next chunk.
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed.
                self._done = True

            if self._len > 0:
                # Never request more than fits in the remaining space of buf
                # or more than is left in the current chunk.
                want = min(size - read, self._len)
                data = self._rfile.read(want)

                if not data:
                    # The stream ended before the declared chunk size was
                    # delivered. Fail instead of reporting bytes that were
                    # never received.
                    raise OSError("Unexpected end of chunked request body")

                # Account only for what was actually received so that a short
                # read neither resizes buf nor desynchronizes the chunk state.
                got = len(data)
                buf[read : read + got] = data
                self._len -= got
                read += got

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk.
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read
156
157
class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching.

    Every ``do_<METHOD>`` lookup is routed to :meth:`run_wsgi` (see
    :meth:`__getattr__`), which builds a WSGI environ for the request,
    calls the server's application, and writes the response.
    """

    # The server that created this handler.
    server: BaseWSGIServer

    @property
    def server_version(self) -> str:  # type: ignore
        """The server version string, taken from the owning server."""
        return self.server._server_version

    def make_environ(self) -> WSGIEnvironment:
        """Build the WSGI environ dict for the current request from the
        request line, the headers, and the connection.
        """
        request_url = urlsplit(self.path)
        url_scheme = "http" if self.server.ssl_context is None else "https"

        # Normalize the client address to a ``(host, port)`` pair.
        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = unquote(path_info)

        environ: WSGIEnvironment = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            # Header names containing an underscore are skipped; after the
            # "-" to "_" conversion below they would be indistinguishable
            # from the dashed form.
            if "_" in key:
                continue

            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                # Repeated headers are joined with a comma.
                if key in environ:
                    value = f"{environ[key]},{value}"
            environ[key] = value

        # Chunked request bodies are decoded transparently for the app.
        if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Call the server's WSGI application for the current request and
        write its response to the client.

        If the application raises and ``passthrough_errors`` is not set on
        the server, an :class:`InternalServerError` response is attempted
        and the traceback is logged.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        # ``*_set`` hold what the app passed to start_response; ``*_sent``
        # hold what has actually been written to the client.
        status_set: str | None = None
        headers_set: list[tuple[str, str]] | None = None
        status_sent: str | None = None
        headers_sent: list[tuple[str, str]] | None = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            # Sends the status line and headers on the first call, then
            # writes ``data`` (chunk-framed if chunking was selected).
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    # Status has a code but no reason phrase.
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    # Chunk header: payload size in hex, then CRLF.
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            # WSGI ``start_response`` callable: records status and headers
            # and returns ``write``.
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    # Headers already went out, so the error can't be
                    # turned into a response; re-raise it.
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: WSGIApplication) -> None:
            # Runs ``app`` and streams everything it yields through ``write``.
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                # Make sure headers are sent even for an empty body.
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    # Terminating zero-length chunk.
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                # Check for any remaining data in the read socket, and discard it. This
                # will read past request.max_content_length, but lets the client see a
                # 413 response instead of a connection reset failure. If we supported
                # keep-alive connections, this naive approach would break by reading the
                # next request line. Since we know that write (above) closes every
                # connection we can read everything.
                selector = selectors.DefaultSelector()
                selector.register(self.connection, selectors.EVENT_READ)
                total_size = 0
                total_reads = 0

                # A timeout of 0 tends to fail because a client needs a small amount of
                # time to continue sending its data.
                while selector.select(timeout=0.01):
                    # Only read 10MB into memory at a time.
                    data = self.rfile.read(10_000_000)
                    total_size += len(data)
                    total_reads += 1

                    # Stop reading on no data, >=10GB, or 1000 reads. If a client sends
                    # more than that, they'll get a connection reset failure.
                    if not data or total_size >= 10_000_000_000 or total_reads > 1000:
                        break

                selector.close()

                if hasattr(application_iter, "close"):
                    application_iter.close()

        try:
            execute(self.server.app)
        except connection_dropped_errors as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                # Best effort only; the original error is logged below.
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            super().handle()
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            # SSL errors on a TLS server are logged instead of raised.
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise

    def connection_dropped(
        self, error: BaseException, environ: WSGIEnvironment | None = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def __getattr__(self, name: str) -> t.Any:
        # All HTTP methods are handled by run_wsgi.
        if name.startswith("do_"):
            return self.run_wsgi

        # All other attributes are forwarded to the base class.
        return getattr(super(), name)

    def address_string(self) -> str:
        """Return the client address used in the environ and log lines."""
        # Once the environ exists, reuse the value stored in it.
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the second item of ``client_address`` (the client port)."""
        return self.client_address[1]

    # Escape control characters. This is defined (but private) in Python 3.12.
    _control_char_table = str.maketrans(
        {c: rf"\x{c:02x}" for c in [*range(0x20), *range(0x7F, 0xA0)]}
    )
    # Backslashes are escaped too, so escaped output stays unambiguous.
    _control_char_table[ord("\\")] = r"\\"

    def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
        """Log the request line with the response code and size. The
        request line is styled according to the status code.
        """
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        # Escape control characters that may be in the decoded path.
        msg = msg.translate(self._control_char_table)
        code = str(code)

        if code[0] == "1":  # 1xx - Informational
            msg = _ansi_style(msg, "bold")
        elif code == "200":  # 2xx - Success
            pass
        elif code == "304":  # 304 - Resource Not Modified
            msg = _ansi_style(msg, "cyan")
        elif code[0] == "3":  # 3xx - Redirection
            msg = _ansi_style(msg, "green")
        elif code == "404":  # 404 - Resource Not Found
            msg = _ansi_style(msg, "yellow")
        elif code[0] == "4":  # 4xx - Client Error
            msg = _ansi_style(msg, "bold", "red")
        else:  # 5xx, or any other response
            msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        """Log a message at the ``"error"`` level."""
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        """Log a message at the ``"info"`` level."""
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Emit a log line prefixed with the client address and a
        timestamp. ``args`` are passed on to ``_log`` together with the
        message.
        """
        # an IPv6 scoped address contains "%" which breaks logging
        address_string = self.address_string().replace("%", "%%")
        _log(
            type,
            f"{address_string} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )
483
484
def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape sequences for the named styles.

    Returns ``value`` unchanged when log styling is disabled. Otherwise
    each style's escape code is prepended (the last style ends up
    outermost) and a reset sequence is appended.

    :param value: The text to decorate.
    :param styles: Style names; one of ``bold``, ``red``, ``green``,
        ``yellow``, ``magenta``, ``cyan``.
    :raises KeyError: If a style name is unknown.
    """
    if not _log_add_style:
        return value

    sgr_codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }

    # Look the codes up in argument order, then reverse, because each
    # later style is placed in front of the earlier ones.
    prefixes = [f"\x1b[{sgr_codes[name]}m" for name in styles]
    prefixes.reverse()

    return f"{''.join(prefixes)}{value}\x1b[0m"
502
503
def generate_adhoc_ssl_pair(
    cn: str | None = None,
) -> tuple[Certificate, RSAPrivateKeyWithSerialization]:
    """Generate a self-signed certificate and its RSA private key.

    The certificate is valid for 365 days from the moment of creation, is
    marked for server authentication, and lists ``cn`` and ``*.cn`` as
    subject alternative names.

    :param cn: The common name to use. Defaults to ``"*"``.
    :return: A ``(certificate, private_key)`` tuple of ``cryptography``
        objects.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.x509.oid import NameOID
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    # The same backend is used for key generation and for signing.
    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # A bare wildcard common name is unlikely to be accepted by clients;
    # it is only a placeholder for when no name was given.
    if cn is None:
        cn = "*"

    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    # Read the clock once so the validity window is exactly 365 days.
    now = dt.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(cn), x509.DNSName(f"*.{cn}")]),
            critical=False,
        )
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey
551
552
def make_ssl_devcert(
    base_path: str, host: str | None = None, cn: str | None = None
) -> tuple[str, str]:
    """Create a self-signed certificate and key on disk for development.

    Use this instead of the ``'adhoc'`` context, which generates a new
    certificate every time the server starts. The certificate is written
    to ``<base_path>.crt`` and the unencrypted private key to
    ``<base_path>.key``, both PEM encoded.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: The path prefix for the certificate and key files.
        The extension ``.crt`` is added for the certificate, ``.key`` is
        added for the key.
    :param host: The name of the host. If given, it takes precedence
        over ``cn`` and is used as the common name.
    :param cn: The common name to use when ``host`` is not given.
    :return: A ``(cert_file, pkey_file)`` tuple of the written paths.
    """
    common_name = cn if host is None else host
    cert, pkey = generate_adhoc_ssl_pair(cn=common_name)

    from cryptography.hazmat.primitives import serialization

    pem = serialization.Encoding.PEM
    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"

    cert_bytes = cert.public_bytes(pem)
    pkey_bytes = pkey.private_bytes(
        encoding=pem,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    with open(cert_file, "wb") as cert_out:
        cert_out.write(cert_bytes)

    with open(pkey_file, "wb") as pkey_out:
        pkey_out.write(pkey_bytes)

    return cert_file, pkey_file
595
596
def generate_adhoc_ssl_context() -> ssl.SSLContext:
    """Generates an adhoc SSL context for the development server.

    A fresh self-signed certificate and key are written to temporary
    files, which are registered for removal at interpreter exit, and
    loaded into a new context.

    :return: A context with the generated certificate chain loaded.
    :raises TypeError: If the ``cryptography`` library is not installed
        (raised by :func:`generate_adhoc_ssl_pair`).
    """
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    def write_temp(data: bytes) -> str:
        # Write ``data`` to a new temporary file and return its path.
        handle, path = tempfile.mkstemp()
        # Register cleanup straight away so the file is removed even if
        # a later step fails.
        atexit.register(os.remove, path)

        # The file object closes the descriptor on success and on error,
        # and handles partial writes.
        with os.fdopen(handle, "wb") as f:
            f.write(data)

        return path

    cert_file = write_temp(cert.public_bytes(serialization.Encoding.PEM))
    pkey_file = write_temp(
        pkey.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
    )

    return load_ssl_context(cert_file, pkey_file)
625
626
def load_ssl_context(
    cert_file: str, pkey_file: str | None = None, protocol: int | None = None
) -> ssl.SSLContext:
    """Create an :py:class:`ssl.SSLContext` and load a certificate chain
    into it.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
        will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    :return: The context with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_TLS_SERVER if protocol is None else protocol
    context = ssl.SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
646
647
def is_ssl_error(error: Exception | None = None) -> bool:
    """Return whether ``error`` is an :class:`ssl.SSLError`.

    :param error: The exception to check. If ``None``, the exception
        currently being handled (if any) is checked instead.
    """
    candidate = sys.exc_info()[1] if error is None else error
    return isinstance(candidate, ssl.SSLError)
653
654
def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Return ``AF_INET``, ``AF_INET6``, or ``AF_UNIX`` depending on the
    host.

    A host starting with ``unix://`` selects ``AF_UNIX``. Otherwise a host
    containing ``":"`` selects ``AF_INET6`` when the platform defines it,
    and anything else selects ``AF_INET``. ``port`` is accepted but not
    used in the decision.
    """
    if not host.startswith("unix://"):
        looks_like_ipv6 = ":" in host and hasattr(socket, "AF_INET6")
        return socket.AF_INET6 if looks_like_ipv6 else socket.AF_INET

    return socket.AF_UNIX
663
664
def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> tuple[str, int] | str:
    """Return a fully qualified socket address that can be passed to
    :func:`socket.bind`.

    For a Unix socket the result is the absolute filesystem path taken
    from the part of ``host`` after ``://``. Otherwise the first address
    reported by :func:`socket.getaddrinfo` is returned, falling back to
    ``(host, port)`` if resolution fails.
    """
    if family == af_unix:
        _scheme, _sep, socket_path = host.partition("://")
        # Absolute path avoids IDNA encoding error when path starts with dot.
        return os.path.abspath(socket_path)

    try:
        candidates = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return host, port
    else:
        first_candidate = candidates[0]
        return first_candidate[4]  # type: ignore
680
681
def get_interface_ip(family: socket.AddressFamily) -> str:
    """Get the IP address of an external interface. Used when binding to
    ``0.0.0.0`` or ``::`` to show a more useful URL.

    A datagram socket is connected to an arbitrary private address and
    the local address chosen for it is returned. If connecting fails, the
    loopback address for the family is returned instead.

    :meta private:
    """
    wants_ipv6 = family == socket.AF_INET6

    # arbitrary private address
    probe_host = "fd31:f903:5ab5:1::1" if wants_ipv6 else "10.253.155.219"

    with socket.socket(family, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect((probe_host, 58162))
        except OSError:
            return "::1" if wants_ipv6 else "127.0.0.1"

        local_address = probe.getsockname()
        return local_address[0]  # type: ignore
698
699
class BaseWSGIServer(HTTPServer):
    """A WSGI server that handles one request at a time.

    Use :func:`make_server` to create a server instance.
    """

    # Exposed to applications as ``wsgi.multithread`` / ``wsgi.multiprocess``
    # by the request handler; overridden by the threaded/forking subclasses.
    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE
    allow_reuse_address = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Create the server and either bind a new socket or adopt an
        existing one.

        :param host: Host name, IP address, or ``unix://`` path to bind.
        :param port: Port to bind.
        :param app: The WSGI application to serve.
        :param handler: Request handler class. Defaults to
            :class:`WSGIRequestHandler`.
        :param passthrough_errors: Stored on the server; the handler and
            :meth:`handle_error` re-raise errors when it is set.
        :param ssl_context: An ``ssl.SSLContext``, a ``(cert, key)`` tuple,
            ``"adhoc"``, or ``None`` for plain HTTP.
        :param fd: File descriptor of an already set up listening socket
            to use instead of binding a new one.
        """
        if handler is None:
            handler = WSGIRequestHandler

        # If the handler doesn't directly set a protocol version and
        # thread or process workers are used, then allow chunked
        # responses and keep-alive connections by enabling HTTP/1.1.
        # NOTE(review): this assigns on the handler class itself, so the
        # change is visible to every other user of that class.
        if "protocol_version" not in vars(handler) and (
            self.multithread or self.multiprocess
        ):
            handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.passthrough_errors = passthrough_errors

        self.address_family = address_family = select_address_family(host, port)
        server_address = get_sockaddr(host, int(port), address_family)

        # Remove a leftover Unix socket file from a previous run. Don't
        # remove a file that was set up by run_simple.
        if address_family == af_unix and fd is None:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        # Bind and activate will be handled manually, it should only
        # happen if we're not using a socket that was already set up.
        super().__init__(
            server_address,  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )

        if fd is None:
            # No existing socket descriptor, do bind_and_activate=True.
            try:
                self.server_bind()
                self.server_activate()
            except OSError as e:
                # Catch connection issues and show them without the traceback. Show
                # extra instructions for address not found, and for macOS.
                self.server_close()
                print(e.strerror, file=sys.stderr)

                if e.errno == errno.EADDRINUSE:
                    print(
                        f"Port {port} is in use by another program. Either identify and"
                        " stop that program, or start the server with a different"
                        " port.",
                        file=sys.stderr,
                    )

                    if sys.platform == "darwin" and port == 5000:
                        print(
                            "On macOS, try disabling the 'AirPlay Receiver' service"
                            " from System Preferences -> General -> AirDrop & Handoff.",
                            file=sys.stderr,
                        )

                sys.exit(1)
            except BaseException:
                # Don't leave the socket open on any other failure.
                self.server_close()
                raise
        else:
            # TCPServer automatically opens a socket even if bind_and_activate is False.
            # Close it to silence a ResourceWarning.
            self.server_close()

            # Use the passed in socket directly.
            self.socket = socket.fromfd(fd, address_family, socket.SOCK_STREAM)
            self.server_address = self.socket.getsockname()

        if address_family != af_unix:
            # If port was 0, this will record the bound port.
            self.port = self.server_address[1]

        if ssl_context is not None:
            # Normalize the shorthand forms to a real context.
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            elif ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context: ssl.SSLContext | None = ssl_context
        else:
            self.ssl_context = None

        import importlib.metadata

        # Read by ``WSGIRequestHandler.server_version``.
        self._server_version = f"Werkzeug/{importlib.metadata.version('werkzeug')}"

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Pass a message to the ``_log`` helper."""
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve requests until interrupted. ``KeyboardInterrupt`` is
        swallowed and the server socket is always closed on exit.
        """
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(
        self, request: t.Any, client_address: tuple[str, int] | str
    ) -> None:
        """Re-raise the active exception when ``passthrough_errors`` is
        set, otherwise defer to the base class.
        """
        if self.passthrough_errors:
            # Bare ``raise``: relies on this method being called while an
            # exception is being handled.
            raise

        return super().handle_error(request, client_address)

    def log_startup(self) -> None:
        """Show information about the address when starting the server."""
        dev_warning = (
            "WARNING: This is a development server. Do not use it in a production"
            " deployment. Use a production WSGI server instead."
        )
        dev_warning = _ansi_style(dev_warning, "bold", "red")
        messages = [dev_warning]

        if self.address_family == af_unix:
            messages.append(f" * Running on {self.host}")
        else:
            scheme = "http" if self.ssl_context is None else "https"
            display_hostname = self.host

            if self.host in {"0.0.0.0", "::"}:
                # Bound to all addresses: show the loopback URL as well as
                # the address of an external interface.
                messages.append(f" * Running on all addresses ({self.host})")

                if self.host == "0.0.0.0":
                    localhost = "127.0.0.1"
                    display_hostname = get_interface_ip(socket.AF_INET)
                else:
                    localhost = "[::1]"
                    display_hostname = get_interface_ip(socket.AF_INET6)

                messages.append(f" * Running on {scheme}://{localhost}:{self.port}")

            # IPv6 literals need brackets inside a URL.
            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            messages.append(f" * Running on {scheme}://{display_hostname}:{self.port}")

        _log("info", "\n".join(messages))
866
867
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests in separate
    threads.

    Use :func:`make_server` to create a server instance.
    """

    # Reported to applications as ``wsgi.multithread``; also makes
    # ``BaseWSGIServer.__init__`` enable HTTP/1.1 on the handler.
    multithread = True
    # ``socketserver.ThreadingMixIn`` option: request threads are daemon
    # threads.
    daemon_threads = True
877
878
class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that handles concurrent requests in separate forked
    processes.

    Use :func:`make_server` to create a server instance.
    """

    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: WSGIApplication,
        processes: int = 40,
        handler: type[WSGIRequestHandler] | None = None,
        passthrough_errors: bool = False,
        ssl_context: _TSSLContextArg | None = None,
        fd: int | None = None,
    ) -> None:
        """Set up the server; ``processes`` is stored as ``max_children``.

        :raises ValueError: If the platform does not support forking.
        """
        if not can_fork:
            raise ValueError("Your platform does not support forking.")

        super().__init__(
            host,
            port,
            app,
            handler=handler,
            passthrough_errors=passthrough_errors,
            ssl_context=ssl_context,
            fd=fd,
        )
        self.max_children = processes
904
905
def make_server(
    host: str,
    port: int,
    app: WSGIApplication,
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
    fd: int | None = None,
) -> BaseWSGIServer:
    """Create an appropriate WSGI server instance based on the value of
    ``threaded`` and ``processes``.

    ``threaded`` selects :class:`ThreadedWSGIServer`, ``processes > 1``
    selects :class:`ForkingWSGIServer`, and otherwise a plain
    :class:`BaseWSGIServer` is returned.

    This is called from :func:`run_simple`, but can be used separately
    to have access to the server object, such as to run it in a separate
    thread.

    See :func:`run_simple` for parameter docs.

    :raises ValueError: If both ``threaded`` and ``processes > 1`` are
        requested.
    """
    multi_process = processes > 1

    if threaded and multi_process:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    # Options every server class accepts under the same names.
    shared_options = {
        "handler": request_handler,
        "passthrough_errors": passthrough_errors,
        "ssl_context": ssl_context,
        "fd": fd,
    }

    if threaded:
        return ThreadedWSGIServer(host, port, app, **shared_options)

    if multi_process:
        return ForkingWSGIServer(
            host, port, app, processes=processes, **shared_options
        )

    return BaseWSGIServer(host, port, app, **shared_options)
949
950
def is_running_from_reloader() -> bool:
    """Check if the server is running as a subprocess within the
    Werkzeug reloader.

    This is the case when the ``WERKZEUG_RUN_MAIN`` environment variable
    is exactly ``"true"``.

    .. versionadded:: 0.10
    """
    run_main_flag = os.environ.get("WERKZEUG_RUN_MAIN")
    return run_main_flag == "true"
958
959
def run_simple(
    hostname: str,
    port: int,
    application: WSGIApplication,
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Iterable[str] | None = None,
    exclude_patterns: t.Iterable[str] | None = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: type[WSGIRequestHandler] | None = None,
    static_files: dict[str, str | tuple[str, str]] | None = None,
    passthrough_errors: bool = False,
    ssl_context: _TSSLContextArg | None = None,
) -> None:
    """Start a development server for a WSGI application. Various
    optional features can be enabled.

    .. warning::

        Do not use the development server when deploying to production.
        It is intended for use only during local development. It is not
        designed to be particularly efficient, stable, or secure.

    :param hostname: The host to bind to, for example ``'localhost'``.
        Can be a domain, IPv4 or IPv6 address, or file path starting
        with ``unix://`` for a Unix socket.
    :param port: The port to bind to, for example ``8080``. Using ``0``
        tells the OS to pick a random free port.
    :param application: The WSGI application to run.
    :param use_reloader: Use a reloader process to restart the server
        process when files are changed.
    :param use_debugger: Use Werkzeug's debugger, which will show
        formatted tracebacks on unhandled exceptions.
    :param use_evalex: Make the debugger interactive. A Python terminal
        can be opened for any frame in the traceback. Some protection is
        provided by requiring a PIN, but this should never be enabled
        on a publicly visible server.
    :param extra_files: The reloader will watch these files for changes
        in addition to Python modules. For example, watch a
        configuration file.
    :param exclude_patterns: The reloader will ignore changes to any
        files matching these :mod:`fnmatch` patterns. For example,
        ignore cache files.
    :param reloader_interval: How often the reloader tries to check for
        changes.
    :param reloader_type: The reloader to use. The ``'stat'`` reloader
        is built in, but may require significant CPU to watch files. The
        ``'watchdog'`` reloader is much more efficient but requires
        installing the ``watchdog`` package first.
    :param threaded: Handle concurrent requests using threads. Cannot be
        used with ``processes``.
    :param processes: Handle concurrent requests using up to this number
        of processes. Cannot be used with ``threaded``.
    :param request_handler: Use a different
        :class:`~http.server.BaseHTTPRequestHandler` subclass to
        handle requests.
    :param static_files: A dict mapping URL prefixes to directories to
        serve static files from using
        :class:`~werkzeug.middleware.shared_data.SharedDataMiddleware`.
    :param passthrough_errors: Don't catch unhandled exceptions at the
        server level, let the server crash instead. If ``use_debugger``
        is enabled, the debugger will still catch such errors.
    :param ssl_context: Configure TLS to serve over HTTPS. Can be an
        :class:`ssl.SSLContext` object, a ``(cert_file, key_file)``
        tuple to create a typical context, or the string ``'adhoc'`` to
        generate a temporary self-signed certificate.
    :raises TypeError: If ``port`` is not an integer.

    .. versionchanged:: 2.1
        Instructions are shown for dealing with an "address already in
        use" error.

    .. versionchanged:: 2.1
        Running on ``0.0.0.0`` or ``::`` shows the loopback IP in
        addition to a real IP.

    .. versionchanged:: 2.1
        The command-line interface was removed.

    .. versionchanged:: 2.0
        Running on ``0.0.0.0`` or ``::`` shows a real IP address that
        was bound as well as a warning not to run the development server
        in production.

    .. versionchanged:: 2.0
        The ``exclude_patterns`` parameter was added.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a ``hostname`` that starts with
        ``unix://``.

    .. versionchanged:: 0.10
        Improved the reloader and added support for changing the backend
        through the ``reloader_type`` parameter.

    .. versionchanged:: 0.9
        A command-line interface was added.

    .. versionchanged:: 0.8
        ``ssl_context`` can be a tuple of paths to the certificate and
        private key files.

    .. versionchanged:: 0.6
        The ``ssl_context`` parameter was added.

    .. versionchanged:: 0.5
        The ``static_files`` and ``passthrough_errors`` parameters were
        added.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    # Wrap the application in the requested middleware, innermost first.
    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, evalex=use_evalex)
        # Allow the specified hostname to use the debugger, in addition to
        # localhost domains.
        application.trusted_hosts.append(hostname)

    in_reloader_child = is_running_from_reloader()

    # The reloader child reuses the listening socket created by the parent.
    inherited_fd = int(os.environ["WERKZEUG_SERVER_FD"]) if in_reloader_child else None

    srv = make_server(
        hostname,
        port,
        application,
        threaded=threaded,
        processes=processes,
        request_handler=request_handler,
        passthrough_errors=passthrough_errors,
        ssl_context=ssl_context,
        fd=inherited_fd,
    )

    # Publish the socket so a reloader child process can pick it up.
    srv.socket.set_inheritable(True)
    os.environ["WERKZEUG_SERVER_FD"] = str(srv.fileno())

    # Only the outermost process prints the startup banner.
    if not in_reloader_child:
        srv.log_startup()
        _log("info", _ansi_style("Press CTRL+C to quit", "yellow"))

    if not use_reloader:
        srv.serve_forever()
        return

    from ._reloader import run_with_reloader

    try:
        run_with_reloader(
            srv.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    finally:
        srv.server_close()