Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/tcpserver.py: 29%

87 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-10 06:20 +0000

1# 

2# Copyright 2011 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""A non-blocking, single-threaded TCP server.""" 

17 

18import errno 

19import os 

20import socket 

21import ssl 

22 

23from tornado import gen 

24from tornado.log import app_log 

25from tornado.ioloop import IOLoop 

26from tornado.iostream import IOStream, SSLIOStream 

27from tornado.netutil import ( 

28 bind_sockets, 

29 add_accept_handler, 

30 ssl_wrap_socket, 

31 _DEFAULT_BACKLOG, 

32) 

33from tornado import process 

34from tornado.util import errno_from_exception 

35 

36import typing 

37from typing import Union, Dict, Any, Iterable, Optional, Awaitable 

38 

39if typing.TYPE_CHECKING: 

40 from typing import Callable, List # noqa: F401 

41 

42 

43class TCPServer(object): 

44 r"""A non-blocking, single-threaded TCP server. 

45 

46 To use `TCPServer`, define a subclass which overrides the `handle_stream` 

47 method. For example, a simple echo server could be defined like this:: 

48 

49 from tornado.tcpserver import TCPServer 

50 from tornado.iostream import StreamClosedError 

51 

52 class EchoServer(TCPServer): 

53 async def handle_stream(self, stream, address): 

54 while True: 

55 try: 

56 data = await stream.read_until(b"\n") await 

57 stream.write(data) 

58 except StreamClosedError: 

59 break 

60 

61 To make this server serve SSL traffic, send the ``ssl_options`` keyword 

62 argument with an `ssl.SSLContext` object. For compatibility with older 

63 versions of Python ``ssl_options`` may also be a dictionary of keyword 

64 arguments for the `ssl.wrap_socket` method.:: 

65 

66 ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) 

67 ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"), 

68 os.path.join(data_dir, "mydomain.key")) 

69 TCPServer(ssl_options=ssl_ctx) 

70 

71 `TCPServer` initialization follows one of three patterns: 

72 

73 1. `listen`: single-process:: 

74 

75 async def main(): 

76 server = TCPServer() 

77 server.listen(8888) 

78 await asyncio.Event.wait() 

79 

80 asyncio.run(main()) 

81 

82 While this example does not create multiple processes on its own, when 

83 the ``reuse_port=True`` argument is passed to ``listen()`` you can run 

84 the program multiple times to create a multi-process service. 

85 

86 2. `add_sockets`: multi-process:: 

87 

88 sockets = bind_sockets(8888) 

89 tornado.process.fork_processes(0) 

90 async def post_fork_main(): 

91 server = TCPServer() 

92 server.add_sockets(sockets) 

93 await asyncio.Event().wait() 

94 asyncio.run(post_fork_main()) 

95 

96 The `add_sockets` interface is more complicated, but it can be used with 

97 `tornado.process.fork_processes` to run a multi-process service with all 

98 worker processes forked from a single parent. `add_sockets` can also be 

99 used in single-process servers if you want to create your listening 

100 sockets in some way other than `~tornado.netutil.bind_sockets`. 

101 

102 Note that when using this pattern, nothing that touches the event loop 

103 can be run before ``fork_processes``. 

104 

105 3. `bind`/`start`: simple **deprecated** multi-process:: 

106 

107 server = TCPServer() 

108 server.bind(8888) 

109 server.start(0) # Forks multiple sub-processes 

110 IOLoop.current().start() 

111 

112 This pattern is deprecated because it requires interfaces in the 

113 `asyncio` module that have been deprecated since Python 3.10. Support for 

114 creating multiple processes in the ``start`` method will be removed in a 

115 future version of Tornado. 

116 

117 .. versionadded:: 3.1 

118 The ``max_buffer_size`` argument. 

119 

120 .. versionchanged:: 5.0 

121 The ``io_loop`` argument has been removed. 

122 """ 

123 

124 def __init__( 

125 self, 

126 ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None, 

127 max_buffer_size: Optional[int] = None, 

128 read_chunk_size: Optional[int] = None, 

129 ) -> None: 

130 self.ssl_options = ssl_options 

131 self._sockets = {} # type: Dict[int, socket.socket] 

132 self._handlers = {} # type: Dict[int, Callable[[], None]] 

133 self._pending_sockets = [] # type: List[socket.socket] 

134 self._started = False 

135 self._stopped = False 

136 self.max_buffer_size = max_buffer_size 

137 self.read_chunk_size = read_chunk_size 

138 

139 # Verify the SSL options. Otherwise we don't get errors until clients 

140 # connect. This doesn't verify that the keys are legitimate, but 

141 # the SSL module doesn't do that until there is a connected socket 

142 # which seems like too much work 

143 if self.ssl_options is not None and isinstance(self.ssl_options, dict): 

