Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/iostream.py: 14%

752 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-10 06:20 +0000

1# 

2# Copyright 2009 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Utility classes to write to and read from non-blocking files and sockets. 

17 

18Contents: 

19 

20* `BaseIOStream`: Generic interface for reading and writing. 

21* `IOStream`: Implementation of BaseIOStream using non-blocking sockets. 

22* `SSLIOStream`: SSL-aware version of IOStream. 

23* `PipeIOStream`: Pipe-based IOStream implementation. 

24""" 

25 

26import asyncio 

27import collections 

28import errno 

29import io 

30import numbers 

31import os 

32import socket 

33import ssl 

34import sys 

35import re 

36 

37from tornado.concurrent import Future, future_set_result_unless_cancelled 

38from tornado import ioloop 

39from tornado.log import gen_log 

40from tornado.netutil import ssl_wrap_socket, _client_ssl_defaults, _server_ssl_defaults 

41from tornado.util import errno_from_exception 

42 

43import typing 

44from typing import ( 

45 Union, 

46 Optional, 

47 Awaitable, 

48 Callable, 

49 Pattern, 

50 Any, 

51 Dict, 

52 TypeVar, 

53 Tuple, 

54) 

55from types import TracebackType 

56 

57if typing.TYPE_CHECKING: 

58 from typing import Deque, List, Type # noqa: F401 

59 

# Type variable bounded by ``IOStream``.
_IOStreamType = TypeVar("_IOStreamType", bound="IOStream")

# Errno values that mean the connection was terminated abruptly. They are
# meant to be caught and handled more quietly than other errors.
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE, errno.ETIMEDOUT)

# Add the WSA* codes when this platform's ``errno`` module defines them.
if hasattr(errno, "WSAECONNRESET"):
    _ERRNO_CONNRESET = _ERRNO_CONNRESET + (  # type: ignore
        errno.WSAECONNRESET,  # type: ignore
        errno.WSAECONNABORTED,  # type: ignore
        errno.WSAETIMEDOUT,  # type: ignore
    )

if sys.platform == "darwin":
    # OSX appears to have a race condition in which send(2) returns
    # EPROTOTYPE when called while a socket is being torn down:
    # http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
    # The socket is going away anyway, so classify it with the
    # connection-reset codes rather than as an unexpected error.
    _ERRNO_CONNRESET = _ERRNO_CONNRESET + (errno.EPROTOTYPE,)  # type: ignore

# True when running on a platform whose name starts with "win".
_WINDOWS = sys.platform.startswith("win")

82 

83 

class StreamClosedError(IOError):
    """Raised by `IOStream` methods once the stream has been closed.

    The close callback is scheduled to run *after* the stream's other
    callbacks (so buffered data can still be processed), which means this
    error may be observed before the close callback fires.

    ``real_error`` holds the underlying error that caused the stream to
    close, or ``None`` when there was none.

    .. versionchanged:: 4.3
       Added the ``real_error`` attribute.
    """

    def __init__(self, real_error: Optional[BaseException] = None) -> None:
        # The message is fixed; the cause travels on ``real_error`` instead.
        IOError.__init__(self, "Stream is closed")
        self.real_error = real_error

101 

102 

class UnsatisfiableReadError(Exception):
    """Signals that a pending read can never be satisfied.

    ``read_until`` and ``read_until_regex`` raise it when they were given a
    ``max_bytes`` argument.
    """

111 

112 

class StreamBufferFullError(Exception):
    """Raised by `IOStream` methods when a stream buffer has reached its limit."""

115 

116 

class _StreamBuffer:
    """
    Byte queue that tries to avoid copying large pieces of data.

    Small pieces are merged into ``bytearray`` chunks; pieces larger than
    ``_large_buf_threshold`` are kept as separate ``memoryview`` entries.
    """

    # Pieces above this size get their own entry instead of being merged
    # into an existing bytearray.
    _large_buf_threshold = 2048

    def __init__(self) -> None:
        # Queue of (False, bytearray) and (True, memoryview) pairs.
        self._buffers = (
            collections.deque()
        )  # type: Deque[Tuple[bool, Union[bytearray, memoryview]]]
        # Read offset inside the first queued chunk.
        self._first_pos = 0
        # Number of unread bytes across all chunks.
        self._size = 0

    def __len__(self) -> int:
        return self._size

    def append(self, data: Union[bytes, bytearray, memoryview]) -> None:
        """
        Queue ``data`` (a buffer-compatible object) at the end of the buffer.
        """
        nbytes = len(data)
        if nbytes > self._large_buf_threshold:
            # Large piece: keep it as a view in its own entry.
            view = data if isinstance(data, memoryview) else memoryview(data)
            self._buffers.append((True, view))
        elif nbytes > 0:
            last = self._buffers[-1] if self._buffers else None
            if last is None or last[0] or len(last[1]) >= self._large_buf_threshold:
                # Nothing mergeable at the tail: start a fresh bytearray.
                self._buffers.append((False, bytearray(data)))
            else:
                # Grow the trailing bytearray in place.
                tail = last[1]
                tail += data  # type: ignore

        self._size += nbytes

    def peek(self, size: int) -> memoryview:
        """
        Return a view of up to ``size`` bytes starting at the current
        position; it never spans more than the first queued chunk.
        """
        assert size > 0
        if not self._buffers:
            return memoryview(b"")

        is_memview, chunk = self._buffers[0]
        start = self._first_pos
        stop = start + size
        if not is_memview:
            return memoryview(chunk)[start:stop]
        return typing.cast(memoryview, chunk[start:stop])

    def advance(self, size: int) -> None:
        """
        Move the current position forward by ``size`` bytes, dropping
        chunks that have been fully consumed.
        """
        assert 0 < size <= self._size
        self._size -= size

        remaining = size
        offset = self._first_pos
        queue = self._buffers
        while queue and remaining > 0:
            is_large, chunk = queue[0]
            unread = len(chunk) - offset
            if unread <= remaining:
                # The whole rest of this chunk is consumed.
                queue.popleft()
                remaining -= unread
                offset = 0
                continue
            offset += remaining
            remaining = 0
            # Trim a bytearray once at least half of it has been consumed;
            # memoryview entries only have their offset moved.
            if not is_large and len(chunk) <= 2 * offset:
                del typing.cast(bytearray, chunk)[:offset]
                offset = 0

        assert remaining == 0
        self._first_pos = offset

207 

208 

209class BaseIOStream(object): 

210 """A utility class to write to and read from a non-blocking file or socket. 

211 

212 We support a non-blocking ``write()`` and a family of ``read_*()`` 

213 methods. When the operation completes, the ``Awaitable`` will resolve 

214 with the data read (or ``None`` for ``write()``). All outstanding 

215 ``Awaitables`` will resolve with a `StreamClosedError` when the 

216 stream is closed; `.BaseIOStream.set_close_callback` can also be used 

217 to be notified of a closed stream. 

218 

219 When a stream is closed due to an error, the IOStream's ``error`` 

220 attribute contains the exception object. 

221 

222 Subclasses must implement `fileno`, `close_fd`, `write_to_fd`, 

223 `read_from_fd`, and optionally `get_fd_error`. 

224 

