1#
2# Copyright 2011 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16"""A non-blocking, single-threaded TCP server."""
17
18import errno
19import os
20import socket
21import ssl
22
23from tornado import gen
24from tornado.log import app_log
25from tornado.ioloop import IOLoop
26from tornado.iostream import IOStream, SSLIOStream
27from tornado.netutil import (
28 bind_sockets,
29 add_accept_handler,
30 ssl_wrap_socket,
31 _DEFAULT_BACKLOG,
32)
33from tornado import process
34from tornado.util import errno_from_exception
35
36import typing
37from typing import Union, Dict, Any, Iterable, Optional, Awaitable
38
39if typing.TYPE_CHECKING:
40 from typing import Callable, List # noqa: F401
41
42
class TCPServer:
    r"""A non-blocking, single-threaded TCP server.

    To use `TCPServer`, define a subclass which overrides the `handle_stream`
    method. For example, a simple echo server could be defined like this::

      from tornado.tcpserver import TCPServer
      from tornado.iostream import StreamClosedError

      class EchoServer(TCPServer):
          async def handle_stream(self, stream, address):
              while True:
                  try:
                      data = await stream.read_until(b"\n")
                      await stream.write(data)
                  except StreamClosedError:
                      break

    To make this server serve SSL traffic, send the ``ssl_options`` keyword
    argument with an `ssl.SSLContext` object. For compatibility with older
    versions of Python ``ssl_options`` may also be a dictionary of keyword
    arguments for the `ssl.SSLContext.wrap_socket` method::

       ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
       ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"),
                               os.path.join(data_dir, "mydomain.key"))
       TCPServer(ssl_options=ssl_ctx)

    `TCPServer` initialization follows one of three patterns:

    1. `listen`: single-process::

            async def main():
                server = TCPServer()
                server.listen(8888)
                await asyncio.Event().wait()

            asyncio.run(main())

       While this example does not create multiple processes on its own, when
       the ``reuse_port=True`` argument is passed to ``listen()`` you can run
       the program multiple times to create a multi-process service.

    2. `add_sockets`: multi-process::

            sockets = bind_sockets(8888)
            tornado.process.fork_processes(0)
            async def post_fork_main():
                server = TCPServer()
                server.add_sockets(sockets)
                await asyncio.Event().wait()
            asyncio.run(post_fork_main())

       The `add_sockets` interface is more complicated, but it can be used with
       `tornado.process.fork_processes` to run a multi-process service with all
       worker processes forked from a single parent.  `add_sockets` can also be
       used in single-process servers if you want to create your listening
       sockets in some way other than `~tornado.netutil.bind_sockets`.

       Note that when using this pattern, nothing that touches the event loop
       can be run before ``fork_processes``.

    3. `bind`/`start`: simple **deprecated** multi-process::

            server = TCPServer()
            server.bind(8888)
            server.start(0)  # Forks multiple sub-processes
            IOLoop.current().start()

       This pattern is deprecated because it requires interfaces in the
       `asyncio` module that have been deprecated since Python 3.10. Support for
       creating multiple processes in the ``start`` method will be removed in a
       future version of Tornado.

    .. versionadded:: 3.1
       The ``max_buffer_size`` argument.

    .. versionchanged:: 5.0
       The ``io_loop`` argument has been removed.
    """

    def __init__(
        self,
        ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
        max_buffer_size: Optional[int] = None,
        read_chunk_size: Optional[int] = None,
    ) -> None:
        """Creates a server that is not yet listening on any socket.

        ``ssl_options`` is either ``None`` (plain TCP), an `ssl.SSLContext`,
        or a dictionary of SSL keyword arguments. ``max_buffer_size`` and
        ``read_chunk_size`` are stored and passed unchanged to the
        `.IOStream` (or `.SSLIOStream`) created for every accepted connection.

        When ``ssl_options`` is a dictionary it is checked eagerly: a
        `KeyError` is raised if it has no ``"certfile"`` entry, and a
        `ValueError` is raised if the ``"certfile"`` path (or the
        ``"keyfile"`` path, when given) does not exist.
        """
        self.ssl_options = ssl_options
        # Listening sockets and their "remove accept handler" callbacks,
        # both keyed by file descriptor.
        self._sockets = {}  # type: Dict[int, socket.socket]
        self._handlers = {}  # type: Dict[int, Callable[[], None]]
        # Sockets bound via bind() that wait for start() to be activated.
        self._pending_sockets = []  # type: List[socket.socket]
        self._started = False
        self._stopped = False
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size

        # Verify the SSL options. Otherwise we don't get errors until clients
        # connect. This doesn't verify that the keys are legitimate, but
        # the SSL module doesn't do that until there is a connected socket
        # which seems like too much work
        if isinstance(self.ssl_options, dict):
            opts = self.ssl_options
            # Only certfile is required: it can contain both keys
            if "certfile" not in opts:
                raise KeyError('missing key "certfile" in ssl_options')

            if not os.path.exists(opts["certfile"]):
                raise ValueError('certfile "%s" does not exist' % opts["certfile"])
            if "keyfile" in opts and not os.path.exists(opts["keyfile"]):
                raise ValueError('keyfile "%s" does not exist' % opts["keyfile"])

    def listen(
        self,
        port: int,
        address: Optional[str] = None,
        family: socket.AddressFamily = socket.AF_UNSPEC,
        backlog: int = _DEFAULT_BACKLOG,
        flags: Optional[int] = None,
        reuse_port: bool = False,
    ) -> None:
        """Starts accepting connections on the given port.

        This method may be called more than once to listen on multiple ports.
        `listen` takes effect immediately; it is not necessary to call
        `TCPServer.start` afterwards.  It is, however, necessary to start the
        event loop if it is not already running.

        All arguments have the same meaning as in
        `tornado.netutil.bind_sockets`.

        .. versionchanged:: 6.2

           Added ``family``, ``backlog``, ``flags``, and ``reuse_port``
           arguments to match `tornado.netutil.bind_sockets`.
        """
        sockets = bind_sockets(
            port,
            address=address,
            family=family,
            backlog=backlog,
            flags=flags,
            reuse_port=reuse_port,
        )
        self.add_sockets(sockets)

    def add_sockets(self, sockets: Iterable[socket.socket]) -> None:
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `~tornado.netutil.bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        for sock in sockets:
            fd = sock.fileno()
            self._sockets[fd] = sock
            # add_accept_handler returns the callable that stop() later
            # invokes to unregister this socket.
            self._handlers[fd] = add_accept_handler(sock, self._handle_connection)

    def add_socket(self, socket: socket.socket) -> None:
        """Singular version of `add_sockets`.  Takes a single socket object."""
        self.add_sockets([socket])

    def bind(
        self,
        port: int,
        address: Optional[str] = None,
        family: socket.AddressFamily = socket.AF_UNSPEC,
        backlog: int = _DEFAULT_BACKLOG,
        flags: Optional[int] = None,
        reuse_port: bool = False,
    ) -> None:
        """Binds this server to the given port on the given address.

        To start the server, call `start`. If you want to run this server in a
        single process, you can call `listen` as a shortcut to the sequence of
        `bind` and `start` calls.

        Address may be either an IP address or hostname.  If it's a hostname,
        the server will listen on all IP addresses associated with the name.
        Address may be an empty string or None to listen on all available
        interfaces.  Family may be set to either `socket.AF_INET` or
        `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise both
        will be used if available.

        The ``backlog`` argument has the same meaning as for `socket.listen
        <socket.socket.listen>`. The ``reuse_port`` argument has the same
        meaning as for `.bind_sockets`.

        This method may be called multiple times prior to `start` to listen on
        multiple ports or interfaces.

        If the server has already been started, the new sockets begin
        accepting connections immediately; otherwise they are queued until
        `start` is called.

        .. versionchanged:: 4.4
           Added the ``reuse_port`` argument.

        .. versionchanged:: 6.2
           Added the ``flags`` argument to match `.bind_sockets`.

        .. deprecated:: 6.2
           Use either ``listen()`` or ``add_sockets()`` instead of ``bind()``
           and ``start()``.
        """
        sockets = bind_sockets(
            port,
            address=address,
            family=family,
            backlog=backlog,
            flags=flags,
            reuse_port=reuse_port,
        )
        if self._started:
            self.add_sockets(sockets)
        else:
            self._pending_sockets.extend(sockets)

    def start(
        self, num_processes: Optional[int] = 1, max_restarts: Optional[int] = None
    ) -> None:
        """Starts this server in the `.IOLoop`.

        By default, we run the server in this process and do not fork any
        additional child process.

        If num_processes is ``None`` or <= 0, we detect the number of cores
        available on this machine and fork that number of child
        processes. If num_processes is given and > 1, we fork that
        specific number of sub-processes.

        Since we use processes and not threads, there is no shared memory
        between any server code.

        Note that multiple processes are not compatible with the autoreload
        module (or the ``autoreload=True`` option to `tornado.web.Application`
        which defaults to True when ``debug=True``).
        When using multiple processes, no IOLoops can be created or
        referenced until after the call to ``TCPServer.start(n)``.

        Values of ``num_processes`` other than 1 are not supported on Windows.

        The ``max_restarts`` argument is passed to `.fork_processes`.

        .. versionchanged:: 6.0

           Added ``max_restarts`` argument.

        .. deprecated:: 6.2
           Use either ``listen()`` or ``add_sockets()`` instead of ``bind()``
           and ``start()``.
        """
        assert not self._started
        self._started = True
        if num_processes != 1:
            process.fork_processes(num_processes, max_restarts)
        # Activate everything queued by bind(); the queue is emptied first so
        # the sockets are only ever registered once.
        sockets = self._pending_sockets
        self._pending_sockets = []
        self.add_sockets(sockets)

    def stop(self) -> None:
        """Stops listening for new connections.

        Requests currently in progress may still continue after the
        server is stopped.

        Calling ``stop`` more than once is harmless: only the first call
        has any effect.
        """
        if self._stopped:
            return
        self._stopped = True
        for fd, sock in self._sockets.items():
            assert sock.fileno() == fd
            # Unregister socket from IOLoop
            self._handlers.pop(fd)()
            sock.close()

    def handle_stream(
        self, stream: IOStream, address: tuple
    ) -> Optional[Awaitable[None]]:
        """Override to handle a new `.IOStream` from an incoming connection.

        This method may be a coroutine; if so any exceptions it raises
        asynchronously will be logged. Accepting of incoming connections
        will not be blocked by this coroutine.

        If this `TCPServer` is configured for SSL, ``handle_stream``
        may be called before the SSL handshake has completed. Use
        `.SSLIOStream.wait_for_handshake` if you need to verify the client's
        certificate or use NPN/ALPN.

        .. versionchanged:: 4.2
           Added the option for this method to be a coroutine.
        """
        raise NotImplementedError()

    def _handle_connection(self, connection: socket.socket, address: Any) -> None:
        """Wraps a newly accepted socket in a stream and dispatches it.

        This is the callback registered with `add_accept_handler` for every
        listening socket. When ``ssl_options`` is set the socket is first
        wrapped for server-side SSL (without performing the handshake here);
        connections that were dropped by the peer before wrapping completed
        are closed and ignored. The resulting stream is handed to
        `handle_stream`. Exceptions raised synchronously while creating the
        stream or calling `handle_stream` are logged, not propagated.
        """
        if self.ssl_options is not None:
            try:
                connection = ssl_wrap_socket(
                    connection,
                    self.ssl_options,
                    server_side=True,
                    do_handshake_on_connect=False,
                )
            except ssl.SSLError as err:
                # ssl.SSLError is handled before socket.error because it is
                # the more specific of the two.
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    connection.close()
                    return
                raise
            except socket.error as err:
                # If the connection is closed immediately after it is created
                # (as in a port scan), we can get one of several errors.
                # wrap_socket makes an internal call to getpeername,
                # which may return either EINVAL (Mac OS X) or ENOTCONN
                # (Linux).  If it returns ENOTCONN, this error is
                # silently swallowed by the ssl module, so we need to
                # catch another error later on (AttributeError in
                # SSLIOStream._do_ssl_handshake).
                # To test this behavior, try nmap with the -sT flag.
                # https://github.com/tornadoweb/tornado/pull/750
                if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
                    connection.close()
                    return
                raise
        try:
            if self.ssl_options is not None:
                stream = SSLIOStream(
                    connection,
                    max_buffer_size=self.max_buffer_size,
                    read_chunk_size=self.read_chunk_size,
                )  # type: IOStream
            else:
                stream = IOStream(
                    connection,
                    max_buffer_size=self.max_buffer_size,
                    read_chunk_size=self.read_chunk_size,
                )

            future = self.handle_stream(stream, address)
            if future is not None:
                # Retrieving the result re-raises any exception from an
                # asynchronous handle_stream inside the IOLoop callback, so
                # the failure is reported instead of being silently dropped.
                IOLoop.current().add_future(
                    gen.convert_yielded(future), lambda f: f.result()
                )
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)