144 # Only certfile is required: it can contain both keys 

145 if "certfile" not in self.ssl_options: 

146 raise KeyError('missing key "certfile" in ssl_options') 

147 

148 if not os.path.exists(self.ssl_options["certfile"]): 

149 raise ValueError( 

150 'certfile "%s" does not exist' % self.ssl_options["certfile"] 

151 ) 

152 if "keyfile" in self.ssl_options and not os.path.exists( 

153 self.ssl_options["keyfile"] 

154 ): 

155 raise ValueError( 

156 'keyfile "%s" does not exist' % self.ssl_options["keyfile"] 

157 ) 

158 

159 def listen( 

160 self, 

161 port: int, 

162 address: Optional[str] = None, 

163 family: socket.AddressFamily = socket.AF_UNSPEC, 

164 backlog: int = _DEFAULT_BACKLOG, 

165 flags: Optional[int] = None, 

166 reuse_port: bool = False, 

167 ) -> None: 

168 """Starts accepting connections on the given port. 

169 

170 This method may be called more than once to listen on multiple ports. 

171 `listen` takes effect immediately; it is not necessary to call 

172 `TCPServer.start` afterwards. It is, however, necessary to start the 

173 event loop if it is not already running. 

174 

175 All arguments have the same meaning as in 

176 `tornado.netutil.bind_sockets`. 

177 

178 .. versionchanged:: 6.2 

179 

180 Added ``family``, ``backlog``, ``flags``, and ``reuse_port`` 

181 arguments to match `tornado.netutil.bind_sockets`. 

182 """ 

183 sockets = bind_sockets( 

184 port, 

185 address=address, 

186 family=family, 

187 backlog=backlog, 

188 flags=flags, 

189 reuse_port=reuse_port, 

190 ) 

191 self.add_sockets(sockets) 

192 

193 def add_sockets(self, sockets: Iterable[socket.socket]) -> None: 

194 """Makes this server start accepting connections on the given sockets. 

195 

196 The ``sockets`` parameter is a list of socket objects such as 

197 those returned by `~tornado.netutil.bind_sockets`. 

198 `add_sockets` is typically used in combination with that 

199 method and `tornado.process.fork_processes` to provide greater 

200 control over the initialization of a multi-process server. 

201 """ 

202 for sock in sockets: 

203 self._sockets[sock.fileno()] = sock 

204 self._handlers[sock.fileno()] = add_accept_handler( 

205 sock, self._handle_connection 

206 ) 

207 

208 def add_socket(self, socket: socket.socket) -> None: 

209 """Singular version of `add_sockets`. Takes a single socket object.""" 

210 self.add_sockets([socket]) 

211 

212 def bind( 

213 self, 

214 port: int, 

215 address: Optional[str] = None, 

216 family: socket.AddressFamily = socket.AF_UNSPEC, 

217 backlog: int = _DEFAULT_BACKLOG, 

218 flags: Optional[int] = None, 

219 reuse_port: bool = False, 

220 ) -> None: 

221 """Binds this server to the given port on the given address. 

222 

223 To start the server, call `start`. If you want to run this server in a 

224 single process, you can call `listen` as a shortcut to the sequence of 

225 `bind` and `start` calls. 

226 

227 Address may be either an IP address or hostname. If it's a hostname, 

228 the server will listen on all IP addresses associated with the name. 

229 Address may be an empty string or None to listen on all available 

230 interfaces. Family may be set to either `socket.AF_INET` or 

231 `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise both 

232 will be used if available. 

233 

234 The ``backlog`` argument has the same meaning as for `socket.listen 

235 <socket.socket.listen>`. The ``reuse_port`` argument has the same 

236 meaning as for `.bind_sockets`. 

237 

238 This method may be called multiple times prior to `start` to listen on 

239 multiple ports or interfaces. 

240 

241 .. versionchanged:: 4.4 

242 Added the ``reuse_port`` argument. 

243 

244 .. versionchanged:: 6.2 

245 Added the ``flags`` argument to match `.bind_sockets`. 

246 

247 .. deprecated:: 6.2 

248 Use either ``listen()`` or ``add_sockets()`` instead of ``bind()`` 

249 and ``start()``. The ``bind()/start()`` pattern depends on 

250 interfaces that have been deprecated in Python 3.10 and will be 

251 removed in future versions of Python. 

252 """ 

253 sockets = bind_sockets( 

254 port, 

255 address=address, 

256 family=family, 

257 backlog=backlog, 

258 flags=flags, 

259 reuse_port=reuse_port, 

260 ) 

261 if self._started: 

262 self.add_sockets(sockets) 

263 else: 

264 self._pending_sockets.extend(sockets) 

265 

266 def start( 

267 self, num_processes: Optional[int] = 1, max_restarts: Optional[int] = None 

268 ) -> None: 