225 """ 

226 

def __init__(
    self,
    max_buffer_size: Optional[int] = None,
    read_chunk_size: Optional[int] = None,
    max_write_buffer_size: Optional[int] = None,
) -> None:
    """`BaseIOStream` constructor.

    :arg max_buffer_size: Maximum amount of incoming data to buffer;
        a falsy value selects the default of 100MB.
    :arg read_chunk_size: Amount of data to read at one time from the
        underlying transport; a falsy value selects the default of 64KB.
        The effective value is capped at half of ``max_buffer_size``.
    :arg max_write_buffer_size: Amount of outgoing data to buffer;
        ``None`` (the default) means unlimited.

    .. versionchanged:: 4.0
       Add the ``max_write_buffer_size`` parameter.  Changed default
       ``read_chunk_size`` to 64KB.
    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been
       removed.
    """
    self.io_loop = ioloop.IOLoop.current()

    # --- size limits ---
    self.max_buffer_size = max_buffer_size if max_buffer_size else 104857600
    requested_chunk = read_chunk_size if read_chunk_size else 65536
    # A chunk size too close to max_buffer_size can cause spurious
    # failures, so never let it exceed half of the buffer limit.
    self.read_chunk_size = min(requested_chunk, self.max_buffer_size // 2)
    self.max_write_buffer_size = max_write_buffer_size

    # --- lifecycle state ---
    self.error = None  # type: Optional[BaseException]
    self._connecting = False
    self._state = None  # type: Optional[int]
    self._closed = False
    self._close_callback = None  # type: Optional[Callable[[], None]]

    # --- read side ---
    self._read_buffer = bytearray()
    self._read_buffer_pos = 0
    self._read_buffer_size = 0
    self._user_read_buffer = False
    self._after_user_read_buffer = None  # type: Optional[bytearray]
    self._read_delimiter = None  # type: Optional[bytes]
    self._read_regex = None  # type: Optional[Pattern]
    self._read_max_bytes = None  # type: Optional[int]
    self._read_bytes = None  # type: Optional[int]
    self._read_partial = False
    self._read_until_close = False
    self._read_future = None  # type: Optional[Future]

    # --- write side ---
    self._write_buffer = _StreamBuffer()
    self._total_write_index = 0
    self._total_write_done_index = 0
    self._write_futures = (
        collections.deque()
    )  # type: Deque[Tuple[int, Future[None]]]

    # --- connection futures ---
    self._connect_future = None  # type: Optional[Future[IOStream]]
    # _ssl_connect_future belongs to SSLIOStream, but it is declared here
    # so that _signal_closed can clean it up.
    # TODO: refactor that so subclasses can add additional futures
    # to be cancelled.
    self._ssl_connect_future = None  # type: Optional[Future[SSLIOStream]]

284 

def fileno(self) -> Union[int, ioloop._Selectable]:
    """Return the file descriptor for this stream.

    Abstract here: this implementation always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError

288 

def close_fd(self) -> None:
    """Close the file underlying this stream.

    Only `BaseIOStream` itself is meant to call ``close_fd``; every other
    caller should use `close` instead.  Abstract here: this
    implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError

296 

def write_to_fd(self, data: memoryview) -> int:
    """Try to write ``data`` to the underlying file.

    Implementations return the number of bytes written.  Abstract here:
    this implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError

303 

def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
    """Try to read from the underlying file into ``buf``.

    Implementations read at most ``len(buf)`` bytes into the buffer and
    return how many were read: ``None`` when nothing was available (the
    socket returned `~errno.EWOULDBLOCK` or equivalent) and zero on EOF.
    Abstract here: this implementation always raises
    ``NotImplementedError``.

    .. versionchanged:: 5.0

       Interface redesigned to take a buffer and return a number
       of bytes instead of a freshly-allocated object.
    """
    raise NotImplementedError

318 

def get_fd_error(self) -> Optional[Exception]:
    """Return details about an error on the underlying file, if any.

    Called after the `.IOLoop` has signaled an error on the file
    descriptor.  Implementations should return an Exception (such as
    `socket.error`) carrying additional information, or ``None`` when no
    such information is available.  This default always returns ``None``.
    """
    return None

328 

