Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/tcpserver.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

87 statements  

1# 

2# Copyright 2011 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""A non-blocking, single-threaded TCP server.""" 

17 

18import errno 

19import os 

20import socket 

21import ssl 

22 

23from tornado import gen 

24from tornado.log import app_log 

25from tornado.ioloop import IOLoop 

26from tornado.iostream import IOStream, SSLIOStream 

27from tornado.netutil import ( 

28 bind_sockets, 

29 add_accept_handler, 

30 ssl_wrap_socket, 

31 _DEFAULT_BACKLOG, 

32) 

33from tornado import process 

34from tornado.util import errno_from_exception 

35 

36import typing 

37from typing import Union, Dict, Any, Iterable, Optional, Awaitable 

38 

39if typing.TYPE_CHECKING: 

40 from typing import Callable, List # noqa: F401 

41 

42 

class TCPServer(object):
    r"""A non-blocking, single-threaded TCP server.

    To use `TCPServer`, define a subclass which overrides the `handle_stream`
    method. For example, a simple echo server could be defined like this::

      from tornado.tcpserver import TCPServer
      from tornado.iostream import StreamClosedError

      class EchoServer(TCPServer):
          async def handle_stream(self, stream, address):
              while True:
                  try:
                      data = await stream.read_until(b"\n")
                      await stream.write(data)
                  except StreamClosedError:
                      break

    To make this server serve SSL traffic, send the ``ssl_options`` keyword
    argument with an `ssl.SSLContext` object. For compatibility with older
    versions of Python ``ssl_options`` may also be a dictionary of keyword
    arguments for the `ssl.SSLContext.wrap_socket` method.::

       ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
       ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"),
                               os.path.join(data_dir, "mydomain.key"))
       TCPServer(ssl_options=ssl_ctx)

    `TCPServer` initialization follows one of three patterns:

    1. `listen`: single-process::

            async def main():
                server = TCPServer()
                server.listen(8888)
                await asyncio.Event().wait()

            asyncio.run(main())

       While this example does not create multiple processes on its own, when
       the ``reuse_port=True`` argument is passed to ``listen()`` you can run
       the program multiple times to create a multi-process service.

    2. `add_sockets`: multi-process::

            sockets = bind_sockets(8888)
            tornado.process.fork_processes(0)
            async def post_fork_main():
                server = TCPServer()
                server.add_sockets(sockets)
                await asyncio.Event().wait()
            asyncio.run(post_fork_main())

       The `add_sockets` interface is more complicated, but it can be used with
       `tornado.process.fork_processes` to run a multi-process service with all
       worker processes forked from a single parent.  `add_sockets` can also be
       used in single-process servers if you want to create your listening
       sockets in some way other than `~tornado.netutil.bind_sockets`.

       Note that when using this pattern, nothing that touches the event loop
       can be run before ``fork_processes``.

    3. `bind`/`start`: simple **deprecated** multi-process::

            server = TCPServer()
            server.bind(8888)
            server.start(0)  # Forks multiple sub-processes
            IOLoop.current().start()

       This pattern is deprecated because it requires interfaces in the
       `asyncio` module that have been deprecated since Python 3.10. Support for
       creating multiple processes in the ``start`` method will be removed in a
       future version of Tornado.

    .. versionadded:: 3.1
       The ``max_buffer_size`` argument.

    .. versionchanged:: 5.0
       The ``io_loop`` argument has been removed.
    """

    def __init__(
        self,
        ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
        max_buffer_size: Optional[int] = None,
        read_chunk_size: Optional[int] = None,
    ) -> None:
        """Create a server that is not yet listening on any socket.

        :arg ssl_options: ``None`` for plain TCP, otherwise an
            `ssl.SSLContext` or a dict of SSL keyword arguments that is
            used to wrap every accepted connection.
        :arg max_buffer_size: forwarded unchanged to each stream created
            for an accepted connection.
        :arg read_chunk_size: forwarded unchanged to each stream created
            for an accepted connection.
        :raises KeyError: if ``ssl_options`` is a dict without ``"certfile"``.
        :raises ValueError: if the ``"certfile"`` or ``"keyfile"`` path named
            in a dict ``ssl_options`` does not exist.
        """
        self.ssl_options = ssl_options
        # Listening sockets and their "remove accept handler" callbacks,
        # both keyed by file descriptor.
        self._sockets = {}  # type: Dict[int, socket.socket]
        self._handlers = {}  # type: Dict[int, Callable[[], None]]
        # Sockets created by bind() that are waiting for start().
        self._pending_sockets = []  # type: List[socket.socket]
        self._started = False
        self._stopped = False
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size

        # Verify the SSL options. Otherwise we don't get errors until clients
        # connect. This doesn't verify that the keys are legitimate, but
        # the SSL module doesn't do that until there is a connected socket
        # which seems like too much work
        if isinstance(self.ssl_options, dict):
            # Only certfile is required: it can contain both keys
            if "certfile" not in self.ssl_options:
                raise KeyError('missing key "certfile" in ssl_options')

            if not os.path.exists(self.ssl_options["certfile"]):
                raise ValueError(
                    'certfile "%s" does not exist' % self.ssl_options["certfile"]
                )
            if "keyfile" in self.ssl_options and not os.path.exists(
                self.ssl_options["keyfile"]
            ):
                raise ValueError(
                    'keyfile "%s" does not exist' % self.ssl_options["keyfile"]
                )

    def listen(
        self,
        port: int,
        address: Optional[str] = None,
        family: socket.AddressFamily = socket.AF_UNSPEC,
        backlog: int = _DEFAULT_BACKLOG,
        flags: Optional[int] = None,
        reuse_port: bool = False,
    ) -> None:
        """Starts accepting connections on the given port.

        This method may be called more than once to listen on multiple ports.
        `listen` takes effect immediately; it is not necessary to call
        `TCPServer.start` afterwards.  It is, however, necessary to start the
        event loop if it is not already running.

        All arguments have the same meaning as in
        `tornado.netutil.bind_sockets`.

        .. versionchanged:: 6.2

           Added ``family``, ``backlog``, ``flags``, and ``reuse_port``
           arguments to match `tornado.netutil.bind_sockets`.
        """
        sockets = bind_sockets(
            port,
            address=address,
            family=family,
            backlog=backlog,
            flags=flags,
            reuse_port=reuse_port,
        )
        self.add_sockets(sockets)

    def add_sockets(self, sockets: Iterable[socket.socket]) -> None:
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `~tornado.netutil.bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        for sock in sockets:
            # Both registries are keyed by the same descriptor so that stop()
            # can pair each socket with its accept-handler remover.
            fd = sock.fileno()
            self._sockets[fd] = sock
            self._handlers[fd] = add_accept_handler(sock, self._handle_connection)

    def add_socket(self, socket: socket.socket) -> None:
        """Singular version of `add_sockets`.  Takes a single socket object."""
        self.add_sockets([socket])

    def bind(
        self,
        port: int,
        address: Optional[str] = None,
        family: socket.AddressFamily = socket.AF_UNSPEC,
        backlog: int = _DEFAULT_BACKLOG,
        flags: Optional[int] = None,
        reuse_port: bool = False,
    ) -> None:
        """Binds this server to the given port on the given address.

        To start the server, call `start`. If you want to run this server in a
        single process, you can call `listen` as a shortcut to the sequence of
        `bind` and `start` calls.

        Address may be either an IP address or hostname.  If it's a hostname,
        the server will listen on all IP addresses associated with the name.
        Address may be an empty string or None to listen on all available
        interfaces.  Family may be set to either `socket.AF_INET` or
        `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise both
        will be used if available.

        The ``backlog`` argument has the same meaning as for `socket.listen
        <socket.socket.listen>`. The ``reuse_port`` argument has the same
        meaning as for `.bind_sockets`.

        This method may be called multiple times prior to `start` to listen on
        multiple ports or interfaces.

        .. versionchanged:: 4.4
           Added the ``reuse_port`` argument.

        .. versionchanged:: 6.2
           Added the ``flags`` argument to match `.bind_sockets`.

        .. deprecated:: 6.2
           Use either ``listen()`` or ``add_sockets()`` instead of ``bind()``
           and ``start()``.
        """
        sockets = bind_sockets(
            port,
            address=address,
            family=family,
            backlog=backlog,
            flags=flags,
            reuse_port=reuse_port,
        )
        if self._started:
            self.add_sockets(sockets)
        else:
            # Defer registration until start() so that forking (if any)
            # happens before accept handlers are installed.
            self._pending_sockets.extend(sockets)

    def start(
        self, num_processes: Optional[int] = 1, max_restarts: Optional[int] = None
    ) -> None:
        """Starts this server in the `.IOLoop`.

        By default, we run the server in this process and do not fork any
        additional child process.

        If num_processes is ``None`` or <= 0, we detect the number of cores
        available on this machine and fork that number of child
        processes. If num_processes is given and > 1, we fork that
        specific number of sub-processes.

        Since we use processes and not threads, there is no shared memory
        between any server code.

        Note that multiple processes are not compatible with the autoreload
        module (or the ``autoreload=True`` option to `tornado.web.Application`
        which defaults to True when ``debug=True``).
        When using multiple processes, no IOLoops can be created or
        referenced until after the call to ``TCPServer.start(n)``.

        Values of ``num_processes`` other than 1 are not supported on Windows.

        The ``max_restarts`` argument is passed to `.fork_processes`.

        .. versionchanged:: 6.0

           Added ``max_restarts`` argument.

        .. deprecated:: 6.2
           Use either ``listen()`` or ``add_sockets()`` instead of ``bind()``
           and ``start()``.
        """
        assert not self._started
        self._started = True
        if num_processes != 1:
            process.fork_processes(num_processes, max_restarts)
        # Hand over everything bind() queued up and reset the queue.
        sockets = self._pending_sockets
        self._pending_sockets = []
        self.add_sockets(sockets)

    def stop(self) -> None:
        """Stops listening for new connections.

        Requests currently in progress may still continue after the
        server is stopped.

        Calling this method more than once is a no-op after the first call.
        """
        if self._stopped:
            return
        self._stopped = True
        for fd, sock in self._sockets.items():
            assert sock.fileno() == fd
            # Unregister socket from IOLoop
            self._handlers.pop(fd)()
            sock.close()

    def handle_stream(
        self, stream: IOStream, address: tuple
    ) -> Optional[Awaitable[None]]:
        """Override to handle a new `.IOStream` from an incoming connection.

        This method may be a coroutine; if so any exceptions it raises
        asynchronously will be logged. Accepting of incoming connections
        will not be blocked by this coroutine.

        If this `TCPServer` is configured for SSL, ``handle_stream``
        may be called before the SSL handshake has completed. Use
        `.SSLIOStream.wait_for_handshake` if you need to verify the client's
        certificate or use NPN/ALPN.

        .. versionchanged:: 4.2
           Added the option for this method to be a coroutine.
        """
        raise NotImplementedError()

    def _handle_connection(self, connection: socket.socket, address: Any) -> None:
        """Accept-handler callback: wrap ``connection`` and dispatch it.

        When ``ssl_options`` is set the raw socket is first wrapped for
        server-side SSL (handshake deferred).  The socket is then wrapped in
        an `.SSLIOStream` or `.IOStream` and passed to `handle_stream`.  Any
        exception raised while building the stream or synchronously by
        `handle_stream` is logged rather than propagated.
        """
        if self.ssl_options is not None:
            try:
                connection = ssl_wrap_socket(
                    connection,
                    self.ssl_options,
                    server_side=True,
                    do_handshake_on_connect=False,
                )
            except ssl.SSLError as err:
                # Must precede the socket.error clause: SSLError is a
                # subclass of OSError.
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    connection.close()
                    return
                else:
                    raise
            except socket.error as err:
                # If the connection is closed immediately after it is created
                # (as in a port scan), we can get one of several errors.
                # wrap_socket makes an internal call to getpeername,
                # which may return either EINVAL (Mac OS X) or ENOTCONN
                # (Linux).  If it returns ENOTCONN, this error is
                # silently swallowed by the ssl module, so we need to
                # catch another error later on (AttributeError in
                # SSLIOStream._do_ssl_handshake).
                # To test this behavior, try nmap with the -sT flag.
                # https://github.com/tornadoweb/tornado/pull/750
                if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
                    connection.close()
                    return
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = SSLIOStream(
                    connection,
                    max_buffer_size=self.max_buffer_size,
                    read_chunk_size=self.read_chunk_size,
                )  # type: IOStream
            else:
                stream = IOStream(
                    connection,
                    max_buffer_size=self.max_buffer_size,
                    read_chunk_size=self.read_chunk_size,
                )

            future = self.handle_stream(stream, address)
            if future is not None:
                # Calling result() in the done-callback re-raises any
                # exception from the coroutine so it is not silently dropped.
                IOLoop.current().add_future(
                    gen.convert_yielded(future), lambda f: f.result()
                )
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)