269 """Starts this server in the `.IOLoop`. 

270 

271 By default, we run the server in this process and do not fork any 

272 additional child process. 

273 

274 If num_processes is ``None`` or <= 0, we detect the number of cores 

275 available on this machine and fork that number of child 

276 processes. If num_processes is given and > 1, we fork that 

277 specific number of sub-processes. 

278 

279 Since we use processes and not threads, there is no shared memory 

280 between any server code. 

281 

282 Note that multiple processes are not compatible with the autoreload 

283 module (or the ``autoreload=True`` option to `tornado.web.Application` 

284 which defaults to True when ``debug=True``). 

285 When using multiple processes, no IOLoops can be created or 

286 referenced until after the call to ``TCPServer.start(n)``. 

287 

288 Values of ``num_processes`` other than 1 are not supported on Windows. 

289 

290 The ``max_restarts`` argument is passed to `.fork_processes`. 

291 

292 .. versionchanged:: 6.0 

293 

294 Added ``max_restarts`` argument. 

295 

296 .. deprecated:: 6.2 

297 Use either ``listen()`` or ``add_sockets()`` instead of ``bind()`` 

298 and ``start()``. The ``bind()/start()`` pattern depends on 

299 interfaces that have been deprecated in Python 3.10 and will be 

300 removed in future versions of Python. 

301 """ 

302 assert not self._started 

303 self._started = True 

304 if num_processes != 1: 

305 process.fork_processes(num_processes, max_restarts) 

306 sockets = self._pending_sockets 

307 self._pending_sockets = [] 

308 self.add_sockets(sockets) 

309 

310 def stop(self) -> None: 

311 """Stops listening for new connections. 

312 

313 Requests currently in progress may still continue after the 

314 server is stopped. 

315 """ 

316 if self._stopped: 

317 return 

318 self._stopped = True 

319 for fd, sock in self._sockets.items(): 

320 assert sock.fileno() == fd 

321 # Unregister socket from IOLoop 

322 self._handlers.pop(fd)() 

323 sock.close() 

324 

325 def handle_stream( 

326 self, stream: IOStream, address: tuple 

327 ) -> Optional[Awaitable[None]]: 

328 """Override to handle a new `.IOStream` from an incoming connection. 

329 

330 This method may be a coroutine; if so any exceptions it raises 

331 asynchronously will be logged. Accepting of incoming connections 

332 will not be blocked by this coroutine. 

333 

334 If this `TCPServer` is configured for SSL, ``handle_stream`` 

335 may be called before the SSL handshake has completed. Use 

336 `.SSLIOStream.wait_for_handshake` if you need to verify the client's 

337 certificate or use NPN/ALPN. 

338 

339 .. versionchanged:: 4.2 

340 Added the option for this method to be a coroutine. 

341 """ 

342 raise NotImplementedError() 

343 

344 def _handle_connection(self, connection: socket.socket, address: Any) -> None: 

345 if self.ssl_options is not None: 

346 assert ssl, "Python 2.6+ and OpenSSL required for SSL" 

347 try: 

348 connection = ssl_wrap_socket( 

349 connection, 

350 self.ssl_options, 

351 server_side=True, 

352 do_handshake_on_connect=False, 

353 ) 

354 except ssl.SSLError as err: 

355 if err.args[0] == ssl.SSL_ERROR_EOF: 

356 return connection.close() 

357 else: 

358 raise 

359 except socket.error as err: 

360 # If the connection is closed immediately after it is created 

361 # (as in a port scan), we can get one of several errors. 

362 # wrap_socket makes an internal call to getpeername, 

363 # which may return either EINVAL (Mac OS X) or ENOTCONN 

364 # (Linux). If it returns ENOTCONN, this error is 

365 # silently swallowed by the ssl module, so we need to 

366 # catch another error later on (AttributeError in 

367 # SSLIOStream._do_ssl_handshake). 

368 # To test this behavior, try nmap with the -sT flag. 

369 # https://github.com/tornadoweb/tornado/pull/750 

370 if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL): 

371 return connection.close() 

372 else: 

373 raise 

374 try: 

375 if self.ssl_options is not None: 

376 stream = SSLIOStream( 

377 connection, 

378 max_buffer_size=self.max_buffer_size, 

379 read_chunk_size=self.read_chunk_size, 

380 ) # type: IOStream 

381 else: 

382 stream = IOStream( 

383 connection, 

384 max_buffer_size=self.max_buffer_size, 

385 read_chunk_size=self.read_chunk_size, 

386 ) 

387 

388 future = self.handle_stream(stream, address) 

389 if future is not None: 

390 IOLoop.current().add_future( 

391 gen.convert_yielded(future), lambda f: f.result() 

392 ) 

393 except Exception: 

394 app_log.error("Error in connection callback", exc_info=True)