def read_until_regex(
    self, regex: bytes, max_bytes: Optional[int] = None
) -> Awaitable[bytes]:
    """Asynchronously read until the given regex has been matched.

    The result contains the text matching the regex together with
    everything that preceded it.

    If ``max_bytes`` is not None, the connection will be closed
    if more than ``max_bytes`` bytes have been read and the regex is
    not satisfied.

    .. versionchanged:: 4.0
        Added the ``max_bytes`` argument.  The ``callback`` argument is
        now optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    pending = self._start_read()
    self._read_regex, self._read_max_bytes = re.compile(regex), max_bytes
    try:
        self._try_inline_read()
    except UnsatisfiableReadError as e:
        # Same treatment as in _handle_events: log quietly and close.
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
    except BaseException:
        # Mark the future's exception as retrieved so it does not log an
        # error about a failure that was never examined.
        pending.add_done_callback(lambda f: f.exception())
        raise
    return pending

367 

def read_until(
    self, delimiter: bytes, max_bytes: Optional[int] = None
) -> Awaitable[bytes]:
    """Asynchronously read until the given delimiter has been found.

    The result contains all the data read, delimiter included.

    If ``max_bytes`` is not None, the connection will be closed
    if more than ``max_bytes`` bytes have been read and the delimiter
    is not found.

    .. versionchanged:: 4.0
        Added the ``max_bytes`` argument.  The ``callback`` argument is
        now optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.
    """
    pending = self._start_read()
    self._read_delimiter, self._read_max_bytes = delimiter, max_bytes
    try:
        self._try_inline_read()
    except UnsatisfiableReadError as e:
        # Same treatment as in _handle_events: log quietly and close.
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
    except BaseException:
        # Mark the future's exception as retrieved before re-raising.
        pending.add_done_callback(lambda f: f.exception())
        raise
    return pending

402 

def read_bytes(self, num_bytes: int, partial: bool = False) -> Awaitable[bytes]:
    """Asynchronously read a number of bytes.

    If ``partial`` is true, data is returned as soon as we have
    any bytes to return (but never more than ``num_bytes``)

    :arg num_bytes: Number of bytes to read; must be an integral value.
    :arg partial: Whether a shorter result is acceptable.
    :returns: The future obtained from ``_start_read``.
    :raises AssertionError: If ``num_bytes`` is not integral.  The check
        runs before the read is registered, so a rejected call leaves the
        stream free for later reads.

    .. versionchanged:: 4.0
        Added the ``partial`` argument.  The callback argument is now
        optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` and ``streaming_callback`` arguments have
       been removed. Use the returned `.Future` (and
       ``partial=True`` for ``streaming_callback``) instead.

    """
    # Validate before _start_read(): failing afterwards would leave the
    # read future registered and make every later read look like a
    # concurrent one.
    assert isinstance(num_bytes, numbers.Integral)
    future = self._start_read()
    self._read_bytes = num_bytes
    self._read_partial = partial
    try:
        self._try_inline_read()
    except BaseException:
        # Mark the future's exception as retrieved before re-raising.
        future.add_done_callback(lambda f: f.exception())
        raise
    return future

430 

def read_into(self, buf: bytearray, partial: bool = False) -> Awaitable[int]:
    """Asynchronously read a number of bytes into a caller-supplied buffer.

    ``buf`` must be a writable buffer into which data will be read.

    If ``partial`` is true, the read completes as soon as any bytes
    have been read.  Otherwise it completes once ``buf`` has been
    entirely filled with read data.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    pending = self._start_read()

    # Seed ``buf`` with whatever is already sitting in the read buffer.
    # The memoryviews are deliberately kept as temporaries: a live view
    # would block resizing the bytearray below.
    buffered = self._read_buffer_size
    wanted = len(buf)
    start = self._read_buffer_pos
    if buffered >= wanted:
        stop = start + wanted
        buf[:] = memoryview(self._read_buffer)[start:stop]
        del self._read_buffer[:stop]
        # Keep the leftover data so it can be restored after this read.
        self._after_user_read_buffer = self._read_buffer
    elif buffered > 0:
        buf[:buffered] = memoryview(self._read_buffer)[start:]

    # From here on the caller's buffer acts as the temporary read buffer.
    self._user_read_buffer = True
    self._read_buffer = buf
    self._read_buffer_pos = 0
    self._read_buffer_size = buffered
    self._read_bytes = wanted
    self._read_partial = partial

    try:
        self._try_inline_read()
    except BaseException:
        # Mark the future's exception as retrieved before re-raising.
        pending.add_done_callback(lambda f: f.exception())
        raise
    return pending

479 

def read_until_close(self) -> Awaitable[bytes]:
    """Asynchronously read all data from the socket until it is closed.

    All available data is buffered until ``max_buffer_size`` is
    reached.  If flow control or cancellation are desired, use a
    loop with `read_bytes(partial=True) <.read_bytes>` instead.

    If the stream is already closed, the read is finished right away
    with whatever the read buffer currently holds.

    .. versionchanged:: 4.0
        The callback argument is now optional and a `.Future` will
        be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` and ``streaming_callback`` arguments have
       been removed. Use the returned `.Future` (and `read_bytes`
       with ``partial=True`` for ``streaming_callback``) instead.

    """
    pending = self._start_read()
    if not self.closed():
        self._read_until_close = True
        try:
            self._try_inline_read()
        except BaseException:
            # Mark the future's exception as retrieved before re-raising.
            pending.add_done_callback(lambda f: f.exception())
            raise
    else:
        self._finish_read(self._read_buffer_size)
    return pending

509 

def write(self, data: Union[bytes, memoryview]) -> "Future[None]":
    """Asynchronously write the given data to this stream.

    Returns a `.Future` that resolves (with a result of ``None``) once
    the write has been completed.

    The ``data`` argument may be of type `bytes` or `memoryview`.

    Raises `StreamBufferFullError` when ``max_write_buffer_size`` is set
    and queueing ``data`` would exceed it.

    .. versionchanged:: 4.0
        Now returns a `.Future` if no callback is given.

    .. versionchanged:: 4.5
        Added support for `memoryview` arguments.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    self._check_closed()
    if data:
        if isinstance(data, memoryview):
            # Flatten to bytes so that ``len(data) == data.nbytes``.
            data = memoryview(data).cast("B")
        limit = self.max_write_buffer_size
        if limit is not None and len(self._write_buffer) + len(data) > limit:
            raise StreamBufferFullError("Reached maximum write buffer size")
        self._write_buffer.append(data)
        self._total_write_index += len(data)

    done = Future()  # type: Future[None]
    # Mark any exception as retrieved so an unobserved failure is not logged.
    done.add_done_callback(lambda f: f.exception())
    # Pair the future with the write index it is waiting for.
    self._write_futures.append((self._total_write_index, done))

    if not self._connecting:
        self._handle_write()
        if self._write_buffer:
            # Data is still queued: wait for the fd to become writable.
            self._add_io_state(self.io_loop.WRITE)
        self._maybe_add_error_listener()
    return done

551 

def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
    """Register ``callback`` to be called when the stream is closed.

    Applications using the `.Future` interface mostly do not need this,
    because all outstanding ``Futures`` resolve with a
    `StreamClosedError` when the stream is closed.  It remains useful
    for learning that the stream was closed while no other read or
    write is in progress.

    Unlike other callback-based interfaces, ``set_close_callback``
    was not removed in Tornado 6.0.
    """
    self._close_callback = callback
    self._maybe_add_error_listener()

566 

def close(
    self,
    exc_info: Union[
        None,
        bool,
        BaseException,
        Tuple[
            "Optional[Type[BaseException]]",
            Optional[BaseException],
            Optional[TracebackType],
        ],
    ] = False,
) -> None:
    """Close this stream.

    If ``exc_info`` is true, set the ``error`` attribute to the current
    exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
    use that instead of `sys.exc_info`).  An exception instance passed
    as ``exc_info`` is stored directly.

    Closing an already-closed stream skips the teardown but still runs
    ``_signal_closed``.
    """
    if not self.closed():
        if exc_info:
            if isinstance(exc_info, tuple):
                self.error = exc_info[1]
            elif isinstance(exc_info, BaseException):
                self.error = exc_info
            else:
                current = sys.exc_info()
                if any(current):
                    self.error = current[1]

        if self._read_until_close:
            # A read-until-close finishes with whatever is buffered.
            self._read_until_close = False
            self._finish_read(self._read_buffer_size)
        elif self._read_future is not None:
            # Resolve a pending read that the buffer can already satisfy.
            try:
                pos = self._find_read_pos()
            except UnsatisfiableReadError:
                pos = None
            if pos is not None:
                self._read_from_buffer(pos)

        if self._state is not None:
            self.io_loop.remove_handler(self.fileno())
            self._state = None
        self.close_fd()
        self._closed = True
    self._signal_closed()

614 

def _signal_closed(self) -> None:
    """Fail every outstanding future and schedule the close callback.

    The pending read, write and connect futures are resolved with a
    `StreamClosedError` wrapping ``self.error``.  The SSL connect future
    receives ``self.error`` itself when one is set.  The close callback,
    if any, is handed to the IOLoop, and the write buffer is released.
    Futures that were already cancelled are tolerated.
    """
    futures = []  # type: List[Future]
    if self._read_future is not None:
        futures.append(self._read_future)
        self._read_future = None
    futures += [future for _, future in self._write_futures]
    self._write_futures.clear()
    if self._connect_future is not None:
        futures.append(self._connect_future)
        self._connect_future = None
    for future in futures:
        if not future.done():
            future.set_exception(StreamClosedError(real_error=self.error))
        # Reference the exception to silence warnings.  Annoyingly,
        # this raises if the future was cancelled, but just
        # returns any other error.
        try:
            future.exception()
        except asyncio.CancelledError:
            pass
    if self._ssl_connect_future is not None:
        # _ssl_connect_future expects to see the real exception (typically
        # an ssl.SSLError), not just StreamClosedError.
        if not self._ssl_connect_future.done():
            if self.error is not None:
                self._ssl_connect_future.set_exception(self.error)
            else:
                self._ssl_connect_future.set_exception(StreamClosedError())
        # As above, exception() raises for a cancelled future.  Without
        # this guard a cancelled handshake would abort the close midway.
        try:
            self._ssl_connect_future.exception()
        except asyncio.CancelledError:
            pass
        self._ssl_connect_future = None
    if self._close_callback is not None:
        cb = self._close_callback
        self._close_callback = None
        self.io_loop.add_callback(cb)
    # Clear the buffers so they can be cleared immediately even
    # if the IOStream object is kept alive by a reference cycle.
    # TODO: Clear the read buffer too; it currently breaks some tests.
    self._write_buffer = None  # type: ignore

653 

def reading(self) -> bool:
    """Return ``True`` while a read is pending on the stream."""
    pending = self._read_future
    return pending is not None

657 

def writing(self) -> bool:
    """Return ``True`` while the write buffer is truthy (data is queued)."""
    return True if self._write_buffer else False

661 

def closed(self) -> bool:
    """Return ``True`` once the stream has been closed."""
    is_closed = self._closed
    return is_closed

665 

def set_nodelay(self, value: bool) -> None:
    """Set the no-delay flag for this stream.

    Data written to TCP streams may, by default, be held back for a
    while to use bandwidth efficiently (Nagle's algorithm).  The
    no-delay flag asks for data to be written as soon as possible,
    even at the cost of additional bandwidth.

    This flag is currently defined only for TCP-based ``IOStreams``;
    this base implementation does nothing.

    .. versionadded:: 3.1
    """

680 

def _handle_connect(self) -> None:
    # Abstract hook: _handle_events calls it while the stream is
    # connecting.  This implementation always raises.
    raise NotImplementedError

683 

def _handle_events(self, fd: Union[int, ioloop._Selectable], events: int) -> None:
    # IOLoop handler: dispatch the reported events to the connect, read
    # and write handlers, then update the set of events being listened
    # for.  Handling stops as soon as a step has closed the stream.
    if self.closed():
        gen_log.warning("Got events for closed stream %s", fd)
        return
    try:
        if self._connecting:
            # Most IOLoops report a failed connect with the WRITE event,
            # but SelectIOLoop reports a READ as well, so the connecting
            # check has to come before either of them.
            self._handle_connect()
        if self.closed():
            return
        if events & self.io_loop.READ:
            self._handle_read()
        if self.closed():
            return
        if events & self.io_loop.WRITE:
            self._handle_write()
        if self.closed():
            return
        if events & self.io_loop.ERROR:
            self.error = self.get_fd_error()
            # _handle_read or _handle_write may have queued a user
            # callback; defer the close so those callbacks get a
            # chance to run first.
            self.io_loop.add_callback(self.close)
            return

        # Work out which events should be listened for from now on.
        wanted = self.io_loop.ERROR
        if self.reading():
            wanted |= self.io_loop.READ
        if self.writing():
            wanted |= self.io_loop.WRITE
        if wanted == self.io_loop.ERROR and self._read_buffer_size == 0:
            # Idle connection: also listen for reads so a closed
            # connection is noticed.  With data still in the read buffer
            # the close callback would not run yet anyway, so there is no
            # need to listen in that case.
            wanted |= self.io_loop.READ
        if wanted != self._state:
            assert (
                self._state is not None
            ), "shouldn't happen: _handle_events without self._state"
            self._state = wanted
            self.io_loop.update_handler(self.fileno(), self._state)
    except UnsatisfiableReadError as e:
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
    except Exception as e:
        gen_log.error("Uncaught exception, closing connection.", exc_info=True)
        self.close(exc_info=e)
        raise

736 

def _read_to_buffer_loop(self) -> Optional[int]:
    # Shared by _handle_read and _try_inline_read: keep pulling data into
    # the read buffer, then return the result of _find_read_pos().

    # Decide how many buffered bytes are enough to stop reading early.
    if self._read_bytes is not None:
        enough = self._read_bytes  # type: Optional[int]
    elif self._read_max_bytes is not None:
        enough = self._read_max_bytes
    elif self.reading():
        # read_until without max_bytes, or read_until_close: read as
        # much as possible before scanning for the delimiter.
        enough = None
    else:
        enough = 0

    scan_threshold = 0
    while not self.closed():
        # Read from the socket until we get EWOULDBLOCK or equivalent.
        # SSL sockets buffer internally, and data sitting in the SSL
        # object's buffer is invisible to select() and friends; trying
        # to read is the only way to find out whether it is there.
        if self._read_to_buffer() == 0:
            break

        # Reaching the target means there is nothing more to use.
        if enough is not None and self._read_buffer_size >= enough:
            break

        # Otherwise fall back to the more expensive _find_read_pos().
        # Running it on every read is inefficient, so do it on the first
        # read and then whenever the buffer size has doubled.
        if self._read_buffer_size >= scan_threshold:
            found = self._find_read_pos()
            if found is not None:
                return found
            scan_threshold = self._read_buffer_size * 2
    return self._find_read_pos()

777 

def _handle_read(self) -> None:
    # Pull in whatever is readable and, if that satisfies the pending
    # read, complete it.  Unsatisfiable reads and cancellation propagate
    # to the caller; any other error is logged and closes the stream.
    try:
        pos = self._read_to_buffer_loop()
    except (UnsatisfiableReadError, asyncio.CancelledError):
        raise
    except Exception as exc:
        gen_log.warning("error on read: %s" % exc)
        self.close(exc_info=exc)
    else:
        if pos is not None:
            self._read_from_buffer(pos)

791 

def _start_read(self) -> Future:
    # Register a new read future and return it.
    if self._read_future is not None:
        # Starting a read while an earlier one is unresolved is an error.
        # When the earlier read is unresolved because the stream was
        # closed without satisfying it, StreamClosedError is preferable
        # to AssertionError: that case arises harmlessly in
        # http1connection.py, where an AssertionError would be logged
        # noisily.
        #
        # Starting a read on a closed stream is otherwise legal, since
        # the read buffer may still be able to satisfy it.  The closed
        # status is therefore consulted only here, to choose which error
        # to raise for "already reading".
        #
        # These conditions have proven difficult to test and no unittest
        # reliably verifies them, so change this with care.  See #2651
        # and #2719.
        self._check_closed()
        assert self._read_future is None, "Already reading"
    pending = Future()
    self._read_future = pending
    return pending

814 

def _finish_read(self, size: int) -> None:
    # Complete the pending read.  With a caller-supplied buffer the
    # result is the byte count and the internal buffer is restored;
    # otherwise the result is ``size`` bytes consumed from the buffer.
    if not self._user_read_buffer:
        result = self._consume(size)  # type: Union[int, bytes]
    else:
        # Swap the saved internal buffer (or a fresh one) back in.
        self._read_buffer = self._after_user_read_buffer or bytearray()
        self._after_user_read_buffer = None
        self._read_buffer_pos = 0
        self._read_buffer_size = len(self._read_buffer)
        self._user_read_buffer = False
        result = size

    pending = self._read_future
    if pending is not None:
        self._read_future = None
        future_set_result_unless_cancelled(pending, result)
    self._maybe_add_error_listener()

830 

def _try_inline_read(self) -> None:
    """Try to finish the current read operation right away.

    Buffered data is checked first, then the socket is read.  When
    either satisfies the read it is completed via ``_read_from_buffer``;
    otherwise the stream starts listening for reads, unless it has been
    closed in the meantime.
    """
    # Data left over from a previous read may already be sufficient.
    pos = self._find_read_pos()
    if pos is None:
        self._check_closed()
        pos = self._read_to_buffer_loop()
        if pos is None:
            # Not satisfiable inline: listen for new data unless the
            # stream is closed.
            if not self.closed():
                self._add_io_state(ioloop.IOLoop.READ)
            return
    self._read_from_buffer(pos)

852 

def _read_to_buffer(self) -> Optional[int]:
    """Read once from the socket and append the data to the read buffer.

    Returns the number of bytes read, or 0 when there was nothing to read
    (the read returned EWOULDBLOCK or equivalent) or the stream hit EOF.
    A connection reset closes the stream and returns ``None``; any other
    socket error closes the stream and is re-raised.  Exceeding
    ``max_buffer_size`` closes the stream and raises
    `StreamBufferFullError`.
    """
    try:
        try:
            if self._user_read_buffer:
                # Read straight into the unused tail of the caller's buffer.
                buf = memoryview(self._read_buffer)[
                    self._read_buffer_size :
                ]  # type: Union[memoryview, bytearray]
            else:
                buf = bytearray(self.read_chunk_size)
            count = self.read_from_fd(buf)
        except (socket.error, IOError, OSError) as exc:
            # ssl.SSLError is a subclass of socket.error
            was_reset = self._is_connreset(exc)
            self.close(exc_info=exc)
            if was_reset:
                # Treat ECONNRESET as a connection close rather than an
                # error to minimize log spam (the exception remains
                # available on self.error for apps that care).
                return None
            raise

        if count is None:
            return 0
        if count == 0:
            # EOF
            self.close()
            return 0
        if not self._user_read_buffer:
            self._read_buffer += memoryview(buf)[:count]
        self._read_buffer_size += count
    finally:
        # Drop the reference to buf so a chunk's worth of memory is not
        # wasted if an exception hangs on to this stack frame.
        del buf
    if self._read_buffer_size > self.max_buffer_size:
        gen_log.error("Reached maximum read buffer size")
        self.close()
        raise StreamBufferFullError("Reached maximum read buffer size")
    return count

898 

def _read_from_buffer(self, pos: int) -> None:
    """Complete the currently-pending read using already-buffered data.

    ``pos`` is a position in the read buffer, as returned by
    ``_find_read_pos``.  Every pending-read criterion is cleared before
    the read is finished with ``pos``.
    """
    # Clear the criteria describing the pending read.
    self._read_bytes = None
    self._read_delimiter = None
    self._read_regex = None
    self._read_partial = False
    self._finish_read(pos)

908 

def _find_read_pos(self) -> Optional[int]:
    """Look for a buffer position that satisfies the pending read.

    Returns the number of bytes (counted from the current read position)
    that complete the pending read, or ``None`` if the buffered data
    cannot satisfy it yet.  Delimiter and regex reads are checked against
    the ``max_bytes`` limit via ``_check_max_bytes``.
    """
    wanted = self._read_bytes
    if wanted is not None:
        available = self._read_buffer_size
        # A partial read is satisfied by any non-empty amount of data.
        if available >= wanted or (self._read_partial and available > 0):
            return min(wanted, available)

    delimiter = self._read_delimiter
    if delimiter is not None:
        if self._read_buffer:
            found = self._read_buffer.find(delimiter, self._read_buffer_pos)
            if found != -1:
                # Convert the absolute index into a length relative to the
                # read position, including the delimiter itself.
                end = found - self._read_buffer_pos + len(delimiter)
                self._check_max_bytes(delimiter, end)
                return end
            self._check_max_bytes(delimiter, self._read_buffer_size)
        return None

    pattern = self._read_regex
    if pattern is not None:
        if self._read_buffer:
            match = pattern.search(self._read_buffer, self._read_buffer_pos)
            if match is not None:
                end = match.end() - self._read_buffer_pos
                self._check_max_bytes(pattern, end)
                return end
            self._check_max_bytes(pattern, self._read_buffer_size)
    return None

950 

def _check_max_bytes(self, delimiter: Union[bytes, Pattern], size: int) -> None:
    """Raise `UnsatisfiableReadError` if ``size`` exceeds the read limit.

    Does nothing when no ``_read_max_bytes`` limit is set or when
    ``size`` is within it.  ``delimiter`` is only used in the error
    message.
    """
    limit = self._read_max_bytes
    if limit is None or size <= limit:
        return
    raise UnsatisfiableReadError(
        "delimiter %r not found within %d bytes" % (delimiter, limit)
    )

957 

def _handle_write(self) -> None:
    """Flush as much of the write buffer as the fd will accept.

    Keeps writing until the buffer is empty, the fd would block
    (``BlockingIOError`` or a zero-byte write), or an error occurs.  On
    an error the stream is closed with that exception and the method
    returns without resolving any write futures.  Otherwise every
    pending write future whose index has been reached by
    ``_total_write_done_index`` is resolved with ``None``.
    """
    pending = len(self._write_buffer)
    while pending:
        assert pending > 0
        try:
            # On windows, socket.send blows up if given a write buffer
            # that's too large, instead of just returning the number of
            # bytes it was able to process, so never hand it more than
            # 128KB at a time.
            chunk_size = 128 * 1024 if _WINDOWS else pending
            written = self.write_to_fd(self._write_buffer.peek(chunk_size))
            if written == 0:
                break
            self._write_buffer.advance(written)
            self._total_write_done_index += written
        except BlockingIOError:
            break
        except (socket.error, IOError, OSError) as err:
            if not self._is_connreset(err):
                # Connection resets (including broken pipes) are common
                # enough that logging them would only be noise.
                gen_log.warning("Write error on %s: %s", self.fileno(), err)
            self.close(exc_info=err)
            return
        pending = len(self._write_buffer)

    # Resolve, in order, every future whose data has been fully written.
    futures = self._write_futures
    while futures and futures[0][0] <= self._total_write_done_index:
        _, future = futures.popleft()
        future_set_result_unless_cancelled(future, None)

995 

def _consume(self, loc: int) -> bytes:
    """Remove the next ``loc`` bytes from the read buffer and return them.

    Returns ``b""`` without touching the buffer when ``loc`` is zero.
    ``loc`` must not exceed the amount of buffered data.
    """
    if loc == 0:
        return b""
    assert loc <= self._read_buffer_size
    start = self._read_buffer_pos
    stop = start + loc
    # Copy the requested range out through a temporary memoryview so no
    # intermediate bytearray slice is created; the view must not outlive
    # this expression because the bytearray may be resized below.
    data = memoryview(self._read_buffer)[start:stop].tobytes()
    self._read_buffer_pos = stop
    self._read_buffer_size -= loc
    # Amortized O(1) shrink: only drop the consumed prefix once it is
    # larger than the data still pending.
    if self._read_buffer_pos > self._read_buffer_size:
        del self._read_buffer[: self._read_buffer_pos]
        self._read_buffer_pos = 0
    return data

1016 

def _check_closed(self) -> None:
    """Raise `StreamClosedError` (carrying ``self.error``) if closed."""
    if not self.closed():
        return
    raise StreamClosedError(real_error=self.error)

1020 

def _maybe_add_error_listener(self) -> None:
    """Start listening for reads on an idle stream so closes are noticed.

    This method is part of an optimization: to detect a connection that
    is closed when we're not actively reading or writing, we must listen
    for read events.  Doing so as soon as the connection is established
    is wasteful because a read or write usually follows immediately, so
    instead this check runs at various times and adds the read listener
    only once the stream is idle.

    The listener is added only when no read/write state is registered,
    the stream is open, the read buffer is empty and a close callback is
    set.
    """
    state = self._state
    if state is not None and state != ioloop.IOLoop.ERROR:
        # Already listening for reads and/or writes.
        return
    if self.closed():
        return
    if self._read_buffer_size != 0:
        return
    if self._close_callback is None:
        return
    self._add_io_state(ioloop.IOLoop.READ)

1035 

def _add_io_state(self, state: int) -> None:
    """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

    Implementation notes: Reads and writes have a fast path and a
    slow path.  The fast path reads synchronously from socket
    buffers, while the slow path uses `_add_io_state` to schedule
    an IOLoop callback.

    To detect closed connections, we must have called
    `_add_io_state` at some point, but we want to delay this as
    much as possible so we don't have to set an `IOLoop.ERROR`
    listener that will be overwritten by the next slow-path
    operation. If a sequence of fast-path ops do not end in a
    slow-path op, (e.g. for an @asynchronous long-poll request),
    we must add the error handler.

    The first call registers ``_handle_events`` for
    ``IOLoop.ERROR | state``; later calls only update the registration
    when ``state`` adds a flag that is not yet set.

    TODO: reevaluate this now that callbacks are gone.

    """
    if self.closed():
        # connection has been closed, so there can be no future events
        return
    if self._state is None:
        # First registration for this fd.
        self._state = ioloop.IOLoop.ERROR | state
        self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
        return
    if self._state & state:
        # Already listening for (some of) the requested events.
        return
    self._state |= state
    self.io_loop.update_handler(self.fileno(), self._state)

1064 

def _is_connreset(self, exc: BaseException) -> bool:
    """Return ``True`` if exc is ECONNRESET or equivalent.

    Only socket/IO errors whose errno is listed in ``_ERRNO_CONNRESET``
    qualify.  May be overridden in subclasses.
    """
    if not isinstance(exc, (socket.error, IOError)):
        return False
    return errno_from_exception(exc) in _ERRNO_CONNRESET

1074 

1075 

class IOStream(BaseIOStream):
    r"""Socket-based `IOStream` implementation.

    This class supports the read and write methods from `BaseIOStream`
    plus a `connect` method.

    The ``socket`` parameter may either be connected or unconnected.
    For server operations the socket is the result of calling
    `socket.accept <socket.socket.accept>`. For client operations the
    socket is created with `socket.socket`, and may either be
    connected before passing it to the `IOStream` or connected with
    `IOStream.connect`.

    A very simple (and broken) HTTP client using this class:

    .. testcode::

        import asyncio
        import tornado.ioloop
        import tornado.iostream
        import socket

        async def main():
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            stream = tornado.iostream.IOStream(s)
            await stream.connect(("friendfeed.com", 80))
            await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
            header_data = await stream.read_until(b"\r\n\r\n")
            headers = {}
            for line in header_data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
            print(body_data)
            stream.close()

        if __name__ == '__main__':
            asyncio.run(main())

    .. testoutput::
        :hide:

    """

    def __init__(self, socket: socket.socket, *args: Any, **kwargs: Any) -> None:
        """Wrap ``socket``, switching it to non-blocking mode.

        Remaining arguments are passed through to the base class.
        """
        self.socket = socket
        self.socket.setblocking(False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> Union[int, ioloop._Selectable]:
        """Return the underlying socket object."""
        return self.socket

    def close_fd(self) -> None:
        """Close the underlying socket and drop the reference to it."""
        self.socket.close()
        self.socket = None  # type: ignore

    def get_fd_error(self) -> Optional[Exception]:
        """Return the socket's pending ``SO_ERROR`` as a `socket.error`.

        An exception object is returned (not raised) whatever the value
        of ``SO_ERROR`` is.
        """
        # Named ``err`` rather than ``errno`` to avoid shadowing the module.
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        return socket.error(err, os.strerror(err))

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        """Read into ``buf``; return the byte count or ``None`` if it would block."""
        try:
            return self.socket.recv_into(buf, len(buf))
        except BlockingIOError:
            return None
        finally:
            del buf

    def write_to_fd(self, data: memoryview) -> int:
        """Send ``data`` on the socket and return the number of bytes sent."""
        try:
            return self.socket.send(data)  # type: ignore
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def connect(
        self: _IOStreamType, address: Any, server_hostname: Optional[str] = None
    ) -> "Future[_IOStreamType]":
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for `socket.connect <socket.socket.connect>` for
        the type of socket passed to the IOStream constructor,
        e.g. an ``(ip, port)`` tuple.  Hostnames are accepted here,
        but will be resolved synchronously and block the IOLoop.
        If you have a hostname instead of an IP address, the `.TCPClient`
        class is recommended instead of calling this method directly.
        `.TCPClient` will do asynchronous DNS resolution and handle
        both IPv4 and IPv6.

        Returns a `.Future` whose result after a successful
        connection will be the stream itself.  If the connection
        attempt fails immediately, the stream is closed with that
        error before the future is returned.

        In SSL mode, the ``server_hostname`` parameter will be used
        for certificate validation (unless disabled in the
        ``ssl_options``) and SNI (if supported).  This base
        implementation does not use it.

        Note that it is safe to call `IOStream.write
        <BaseIOStream.write>` while the connection is pending, in
        which case the data will be written as soon as the connection
        is ready.  Calling `IOStream` read methods before the socket is
        connected works on some platforms but is non-portable.

        .. versionchanged:: 4.0
            If no callback is given, returns a `.Future`.

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to the
           `SSLIOStream` constructor to disable.

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        self._connecting = True
        future = Future()  # type: Future[_IOStreamType]
        self._connect_future = typing.cast("Future[IOStream]", future)
        try:
            self.socket.connect(address)
        except BlockingIOError:
            # In non-blocking mode we expect connect() to raise an
            # exception with EINPROGRESS or EWOULDBLOCK.
            pass
        except socket.error as e:
            # On freebsd, other errors such as ECONNREFUSED may be
            # returned immediately when attempting to connect to
            # localhost, so handle them the same way as an error
            # reported later in _handle_connect.  A future always exists
            # at this point, so the error is not logged here (the old
            # ``if future is None`` logging branch could never run).
            self.close(exc_info=e)
            return future
        self._add_io_state(self.io_loop.WRITE)
        return future

    def start_tls(
        self,
        server_side: bool,
        ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
        server_hostname: Optional[str] = None,
    ) -> Awaitable["SSLIOStream"]:
        """Convert this `IOStream` to an `SSLIOStream`.

        This enables protocols that begin in clear-text mode and
        switch to SSL after some initial negotiation (such as the
        ``STARTTLS`` extension to SMTP and IMAP).

        This method cannot be used if there are outstanding reads
        or writes on the stream, or if there is any data in the
        IOStream's buffer (data in the operating system's socket
        buffer is allowed).  This means it must generally be used
        immediately after reading or writing the last clear-text
        data.  It can also be used immediately after connecting,
        before any reads or writes.  A `ValueError` is raised if the
        stream is not idle.

        The ``ssl_options`` argument may be either an `ssl.SSLContext`
        object or a dictionary of keyword arguments for the
        `ssl.wrap_socket` function.  The ``server_hostname`` argument
        will be used for certificate validation unless disabled
        in the ``ssl_options``.

        This method returns a `.Future` whose result is the new
        `SSLIOStream`.  After this method has been called,
        any other operation on the original stream is undefined.

        If a close callback is defined on this stream, it will be
        transferred to the new stream.

        .. versionadded:: 4.0

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to disable.
        """
        if (
            self._read_future
            or self._write_futures
            or self._connect_future
            or self._closed
            or self._read_buffer
            or self._write_buffer
        ):
            raise ValueError("IOStream is not idle; cannot convert to SSL")
        if ssl_options is None:
            if server_side:
                ssl_options = _server_ssl_defaults
            else:
                ssl_options = _client_ssl_defaults

        # Detach the plain socket from this stream before wrapping it.
        socket = self.socket
        self.io_loop.remove_handler(socket)
        self.socket = None  # type: ignore
        socket = ssl_wrap_socket(
            socket,
            ssl_options,
            server_hostname=server_hostname,
            server_side=server_side,
            do_handshake_on_connect=False,
        )
        # The close callback moves to the new stream.
        orig_close_callback = self._close_callback
        self._close_callback = None

        future = Future()  # type: Future[SSLIOStream]
        ssl_stream = SSLIOStream(socket, ssl_options=ssl_options)
        ssl_stream.set_close_callback(orig_close_callback)
        ssl_stream._ssl_connect_future = future
        ssl_stream.max_buffer_size = self.max_buffer_size
        ssl_stream.read_chunk_size = self.read_chunk_size
        return future

    def _handle_connect(self) -> None:
        """Finish a pending `connect` once the socket reports a result.

        On failure records the error on ``self.error`` and closes the
        stream; on success resolves the connect future with the stream.
        """
        try:
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        except socket.error as e:
            # Hurd doesn't allow SO_ERROR for loopback sockets because all
            # errors for such sockets are reported synchronously.
            if errno_from_exception(e) != errno.ENOPROTOOPT:
                # Any other failure is re-raised as-is.  Previously ``err``
                # was left unbound here and the real error was replaced by
                # an UnboundLocalError on the check below.
                raise
            err = 0
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning(
                    "Connect error on fd %s: %s",
                    self.socket.fileno(),
                    # Fall back to the raw number for codes that have no
                    # symbolic name on this platform.
                    errno.errorcode.get(err, err),
                )
            self.close()
            return
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future_set_result_unless_cancelled(future, self)
        self._connecting = False

    def set_nodelay(self, value: bool) -> None:
        """Set ``TCP_NODELAY`` on the socket according to ``value``.

        Does nothing if the socket has been closed or is not an
        ``AF_INET``/``AF_INET6`` socket.
        """
        if self.socket is not None and self.socket.family in (
            socket.AF_INET,
            socket.AF_INET6,
        ):
            try:
                self.socket.setsockopt(
                    socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 if value else 0
                )
            except socket.error as e:
                # Sometimes setsockopt will fail if the socket is closed
                # at the wrong time.  This can happen with HTTPServer
                # resetting the value to ``False`` between requests.
                if e.errno != errno.EINVAL and not self._is_connreset(e):
                    raise

1339 

1340 

class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking SSL socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with::

        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

    before constructing the `SSLIOStream`.  Unconnected sockets will be
    wrapped when `IOStream.connect` is finished.
    """

    socket = None  # type: ssl.SSLSocket

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """The ``ssl_options`` keyword argument may either be an
        `ssl.SSLContext` object or a dictionary of keywords arguments
        for `ssl.wrap_socket`.  It defaults to the client-side defaults.
        """
        self._ssl_options = kwargs.pop("ssl_options", _client_ssl_defaults)
        super().__init__(*args, **kwargs)
        # True until the SSL handshake has completed successfully.
        self._ssl_accepting = True
        # Which direction the in-progress handshake is waiting on.
        self._handshake_reading = False
        self._handshake_writing = False
        self._server_hostname = None  # type: Optional[str]

        # If the socket is already connected, attempt to start the handshake.
        try:
            self.socket.getpeername()
        except socket.error:
            pass
        else:
            # Indirectly start the handshake, which will run on the next
            # IOLoop iteration and then the real IO state will be set in
            # _handle_events.
            self._add_io_state(self.io_loop.WRITE)

    def reading(self) -> bool:
        """Return True while the handshake or a regular read wants to read."""
        return self._handshake_reading or super().reading()

    def writing(self) -> bool:
        """Return True while the handshake or a regular write wants to write."""
        return self._handshake_writing or super().writing()

    def _do_ssl_handshake(self) -> None:
        """Advance the SSL handshake by one step.

        If the handshake needs more I/O, records the direction in
        ``_handshake_reading``/``_handshake_writing`` and returns.  Errors
        the handlers below recognise close the stream with the exception;
        any other ``ssl.SSLError``/``socket.error`` is re-raised.  On
        success the peer certificate is verified and the handshake
        future, if any, is resolved.
        """
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                return self.close(exc_info=err)
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                try:
                    peer = self.socket.getpeername()
                except Exception:
                    peer = "(not connected)"
                gen_log.warning(
                    "SSL Error on %s %s: %s", self.socket.fileno(), peer, err
                )
                return self.close(exc_info=err)
            raise
        except ssl.CertificateError as err:
            # CertificateError can happen during handshake (hostname
            # verification) and should be passed to user. Starting
            # in Python 3.7, this error is a subclass of SSLError
            # and will be handled by the previous block instead.
            return self.close(exc_info=err)
        except socket.error as err:
            # Some port scans (e.g. nmap in -sT mode) have been known
            # to cause do_handshake to raise EBADF and ENOTCONN, so make
            # those errors quiet as well.
            # https://groups.google.com/forum/?fromgroups#!topic/python-tornado/ApucKJat1_0
            # Errno 0 is also possible in some cases (nc -z).
            # https://github.com/tornadoweb/tornado/issues/2504
            if self._is_connreset(err) or err.args[0] in (
                0,
                errno.EBADF,
                errno.ENOTCONN,
            ):
                return self.close(exc_info=err)
            raise
        except AttributeError as err:
            # On Linux, if the connection was reset before the call to
            # wrap_socket, do_handshake will fail with an
            # AttributeError.
            return self.close(exc_info=err)
        else:
            # Handshake finished: leave handshake mode before verifying.
            self._ssl_accepting = False
            if not self._verify_cert(self.socket.getpeercert()):
                self.close()
                return
            self._finish_ssl_connect()

    def _finish_ssl_connect(self) -> None:
        """Resolve the pending handshake future, if any, with this stream."""
        if self._ssl_connect_future is not None:
            future = self._ssl_connect_future
            self._ssl_connect_future = None
            future_set_result_unless_cancelled(future, self)

    def _verify_cert(self, peercert: Any) -> bool:
        """Returns ``True`` if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.

        Verification is skipped (returning ``True``) when the verify mode
        is ``CERT_NONE`` or no server hostname was recorded.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get("cert_reqs", ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        # NOTE(review): verify_mode is unbound here if _ssl_options is
        # neither a dict nor an SSLContext -- confirm callers never pass
        # anything else.
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        # The "no certificate" check uses a fresh getpeercert() call, while
        # the hostname match below uses the ``peercert`` argument.
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl.match_hostname(peercert, self._server_hostname)
        except ssl.CertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True

    def _handle_read(self) -> None:
        """Continue the handshake if it is still running, else read normally."""
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super()._handle_read()

    def _handle_write(self) -> None:
        """Continue the handshake if it is still running, else write normally."""
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super()._handle_write()

    def connect(
        self, address: Tuple, server_hostname: Optional[str] = None
    ) -> "Future[SSLIOStream]":
        """Connect to ``address`` and wait for the SSL handshake.

        ``server_hostname`` is recorded for wrapping the socket and for
        hostname verification.  Returns the future from
        `wait_for_handshake`.
        """
        self._server_hostname = server_hostname
        # Ignore the result of connect(). If it fails,
        # wait_for_handshake will raise an error too. This is
        # necessary for the old semantics of the connect callback
        # (which takes no arguments). In 6.0 this can be refactored to
        # be a regular coroutine.
        # TODO: This is trickier than it looks, since if write()
        # is called with a connect() pending, we want the connect
        # to resolve before the write. Or do we care about this?
        # (There's a test for it, but I think in practice users
        # either wait for the connect before performing a write or
        # they don't care about the connect Future at all)
        fut = super().connect(address)
        # Retrieve the exception so it counts as handled on this future.
        fut.add_done_callback(lambda f: f.exception())
        return self.wait_for_handshake()

    def _handle_connect(self) -> None:
        """Finish the TCP connect, then wrap the socket for SSL."""
        # Call the superclass method to check for errors.
        super()._handle_connect()
        if self.closed():
            return
        # When the connection is complete, wrap the socket for SSL
        # traffic.  Note that we do this by overriding _handle_connect
        # instead of by passing a callback to super().connect because
        # user callbacks are enqueued asynchronously on the IOLoop,
        # but since _handle_events calls _handle_connect immediately
        # followed by _handle_write we need this to be synchronous.
        #
        # The IOLoop will get confused if we swap out self.socket while the
        # fd is registered, so remove it now and re-register after
        # wrap_socket().
        self.io_loop.remove_handler(self.socket)
        old_state = self._state
        assert old_state is not None
        self._state = None
        self.socket = ssl_wrap_socket(
            self.socket,
            self._ssl_options,
            server_hostname=self._server_hostname,
            do_handshake_on_connect=False,
            server_side=False,
        )
        self._add_io_state(old_state)

    def wait_for_handshake(self) -> "Future[SSLIOStream]":
        """Wait for the initial SSL handshake to complete.

        Returns a `.Future` which will resolve to the stream itself
        after the handshake is complete.  If the handshake has already
        completed, the future is resolved immediately.

        Once the handshake is complete, information such as
        the peer's certificate and NPN/ALPN selections may be
        accessed on ``self.socket``.

        This method is intended for use on server-side streams
        or after using `IOStream.start_tls`; it should not be used
        with `IOStream.connect` (which already waits for the
        handshake to complete).  It may only be called once per stream;
        calling it while a handshake future is still pending raises
        `RuntimeError`.

        .. versionadded:: 4.2

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        if self._ssl_connect_future is not None:
            raise RuntimeError("Already waiting")
        future = self._ssl_connect_future = Future()
        if not self._ssl_accepting:
            self._finish_ssl_connect()
        return future

    def write_to_fd(self, data: memoryview) -> int:
        """Send ``data`` over SSL; return the byte count, 0 if it would block."""
        # clip buffer size at 1GB since SSL sockets only support up to 2GB
        # this change in behaviour is transparent, since the function is
        # already expected to (possibly) write less than the provided buffer
        if len(data) >> 30:
            data = memoryview(data)[: 1 << 30]
        try:
            return self.socket.send(data)  # type: ignore
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
                # the socket is not writeable; we need to transform this into
                # an EWOULDBLOCK socket.error or a zero return value,
                # either of which will be recognized by the caller of this
                # method. Prior to Python 3.5, an unwriteable socket would
                # simply return 0 bytes written.
                return 0
            raise
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        """Read decrypted data into ``buf``.

        Returns the number of bytes read, or ``None`` when nothing can be
        read yet (handshake unfinished, ``SSL_ERROR_WANT_READ`` or
        ``BlockingIOError``).
        """
        try:
            if self._ssl_accepting:
                # If the handshake hasn't finished yet, there can't be anything
                # to read (attempting to read may or may not raise an exception
                # depending on the SSL version)
                return None
            # clip buffer size at 1GB since SSL sockets only support up to 2GB
            # this change in behaviour is transparent, since the function is
            # already expected to (possibly) read less than the provided buffer
            if len(buf) >> 30:
                buf = memoryview(buf)[: 1 << 30]
            try:
                return self.socket.recv_into(buf, len(buf))
            except ssl.SSLError as e:
                # SSLError is a subclass of socket.error, so this except
                # block must come first.
                if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                    return None
                else:
                    raise
            except BlockingIOError:
                return None
        finally:
            del buf

    def _is_connreset(self, e: BaseException) -> bool:
        """Treat ``SSL_ERROR_EOF`` as a connection reset, else defer to the base."""
        if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
            return True
        return super()._is_connreset(e)

1618 

1619 

class PipeIOStream(BaseIOStream):
    """Pipe-based `IOStream` implementation.

    The constructor takes an integer file descriptor (such as one returned
    by `os.pipe`) rather than an open file object.  Pipes are generally
    one-way, so a `PipeIOStream` can be used for reading or writing but not
    both.

    ``PipeIOStream`` is only available on Unix-based platforms.
    """

    def __init__(self, fd: int, *args: Any, **kwargs: Any) -> None:
        """Wrap ``fd`` and switch it to non-blocking mode.

        Remaining arguments are passed through to the base class.
        Raises `AssertionError` on Windows.
        """
        self.fd = fd
        self._fio = io.FileIO(fd, "r+")
        if sys.platform == "win32":
            # The form and placement of this assertion is important to mypy.
            # A plain assert statement isn't recognized here. If the assertion
            # were earlier it would worry that the attributes of self aren't
            # set on windows. If it were missing it would complain about
            # the absence of the set_blocking function.
            raise AssertionError("PipeIOStream is not supported on Windows")
        os.set_blocking(fd, False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> int:
        """Return the integer file descriptor given to the constructor."""
        return self.fd

    def close_fd(self) -> None:
        """Close the file object wrapping the descriptor."""
        self._fio.close()

    def write_to_fd(self, data: memoryview) -> int:
        """Write ``data`` to the descriptor and return the byte count."""
        try:
            written = os.write(self.fd, data)  # type: ignore
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data
        return written

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        """Read into ``buf`` and return what ``readinto`` reports.

        An ``EBADF`` failure closes the stream and yields ``None``; every
        other I/O error is re-raised.
        """
        try:
            return self._fio.readinto(buf)  # type: ignore
        except (IOError, OSError) as err:
            if errno_from_exception(err) != errno.EBADF:
                raise
            # If the writing half of a pipe is closed, select will
            # report it as readable but reads will fail with EBADF.
            self.close(exc_info=err)
            return None
        finally:
            del buf

1671 

1672 

def doctests() -> Any:
    """Return a doctest suite built from the calling module's docstrings."""
    from doctest import DocTestSuite

    return DocTestSuite()