Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/iostream.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

723 statements  

1# 

2# Copyright 2009 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Utility classes to write to and read from non-blocking files and sockets. 

17 

18Contents: 

19 

20* `BaseIOStream`: Generic interface for reading and writing. 

21* `IOStream`: Implementation of BaseIOStream using non-blocking sockets. 

22* `SSLIOStream`: SSL-aware version of IOStream. 

23* `PipeIOStream`: Pipe-based IOStream implementation. 

24""" 

25 

26import asyncio 

27import collections 

28import errno 

29import io 

30import numbers 

31import os 

32import socket 

33import ssl 

34import sys 

35import re 

36 

37from tornado.concurrent import Future, future_set_result_unless_cancelled 

38from tornado import ioloop 

39from tornado.log import gen_log 

40from tornado.netutil import ssl_wrap_socket, _client_ssl_defaults, _server_ssl_defaults 

41from tornado.util import errno_from_exception 

42 

43import typing 

44from typing import ( 

45 Union, 

46 Optional, 

47 Awaitable, 

48 Callable, 

49 Pattern, 

50 Any, 

51 Dict, 

52 TypeVar, 

53 Tuple, 

54) 

55from types import TracebackType 

56 

57if typing.TYPE_CHECKING: 

58 from typing import Deque, List, Type # noqa: F401 

59 

# TypeVar bound to IOStream so that class-methods/constructors declared on
# IOStream can be typed as returning the subclass they were called on.
_IOStreamType = TypeVar("_IOStreamType", bound="IOStream")

# These errnos indicate that a connection has been abruptly terminated.
# They should be caught and handled less noisily than other errors.
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE, errno.ETIMEDOUT)

# Windows exposes its own Winsock errno constants; include them when present.
if hasattr(errno, "WSAECONNRESET"):
    _ERRNO_CONNRESET += (  # type: ignore
        errno.WSAECONNRESET,  # type: ignore
        errno.WSAECONNABORTED,  # type: ignore
        errno.WSAETIMEDOUT,  # type: ignore
    )

if sys.platform == "darwin":
    # OSX appears to have a race condition that causes send(2) to return
    # EPROTOTYPE if called while a socket is being torn down:
    # http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
    # Since the socket is being closed anyway, treat this as an ECONNRESET
    # instead of an unexpected error.
    _ERRNO_CONNRESET += (errno.EPROTOTYPE,)  # type: ignore

# True when running on Windows (any win* platform string).
_WINDOWS = sys.platform.startswith("win")

82 

83 

class StreamClosedError(IOError):
    """Exception raised by `IOStream` methods when the stream is closed.

    Note that the close callback is scheduled to run *after* other
    callbacks on the stream (to allow for buffered data to be processed),
    so you may see this error before you see the close callback.

    The ``real_error`` attribute contains the underlying error that caused
    the stream to close (if any).

    .. versionchanged:: 4.3
       Added the ``real_error`` attribute.
    """

    def __init__(self, real_error: Optional[BaseException] = None) -> None:
        # The human-readable message is fixed; the interesting detail
        # (what actually went wrong) is carried in ``real_error``.
        self.real_error = real_error
        super().__init__("Stream is closed")

101 

102 

class UnsatisfiableReadError(Exception):
    """Raised when a read cannot be satisfied.

    Raised by ``read_until`` and ``read_until_regex`` with a ``max_bytes``
    argument.
    """

111 

112 

class StreamBufferFullError(Exception):
    """Raised by `IOStream` methods when the buffer is full."""

    pass

115 

116 

117class _StreamBuffer(object): 

118 """ 

119 A specialized buffer that tries to avoid copies when large pieces 

120 of data are encountered. 

121 """ 

122 

123 def __init__(self) -> None: 

124 # A sequence of (False, bytearray) and (True, memoryview) objects 

125 self._buffers = ( 

126 collections.deque() 

127 ) # type: Deque[Tuple[bool, Union[bytearray, memoryview]]] 

128 # Position in the first buffer 

129 self._first_pos = 0 

130 self._size = 0 

131 

132 def __len__(self) -> int: 

133 return self._size 

134 

135 # Data above this size will be appended separately instead 

136 # of extending an existing bytearray 

137 _large_buf_threshold = 2048 

138 

139 def append(self, data: Union[bytes, bytearray, memoryview]) -> None: 

140 """ 

141 Append the given piece of data (should be a buffer-compatible object). 

142 """ 

143 size = len(data) 

144 if size > self._large_buf_threshold: 

145 if not isinstance(data, memoryview): 

146 data = memoryview(data) 

147 self._buffers.append((True, data)) 

148 elif size > 0: 

149 if self._buffers: 

150 is_memview, b = self._buffers[-1] 

151 new_buf = is_memview or len(b) >= self._large_buf_threshold 

152 else: 

153 new_buf = True 

154 if new_buf: 

155 self._buffers.append((False, bytearray(data))) 

156 else: 

157 b += data # type: ignore 

158 

159 self._size += size 

160 

161 def peek(self, size: int) -> memoryview: 

162 """ 

163 Get a view over at most ``size`` bytes (possibly fewer) at the 

164 current buffer position. 

165 """ 

166 assert size > 0 

167 try: 

168 is_memview, b = self._buffers[0] 

169 except IndexError: 

170 return memoryview(b"") 

171 

172 pos = self._first_pos 

173 if is_memview: 

174 return typing.cast(memoryview, b[pos : pos + size]) 

175 else: 

176 return memoryview(b)[pos : pos + size] 

177 

178 def advance(self, size: int) -> None: 

179 """ 

180 Advance the current buffer position by ``size`` bytes. 

181 """ 

182 assert 0 < size <= self._size 

183 self._size -= size 

184 pos = self._first_pos 

185 

186 buffers = self._buffers 

187 while buffers and size > 0: 

188 is_large, b = buffers[0] 

189 b_remain = len(b) - size - pos 

190 if b_remain <= 0: 

191 buffers.popleft() 

192 size -= len(b) - pos 

193 pos = 0 

194 elif is_large: 

195 pos += size 

196 size = 0 

197 else: 

198 pos += size 

199 del typing.cast(bytearray, b)[:pos] 

200 pos = 0 

201 size = 0 

202 

203 assert size == 0 

204 self._first_pos = pos 

205 

206 

207class BaseIOStream(object): 

208 """A utility class to write to and read from a non-blocking file or socket. 

209 

210 We support a non-blocking ``write()`` and a family of ``read_*()`` 

211 methods. When the operation completes, the ``Awaitable`` will resolve 

212 with the data read (or ``None`` for ``write()``). All outstanding 

213 ``Awaitables`` will resolve with a `StreamClosedError` when the 

214 stream is closed; `.BaseIOStream.set_close_callback` can also be used 

215 to be notified of a closed stream. 

216 

217 When a stream is closed due to an error, the IOStream's ``error`` 

218 attribute contains the exception object. 

219 

220 Subclasses must implement `fileno`, `close_fd`, `write_to_fd`, 

221 `read_from_fd`, and optionally `get_fd_error`. 

222 

223 """ 

224 

def __init__(
    self,
    max_buffer_size: Optional[int] = None,
    read_chunk_size: Optional[int] = None,
    max_write_buffer_size: Optional[int] = None,
) -> None:
    """`BaseIOStream` constructor.

    :arg max_buffer_size: Maximum amount of incoming data to buffer;
        defaults to 100MB.
    :arg read_chunk_size: Amount of data to read at one time from the
        underlying transport; defaults to 64KB.
    :arg max_write_buffer_size: Amount of outgoing data to buffer;
        defaults to unlimited.

    .. versionchanged:: 4.0
       Add the ``max_write_buffer_size`` parameter.  Changed default
       ``read_chunk_size`` to 64KB.
    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been
       removed.
    """
    self.io_loop = ioloop.IOLoop.current()
    self.max_buffer_size = max_buffer_size or 104857600
    # A chunk size that is too close to max_buffer_size can cause
    # spurious failures.
    self.read_chunk_size = min(read_chunk_size or 65536, self.max_buffer_size // 2)
    self.max_write_buffer_size = max_write_buffer_size
    # Set when the stream closes due to an error (see close()).
    self.error = None  # type: Optional[BaseException]
    # Incoming data accumulates here until a pending read consumes it.
    self._read_buffer = bytearray()
    self._read_buffer_size = 0
    # True while a caller-supplied buffer (read_into) is temporarily
    # installed as self._read_buffer.
    self._user_read_buffer = False
    # Holds the displaced internal read buffer during read_into.
    self._after_user_read_buffer = None  # type: Optional[bytearray]
    self._write_buffer = _StreamBuffer()
    # Monotonic byte counters: total accepted by write() vs. total
    # actually written; used to resolve write futures in order.
    self._total_write_index = 0
    self._total_write_done_index = 0
    # At most one of the following read-request descriptors is set
    # at a time, depending on which read_* method is pending.
    self._read_delimiter = None  # type: Optional[bytes]
    self._read_regex = None  # type: Optional[Pattern]
    self._read_max_bytes = None  # type: Optional[int]
    self._read_bytes = None  # type: Optional[int]
    self._read_partial = False
    self._read_until_close = False
    # Future for the single outstanding read, if any.
    self._read_future = None  # type: Optional[Future]
    self._write_futures = (
        collections.deque()
    )  # type: Deque[Tuple[int, Future[None]]]
    self._close_callback = None  # type: Optional[Callable[[], None]]
    self._connect_future = None  # type: Optional[Future[IOStream]]
    # _ssl_connect_future should be defined in SSLIOStream
    # but it's here so we can clean it up in _signal_closed
    # TODO: refactor that so subclasses can add additional futures
    # to be cancelled.
    self._ssl_connect_future = None  # type: Optional[Future[SSLIOStream]]
    self._connecting = False
    # IOLoop event mask currently registered for our fd, or None when
    # no handler is registered.
    self._state = None  # type: Optional[int]
    self._closed = False

281 

282 def fileno(self) -> Union[int, ioloop._Selectable]: 

283 """Returns the file descriptor for this stream.""" 

284 raise NotImplementedError() 

285 

def close_fd(self) -> None:
    """Close the file underlying this stream.

    ``close_fd`` is called by `BaseIOStream` and should not be called
    elsewhere; other users should call `close` instead.

    Abstract: subclasses must override.
    """
    raise NotImplementedError()

293 

def write_to_fd(self, data: memoryview) -> int:
    """Attempt to write ``data`` to the underlying file.

    Returns the number of bytes written.

    Abstract: subclasses must override.
    """
    raise NotImplementedError()

300 

def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
    """Attempt to read from the underlying file into ``buf``.

    Reads up to ``len(buf)`` bytes, storing them in the buffer.
    Returns the number of bytes read.  Returns None if there was
    nothing to read (the socket returned `~errno.EWOULDBLOCK` or
    equivalent), and zero on EOF.

    Abstract: subclasses must override.

    .. versionchanged:: 5.0

       Interface redesigned to take a buffer and return a number
       of bytes instead of a freshly-allocated object.
    """
    raise NotImplementedError()

315 

def get_fd_error(self) -> Optional[Exception]:
    """Return information about any error on the underlying file.

    This method is called after the `.IOLoop` has signaled an error on the
    file descriptor, and should return an Exception (such as `socket.error`
    with additional information, or None if no such information is
    available.
    """
    # Base implementation has no fd-level error reporting.
    return None

325 

def read_until_regex(
    self, regex: bytes, max_bytes: Optional[int] = None
) -> Awaitable[bytes]:
    """Asynchronously read until we have matched the given regex.

    The result includes the data that matches the regex and anything
    that came before it.

    If ``max_bytes`` is not None, the connection will be closed
    if more than ``max_bytes`` bytes have been read and the regex is
    not satisfied.

    .. versionchanged:: 4.0
        Added the ``max_bytes`` argument.  The ``callback`` argument is
        now optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    future = self._start_read()
    self._read_regex = re.compile(regex)
    self._read_max_bytes = max_bytes
    try:
        self._try_inline_read()
    except UnsatisfiableReadError as e:
        # Handle this the same way as in _handle_events.
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
    except BaseException:
        # Examine the future's exception so it doesn't log an
        # "exception was never retrieved" warning; the caller sees
        # the raised exception instead.
        future.add_done_callback(lambda f: f.exception())
        raise
    return future

364 

def read_until(
    self, delimiter: bytes, max_bytes: Optional[int] = None
) -> Awaitable[bytes]:
    """Asynchronously read until we have found the given delimiter.

    The result includes all the data read including the delimiter.

    If ``max_bytes`` is not None, the connection will be closed
    if more than ``max_bytes`` bytes have been read and the delimiter
    is not found.

    .. versionchanged:: 4.0
        Added the ``max_bytes`` argument.  The ``callback`` argument is
        now optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.
    """
    future = self._start_read()
    self._read_delimiter = delimiter
    self._read_max_bytes = max_bytes
    try:
        self._try_inline_read()
    except UnsatisfiableReadError as e:
        # Handle this the same way as in _handle_events.
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
    except BaseException:
        # Silence the future's unexamined-exception warning; the
        # raised exception is what the caller sees.
        future.add_done_callback(lambda f: f.exception())
        raise
    return future

399 

def read_bytes(self, num_bytes: int, partial: bool = False) -> Awaitable[bytes]:
    """Asynchronously read a number of bytes.

    If ``partial`` is true, data is returned as soon as we have
    any bytes to return (but never more than ``num_bytes``)

    .. versionchanged:: 4.0
        Added the ``partial`` argument.  The callback argument is now
        optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` and ``streaming_callback`` arguments have
       been removed. Use the returned `.Future` (and
       ``partial=True`` for ``streaming_callback``) instead.

    """
    future = self._start_read()
    assert isinstance(num_bytes, numbers.Integral)
    self._read_bytes = num_bytes
    self._read_partial = partial
    try:
        self._try_inline_read()
    except BaseException:
        # Examine the exception so the abandoned future doesn't log.
        future.add_done_callback(lambda f: f.exception())
        raise
    return future

427 

def read_into(self, buf: bytearray, partial: bool = False) -> Awaitable[int]:
    """Asynchronously read a number of bytes.

    ``buf`` must be a writable buffer into which data will be read.

    If ``partial`` is true, the callback is run as soon as any bytes
    have been read.  Otherwise, it is run when the ``buf`` has been
    entirely filled with read data.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    future = self._start_read()

    # First copy data already in read buffer
    available_bytes = self._read_buffer_size
    n = len(buf)
    if available_bytes >= n:
        # Request fully satisfied from buffered data; the leftover
        # internal buffer is saved so _finish_read can restore it.
        buf[:] = memoryview(self._read_buffer)[:n]
        del self._read_buffer[:n]
        self._after_user_read_buffer = self._read_buffer
    elif available_bytes > 0:
        buf[:available_bytes] = memoryview(self._read_buffer)[:]

    # Set up the supplied buffer as our temporary read buffer.
    # The original (if it had any data remaining) has been
    # saved for later.
    self._user_read_buffer = True
    self._read_buffer = buf
    self._read_buffer_size = available_bytes
    self._read_bytes = n
    self._read_partial = partial

    try:
        self._try_inline_read()
    except:
        # Examine the exception so the abandoned future doesn't log;
        # the raised exception is what the caller sees.
        future.add_done_callback(lambda f: f.exception())
        raise
    return future

472 

def read_until_close(self) -> Awaitable[bytes]:
    """Asynchronously reads all data from the socket until it is closed.

    This will buffer all available data until ``max_buffer_size``
    is reached. If flow control or cancellation are desired, use a
    loop with `read_bytes(partial=True) <.read_bytes>` instead.

    .. versionchanged:: 4.0
        The callback argument is now optional and a `.Future` will
        be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` and ``streaming_callback`` arguments have
       been removed. Use the returned `.Future` (and `read_bytes`
       with ``partial=True`` for ``streaming_callback``) instead.

    """
    future = self._start_read()
    if self.closed():
        # Already at EOF: deliver everything currently buffered.
        self._finish_read(self._read_buffer_size)
        return future
    self._read_until_close = True
    try:
        self._try_inline_read()
    except BaseException:
        # Examine the exception so the abandoned future doesn't log.
        future.add_done_callback(lambda f: f.exception())
        raise
    return future

502 

def write(self, data: Union[bytes, memoryview]) -> "Future[None]":
    """Asynchronously write the given data to this stream.

    This method returns a `.Future` that resolves (with a result
    of ``None``) when the write has been completed.

    The ``data`` argument may be of type `bytes` or `memoryview`.

    .. versionchanged:: 4.0
        Now returns a `.Future` if no callback is given.

    .. versionchanged:: 4.5
        Added support for `memoryview` arguments.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    self._check_closed()
    if data:
        if isinstance(data, memoryview):
            # Normalize the view so that ``len(data) == data.nbytes``.
            data = memoryview(data).cast("B")
        if self.max_write_buffer_size is not None and (
            len(self._write_buffer) + len(data) > self.max_write_buffer_size
        ):
            raise StreamBufferFullError("Reached maximum write buffer size")
        self._write_buffer.append(data)
        self._total_write_index += len(data)
    future = Future()  # type: Future[None]
    # Preemptively examine exceptions so an unawaited write future
    # doesn't log an "exception was never retrieved" warning.
    future.add_done_callback(lambda f: f.exception())
    self._write_futures.append((self._total_write_index, future))
    if not self._connecting:
        self._handle_write()
        if self._write_buffer:
            self._add_io_state(self.io_loop.WRITE)
        self._maybe_add_error_listener()
    return future

544 

def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
    """Call the given callback when the stream is closed.

    This mostly is not necessary for applications that use the
    `.Future` interface; all outstanding ``Futures`` will resolve
    with a `StreamClosedError` when the stream is closed.  However,
    it is still useful as a way to signal that the stream has been
    closed while no other read or write is in progress.

    Unlike other callback-based interfaces, ``set_close_callback``
    was not removed in Tornado 6.0.
    """
    self._close_callback = callback
    # Closure may only be detectable by listening for read events.
    self._maybe_add_error_listener()

559 

def close(
    self,
    exc_info: Union[
        None,
        bool,
        BaseException,
        Tuple[
            "Optional[Type[BaseException]]",
            Optional[BaseException],
            Optional[TracebackType],
        ],
    ] = False,
) -> None:
    """Close this stream.

    If ``exc_info`` is true, set the ``error`` attribute to the current
    exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
    use that instead of `sys.exc_info`).
    """
    if not self.closed():
        if exc_info:
            # exc_info may be a bool, an exception instance, or a full
            # sys.exc_info-style tuple; normalize into self.error.
            if isinstance(exc_info, tuple):
                self.error = exc_info[1]
            elif isinstance(exc_info, BaseException):
                self.error = exc_info
            else:
                exc_info = sys.exc_info()
                if any(exc_info):
                    self.error = exc_info[1]
        if self._read_until_close:
            # read_until_close is satisfied by EOF: deliver whatever
            # remains in the buffer.
            self._read_until_close = False
            self._finish_read(self._read_buffer_size)
        elif self._read_future is not None:
            # resolve reads that are pending and ready to complete
            try:
                pos = self._find_read_pos()
            except UnsatisfiableReadError:
                pass
            else:
                if pos is not None:
                    self._read_from_buffer(pos)
        if self._state is not None:
            # Unregister from the IOLoop before closing the fd.
            self.io_loop.remove_handler(self.fileno())
            self._state = None
        self.close_fd()
        self._closed = True
        self._signal_closed()

607 

def _signal_closed(self) -> None:
    # Fail or clean up every outstanding future now that the stream is
    # closed, then schedule the close callback and drop the write buffer.
    futures = []  # type: List[Future]
    if self._read_future is not None:
        futures.append(self._read_future)
        self._read_future = None
    futures += [future for _, future in self._write_futures]
    self._write_futures.clear()
    if self._connect_future is not None:
        futures.append(self._connect_future)
        self._connect_future = None
    for future in futures:
        if not future.done():
            future.set_exception(StreamClosedError(real_error=self.error))
        # Reference the exception to silence warnings. Annoyingly,
        # this raises if the future was cancelled, but just
        # returns any other error.
        try:
            future.exception()
        except asyncio.CancelledError:
            pass
    if self._ssl_connect_future is not None:
        # _ssl_connect_future expects to see the real exception (typically
        # an ssl.SSLError), not just StreamClosedError.
        if not self._ssl_connect_future.done():
            if self.error is not None:
                self._ssl_connect_future.set_exception(self.error)
            else:
                self._ssl_connect_future.set_exception(StreamClosedError())
        self._ssl_connect_future.exception()
        self._ssl_connect_future = None
    if self._close_callback is not None:
        # Run the close callback on the next IOLoop iteration, after
        # the futures above have had a chance to resolve.
        cb = self._close_callback
        self._close_callback = None
        self.io_loop.add_callback(cb)
    # Clear the buffers so they can be cleared immediately even
    # if the IOStream object is kept alive by a reference cycle.
    # TODO: Clear the read buffer too; it currently breaks some tests.
    self._write_buffer = None  # type: ignore

646 

647 def reading(self) -> bool: 

648 """Returns ``True`` if we are currently reading from the stream.""" 

649 return self._read_future is not None 

650 

651 def writing(self) -> bool: 

652 """Returns ``True`` if we are currently writing to the stream.""" 

653 return bool(self._write_buffer) 

654 

655 def closed(self) -> bool: 

656 """Returns ``True`` if the stream has been closed.""" 

657 return self._closed 

658 

def set_nodelay(self, value: bool) -> None:
    """Sets the no-delay flag for this stream.

    By default, data written to TCP streams may be held for a time
    to make the most efficient use of bandwidth (according to
    Nagle's algorithm).  The no-delay flag requests that data be
    written as soon as possible, even if doing so would consume
    additional bandwidth.

    This flag is currently defined only for TCP-based ``IOStreams``.

    .. versionadded:: 3.1
    """
    # No-op here; TCP-based subclasses override this.
    pass

673 

674 def _handle_connect(self) -> None: 

675 raise NotImplementedError() 

676 

def _handle_events(self, fd: Union[int, ioloop._Selectable], events: int) -> None:
    # IOLoop callback: dispatch connect/read/write/error events for our
    # fd, then recompute and re-register the event mask we need.
    if self.closed():
        gen_log.warning("Got events for closed stream %s", fd)
        return
    try:
        if self._connecting:
            # Most IOLoops will report a write failed connect
            # with the WRITE event, but SelectIOLoop reports a
            # READ as well so we must check for connecting before
            # either.
            self._handle_connect()
        if self.closed():
            return
        if events & self.io_loop.READ:
            self._handle_read()
        if self.closed():
            return
        if events & self.io_loop.WRITE:
            self._handle_write()
        if self.closed():
            return
        if events & self.io_loop.ERROR:
            self.error = self.get_fd_error()
            # We may have queued up a user callback in _handle_read or
            # _handle_write, so don't close the IOStream until those
            # callbacks have had a chance to run.
            self.io_loop.add_callback(self.close)
            return
        # Recompute the desired event mask from current read/write state.
        state = self.io_loop.ERROR
        if self.reading():
            state |= self.io_loop.READ
        if self.writing():
            state |= self.io_loop.WRITE
        if state == self.io_loop.ERROR and self._read_buffer_size == 0:
            # If the connection is idle, listen for reads too so
            # we can tell if the connection is closed.  If there is
            # data in the read buffer we won't run the close callback
            # yet anyway, so we don't need to listen in this case.
            state |= self.io_loop.READ
        if state != self._state:
            assert (
                self._state is not None
            ), "shouldn't happen: _handle_events without self._state"
            self._state = state
            self.io_loop.update_handler(self.fileno(), self._state)
    except UnsatisfiableReadError as e:
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
    except Exception as e:
        gen_log.error("Uncaught exception, closing connection.", exc_info=True)
        self.close(exc_info=e)
        raise

729 

def _read_to_buffer_loop(self) -> Optional[int]:
    # This method is called from _handle_read and _try_inline_read.
    # Drains the fd into the read buffer and returns the position at
    # which the pending read can be satisfied, or None if it can't yet.
    if self._read_bytes is not None:
        target_bytes = self._read_bytes  # type: Optional[int]
    elif self._read_max_bytes is not None:
        target_bytes = self._read_max_bytes
    elif self.reading():
        # For read_until without max_bytes, or
        # read_until_close, read as much as we can before
        # scanning for the delimiter.
        target_bytes = None
    else:
        # No read is pending; don't pull in any data.
        target_bytes = 0
    next_find_pos = 0
    while not self.closed():
        # Read from the socket until we get EWOULDBLOCK or equivalent.
        # SSL sockets do some internal buffering, and if the data is
        # sitting in the SSL object's buffer select() and friends
        # can't see it; the only way to find out if it's there is to
        # try to read it.
        if self._read_to_buffer() == 0:
            break

        # If we've read all the bytes we can use, break out of
        # this loop.

        # If we've reached target_bytes, we know we're done.
        if target_bytes is not None and self._read_buffer_size >= target_bytes:
            break

        # Otherwise, we need to call the more expensive find_read_pos.
        # It's inefficient to do this on every read, so instead
        # do it on the first read and whenever the read buffer
        # size has doubled.
        if self._read_buffer_size >= next_find_pos:
            pos = self._find_read_pos()
            if pos is not None:
                return pos
            next_find_pos = self._read_buffer_size * 2
    return self._find_read_pos()

770 

def _handle_read(self) -> None:
    # Fd is readable: pull data into the buffer, then complete the
    # pending read if it can now be satisfied.
    try:
        pos = self._read_to_buffer_loop()
    except (UnsatisfiableReadError, asyncio.CancelledError):
        # Let these propagate to _handle_events / the caller unchanged.
        raise
    except Exception as exc:
        gen_log.warning("error on read: %s" % exc)
        self.close(exc_info=exc)
        return
    if pos is not None:
        self._read_from_buffer(pos)

784 

def _start_read(self) -> Future:
    # Install and return the future representing this read request.
    if self._read_future is not None:
        # It is an error to start a read while a prior read is
        # unresolved.  However, if the prior read is unresolved because
        # the stream was closed without satisfying it, it's better to
        # raise StreamClosedError instead of AssertionError.  In
        # particular, this situation occurs in harmless situations in
        # http1connection.py and an AssertionError would be logged
        # noisily.
        #
        # On the other hand, it is legal to start a new read while the
        # stream is closed, in case the read can be satisfied from the
        # read buffer.  So we only want to check the closed status of
        # the stream if we need to decide what kind of error to raise
        # for "already reading".
        #
        # These conditions have proven difficult to test; we have no
        # unittests that reliably verify this behavior so be careful
        # when making changes here.  See #2651 and #2719.
        self._check_closed()
        assert self._read_future is None, "Already reading"
    self._read_future = Future()
    return self._read_future

807 

def _finish_read(self, size: int) -> None:
    # Resolve the pending read future with ``size`` bytes of buffered
    # data (or with the byte count itself for read_into).
    if self._user_read_buffer:
        # Restore the internal read buffer that read_into displaced
        # with the caller-supplied one.
        self._read_buffer = self._after_user_read_buffer or bytearray()
        self._after_user_read_buffer = None
        self._read_buffer_size = len(self._read_buffer)
        self._user_read_buffer = False
        # read_into resolves with the number of bytes read, not data.
        result = size  # type: Union[int, bytes]
    else:
        result = self._consume(size)
    if self._read_future is not None:
        future = self._read_future
        self._read_future = None
        future_set_result_unless_cancelled(future, result)
    self._maybe_add_error_listener()

822 

def _try_inline_read(self) -> None:
    """Attempt to complete the current read operation from buffered data.

    If the read can be completed without blocking, schedules the
    read callback on the next IOLoop iteration; otherwise starts
    listening for reads on the socket.
    """
    # Data left over from a previous read may already satisfy us.
    pos = self._find_read_pos()
    if pos is None:
        self._check_closed()
        # Otherwise drain whatever the socket has ready right now.
        pos = self._read_to_buffer_loop()
    if pos is not None:
        self._read_from_buffer(pos)
        return
    # We couldn't satisfy the read inline, so make sure we're
    # listening for new data unless the stream is closed.
    if not self.closed():
        self._add_io_state(ioloop.IOLoop.READ)

844 

def _read_to_buffer(self) -> Optional[int]:
    """Reads from the socket and appends the result to the read buffer.

    Returns the number of bytes read.  Returns 0 if there is nothing
    to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
    error closes the socket and raises an exception.
    """
    try:
        while True:
            try:
                if self._user_read_buffer:
                    # Read directly into the unfilled tail of the
                    # caller-supplied buffer (read_into).
                    buf = memoryview(self._read_buffer)[
                        self._read_buffer_size :
                    ]  # type: Union[memoryview, bytearray]
                else:
                    buf = bytearray(self.read_chunk_size)
                bytes_read = self.read_from_fd(buf)
            except (socket.error, IOError, OSError) as e:
                # ssl.SSLError is a subclass of socket.error
                if self._is_connreset(e):
                    # Treat ECONNRESET as a connection close rather than
                    # an error to minimize log spam (the exception will
                    # be available on self.error for apps that care).
                    self.close(exc_info=e)
                    return None
                self.close(exc_info=e)
                raise
            break
        if bytes_read is None:
            # EWOULDBLOCK or equivalent: nothing available right now.
            return 0
        elif bytes_read == 0:
            # EOF.
            self.close()
            return 0
        if not self._user_read_buffer:
            self._read_buffer += memoryview(buf)[:bytes_read]
        self._read_buffer_size += bytes_read
    finally:
        # Break the reference to buf so we don't waste a chunk's worth of
        # memory in case an exception hangs on to our stack frame.
        del buf
    if self._read_buffer_size > self.max_buffer_size:
        gen_log.error("Reached maximum read buffer size")
        self.close()
        raise StreamBufferFullError("Reached maximum read buffer size")
    return bytes_read

890 

891 def _read_from_buffer(self, pos: int) -> None: 

892 """Attempts to complete the currently-pending read from the buffer. 

893 

894 The argument is either a position in the read buffer or None, 

895 as returned by _find_read_pos. 

896 """ 

897 self._read_bytes = self._read_delimiter = self._read_regex = None 

898 self._read_partial = False 

899 self._finish_read(pos) 

900 

901 def _find_read_pos(self) -> Optional[int]: 

902 """Attempts to find a position in the read buffer that satisfies 

903 the currently-pending read. 

904 

905 Returns a position in the buffer if the current read can be satisfied, 

906 or None if it cannot. 

907 """ 

908 if self._read_bytes is not None and ( 

909 self._read_buffer_size >= self._read_bytes 

910 or (self._read_partial and self._read_buffer_size > 0) 

911 ): 

912 num_bytes = min(self._read_bytes, self._read_buffer_size) 

913 return num_bytes 

914 elif self._read_delimiter is not None: 

915 # Multi-byte delimiters (e.g. '\r\n') may straddle two 

916 # chunks in the read buffer, so we can't easily find them 

917 # without collapsing the buffer. However, since protocols 

918 # using delimited reads (as opposed to reads of a known 

919 # length) tend to be "line" oriented, the delimiter is likely 

920 # to be in the first few chunks. Merge the buffer gradually 

921 # since large merges are relatively expensive and get undone in 

922 # _consume(). 

923 if self._read_buffer: 

924 loc = self._read_buffer.find(self._read_delimiter) 

925 if loc != -1: 

926 delimiter_len = len(self._read_delimiter) 

927 self._check_max_bytes(self._read_delimiter, loc + delimiter_len) 

928 return loc + delimiter_len 

929 self._check_max_bytes(self._read_delimiter, self._read_buffer_size) 

930 elif self._read_regex is not None: 

931 if self._read_buffer: 

932 m = self._read_regex.search(self._read_buffer) 

933 if m is not None: 

934 loc = m.end() 

935 self._check_max_bytes(self._read_regex, loc) 

936 return loc 

937 self._check_max_bytes(self._read_regex, self._read_buffer_size) 

938 return None 

939 

940 def _check_max_bytes(self, delimiter: Union[bytes, Pattern], size: int) -> None: 

941 if self._read_max_bytes is not None and size > self._read_max_bytes: 

942 raise UnsatisfiableReadError( 

943 "delimiter %r not found within %d bytes" 

944 % (delimiter, self._read_max_bytes) 

945 ) 

946 

    def _handle_write(self) -> None:
        # Flush as much of the write buffer as the fd will accept right now,
        # then resolve any write futures whose data has been fully flushed.
        while True:
            size = len(self._write_buffer)
            if not size:
                break
            assert size > 0
            try:
                if _WINDOWS:
                    # On windows, socket.send blows up if given a
                    # write buffer that's too large, instead of just
                    # returning the number of bytes it was able to
                    # process. Therefore we must not call socket.send
                    # with more than 128KB at a time.
                    size = 128 * 1024

                num_bytes = self.write_to_fd(self._write_buffer.peek(size))
                if num_bytes == 0:
                    # Partial-write convention: 0 means "would block";
                    # stop and wait for the next write event.
                    break
                self._write_buffer.advance(num_bytes)
                self._total_write_done_index += num_bytes
            except BlockingIOError:
                break
            except (socket.error, IOError, OSError) as e:
                if not self._is_connreset(e):
                    # Broken pipe errors are usually caused by connection
                    # reset, and its better to not log EPIPE errors to
                    # minimize log spam
                    gen_log.warning("Write error on %s: %s", self.fileno(), e)
                self.close(exc_info=e)
                return

        # Each queued future is tagged with the cumulative byte index at
        # which its write completes; resolve all futures whose index has
        # been reached.
        while self._write_futures:
            index, future = self._write_futures[0]
            if index > self._total_write_done_index:
                break
            self._write_futures.popleft()
            future_set_result_unless_cancelled(future, None)

984 

985 def _consume(self, loc: int) -> bytes: 

986 # Consume loc bytes from the read buffer and return them 

987 if loc == 0: 

988 return b"" 

989 assert loc <= self._read_buffer_size 

990 # Slice the bytearray buffer into bytes, without intermediate copying 

991 b = (memoryview(self._read_buffer)[:loc]).tobytes() 

992 self._read_buffer_size -= loc 

993 del self._read_buffer[:loc] 

994 return b 

995 

996 def _check_closed(self) -> None: 

997 if self.closed(): 

998 raise StreamClosedError(real_error=self.error) 

999 

1000 def _maybe_add_error_listener(self) -> None: 

1001 # This method is part of an optimization: to detect a connection that 

1002 # is closed when we're not actively reading or writing, we must listen 

1003 # for read events. However, it is inefficient to do this when the 

1004 # connection is first established because we are going to read or write 

1005 # immediately anyway. Instead, we insert checks at various times to 

1006 # see if the connection is idle and add the read listener then. 

1007 if self._state is None or self._state == ioloop.IOLoop.ERROR: 

1008 if ( 

1009 not self.closed() 

1010 and self._read_buffer_size == 0 

1011 and self._close_callback is not None 

1012 ): 

1013 self._add_io_state(ioloop.IOLoop.READ) 

1014 

1015 def _add_io_state(self, state: int) -> None: 

1016 """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler. 

1017 

1018 Implementation notes: Reads and writes have a fast path and a 

1019 slow path. The fast path reads synchronously from socket 

1020 buffers, while the slow path uses `_add_io_state` to schedule 

1021 an IOLoop callback. 

1022 

1023 To detect closed connections, we must have called 

1024 `_add_io_state` at some point, but we want to delay this as 

1025 much as possible so we don't have to set an `IOLoop.ERROR` 

1026 listener that will be overwritten by the next slow-path 

1027 operation. If a sequence of fast-path ops do not end in a 

1028 slow-path op, (e.g. for an @asynchronous long-poll request), 

1029 we must add the error handler. 

1030 

1031 TODO: reevaluate this now that callbacks are gone. 

1032 

1033 """ 

1034 if self.closed(): 

1035 # connection has been closed, so there can be no future events 

1036 return 

1037 if self._state is None: 

1038 self._state = ioloop.IOLoop.ERROR | state 

1039 self.io_loop.add_handler(self.fileno(), self._handle_events, self._state) 

1040 elif not self._state & state: 

1041 self._state = self._state | state 

1042 self.io_loop.update_handler(self.fileno(), self._state) 

1043 

1044 def _is_connreset(self, exc: BaseException) -> bool: 

1045 """Return ``True`` if exc is ECONNRESET or equivalent. 

1046 

1047 May be overridden in subclasses. 

1048 """ 

1049 return ( 

1050 isinstance(exc, (socket.error, IOError)) 

1051 and errno_from_exception(exc) in _ERRNO_CONNRESET 

1052 ) 

1053 

1054 

class IOStream(BaseIOStream):
    r"""Socket-based `IOStream` implementation.

    This class supports the read and write methods from `BaseIOStream`
    plus a `connect` method.

    The ``socket`` parameter may either be connected or unconnected.
    For server operations the socket is the result of calling
    `socket.accept <socket.socket.accept>`.  For client operations the
    socket is created with `socket.socket`, and may either be
    connected before passing it to the `IOStream` or connected with
    `IOStream.connect`.

    A very simple (and broken) HTTP client using this class:

    .. testcode::

        import socket
        import tornado

        async def main():
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            stream = tornado.iostream.IOStream(s)
            await stream.connect(("friendfeed.com", 80))
            await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
            header_data = await stream.read_until(b"\r\n\r\n")
            headers = {}
            for line in header_data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
            print(body_data)
            stream.close()

        if __name__ == '__main__':
            asyncio.run(main())

    .. testoutput::
       :hide:

    """

    def __init__(self, socket: socket.socket, *args: Any, **kwargs: Any) -> None:
        # All I/O is driven by the IOLoop, so the socket must be
        # non-blocking.
        self.socket = socket
        self.socket.setblocking(False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> Union[int, ioloop._Selectable]:
        # The socket object itself is used as the IOLoop handler key.
        return self.socket

    def close_fd(self) -> None:
        self.socket.close()
        self.socket = None  # type: ignore

    def get_fd_error(self) -> Optional[Exception]:
        # Retrieve the pending socket-level error (if any) as an exception.
        errno = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        return socket.error(errno, os.strerror(errno))

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        try:
            return self.socket.recv_into(buf, len(buf))
        except BlockingIOError:
            # None signals "nothing to read right now" to the caller.
            return None
        finally:
            del buf

    def write_to_fd(self, data: memoryview) -> int:
        try:
            return self.socket.send(data)  # type: ignore
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def connect(
        self: _IOStreamType, address: Any, server_hostname: Optional[str] = None
    ) -> "Future[_IOStreamType]":
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for `socket.connect <socket.socket.connect>` for
        the type of socket passed to the IOStream constructor,
        e.g. an ``(ip, port)`` tuple.  Hostnames are accepted here,
        but will be resolved synchronously and block the IOLoop.
        If you have a hostname instead of an IP address, the `.TCPClient`
        class is recommended instead of calling this method directly.
        `.TCPClient` will do asynchronous DNS resolution and handle
        both IPv4 and IPv6.

        This method returns a `.Future` whose result after a
        successful connection will be the stream itself.

        In SSL mode, the ``server_hostname`` parameter will be used
        for certificate validation (unless disabled in the
        ``ssl_options``) and SNI (if supported; requires Python
        2.7.9+).

        Note that it is safe to call `IOStream.write
        <BaseIOStream.write>` while the connection is pending, in
        which case the data will be written as soon as the connection
        is ready.  Calling `IOStream` read methods before the socket is
        connected works on some platforms but is non-portable.

        .. versionchanged:: 4.0
            If no callback is given, returns a `.Future`.

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to the
           `SSLIOStream` constructor to disable.

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        self._connecting = True
        future = Future()  # type: Future[_IOStreamType]
        self._connect_future = typing.cast("Future[IOStream]", future)
        try:
            self.socket.connect(address)
        except BlockingIOError:
            # In non-blocking mode we expect connect() to raise an
            # exception with EINPROGRESS or EWOULDBLOCK.
            pass
        except socket.error as e:
            # On freebsd, other errors such as ECONNREFUSED may be
            # returned immediately when attempting to connect to
            # localhost, so handle them the same way as an error
            # reported later in _handle_connect.
            # NOTE(review): ``future`` is always assigned above, so this
            # branch appears unreachable -- likely a vestige of the removed
            # callback-based API.
            if future is None:
                gen_log.warning("Connect error on fd %s: %s", self.socket.fileno(), e)
            self.close(exc_info=e)
            return future
        # Wait for writability, which signals connection completion.
        self._add_io_state(self.io_loop.WRITE)
        return future

    def start_tls(
        self,
        server_side: bool,
        ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
        server_hostname: Optional[str] = None,
    ) -> Awaitable["SSLIOStream"]:
        """Convert this `IOStream` to an `SSLIOStream`.

        This enables protocols that begin in clear-text mode and
        switch to SSL after some initial negotiation (such as the
        ``STARTTLS`` extension to SMTP and IMAP).

        This method cannot be used if there are outstanding reads
        or writes on the stream, or if there is any data in the
        IOStream's buffer (data in the operating system's socket
        buffer is allowed).  This means it must generally be used
        immediately after reading or writing the last clear-text
        data.  It can also be used immediately after connecting,
        before any reads or writes.

        The ``ssl_options`` argument may be either an `ssl.SSLContext`
        object or a dictionary of keyword arguments for the
        `ssl.SSLContext.wrap_socket` function.  The ``server_hostname`` argument
        will be used for certificate validation unless disabled
        in the ``ssl_options``.

        This method returns a `.Future` whose result is the new
        `SSLIOStream`.  After this method has been called,
        any other operation on the original stream is undefined.

        If a close callback is defined on this stream, it will be
        transferred to the new stream.

        .. versionadded:: 4.0

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to disable.
        """
        if (
            self._read_future
            or self._write_futures
            or self._connect_future
            or self._closed
            or self._read_buffer
            or self._write_buffer
        ):
            raise ValueError("IOStream is not idle; cannot convert to SSL")
        if ssl_options is None:
            if server_side:
                ssl_options = _server_ssl_defaults
            else:
                ssl_options = _client_ssl_defaults

        # Detach the raw socket from this stream before wrapping it; from
        # here on the original stream must not be used.
        socket = self.socket
        self.io_loop.remove_handler(socket)
        self.socket = None  # type: ignore
        socket = ssl_wrap_socket(
            socket,
            ssl_options,
            server_hostname=server_hostname,
            server_side=server_side,
            do_handshake_on_connect=False,
        )
        # Transfer the close callback to the new stream.
        orig_close_callback = self._close_callback
        self._close_callback = None

        future = Future()  # type: Future[SSLIOStream]
        ssl_stream = SSLIOStream(socket, ssl_options=ssl_options)
        ssl_stream.set_close_callback(orig_close_callback)
        ssl_stream._ssl_connect_future = future
        ssl_stream.max_buffer_size = self.max_buffer_size
        ssl_stream.read_chunk_size = self.read_chunk_size
        return future

    def _handle_connect(self) -> None:
        try:
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        except socket.error as e:
            # Hurd doesn't allow SO_ERROR for loopback sockets because all
            # errors for such sockets are reported synchronously.
            if errno_from_exception(e) == errno.ENOPROTOOPT:
                err = 0
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning(
                    "Connect error on fd %s: %s",
                    self.socket.fileno(),
                    errno.errorcode[err],
                )
            self.close()
            return
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future_set_result_unless_cancelled(future, self)
        self._connecting = False

    def set_nodelay(self, value: bool) -> None:
        # TCP_NODELAY only applies to TCP sockets (IPv4/IPv6).
        if self.socket is not None and self.socket.family in (
            socket.AF_INET,
            socket.AF_INET6,
        ):
            try:
                self.socket.setsockopt(
                    socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 if value else 0
                )
            except socket.error as e:
                # Sometimes setsockopt will fail if the socket is closed
                # at the wrong time.  This can happen with HTTPServer
                # resetting the value to ``False`` between requests.
                if e.errno != errno.EINVAL and not self._is_connreset(e):
                    raise

1317 

1318 

class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking SSL socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with::

        ssl.SSLContext(...).wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

    before constructing the `SSLIOStream`.  Unconnected sockets will be
    wrapped when `IOStream.connect` is finished.
    """

    # Narrowed type for the wrapped socket; assigned in __init__ /
    # _handle_connect.
    socket = None  # type: ssl.SSLSocket

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """The ``ssl_options`` keyword argument may either be an
        `ssl.SSLContext` object or a dictionary of keywords arguments
        for `ssl.SSLContext.wrap_socket`
        """
        self._ssl_options = kwargs.pop("ssl_options", _client_ssl_defaults)
        super().__init__(*args, **kwargs)
        # True until the TLS handshake completes.
        self._ssl_accepting = True
        self._handshake_reading = False
        self._handshake_writing = False
        self._server_hostname = None  # type: Optional[str]

        # If the socket is already connected, attempt to start the handshake.
        try:
            self.socket.getpeername()
        except socket.error:
            pass
        else:
            # Indirectly start the handshake, which will run on the next
            # IOLoop iteration and then the real IO state will be set in
            # _handle_events.
            self._add_io_state(self.io_loop.WRITE)

    def reading(self) -> bool:
        # The handshake itself may be waiting for readability.
        return self._handshake_reading or super().reading()

    def writing(self) -> bool:
        # The handshake itself may be waiting for writability.
        return self._handshake_writing or super().writing()

    def _do_ssl_handshake(self) -> None:
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                return self.close(exc_info=err)
            elif err.args[0] in (ssl.SSL_ERROR_SSL, ssl.SSL_ERROR_SYSCALL):
                try:
                    peer = self.socket.getpeername()
                except Exception:
                    peer = "(not connected)"
                gen_log.warning(
                    "SSL Error on %s %s: %s", self.socket.fileno(), peer, err
                )
                return self.close(exc_info=err)
            raise
        except ssl.CertificateError as err:
            # CertificateError can happen during handshake (hostname
            # verification) and should be passed to user. Starting
            # in Python 3.7, this error is a subclass of SSLError
            # and will be handled by the previous block instead.
            return self.close(exc_info=err)
        except socket.error as err:
            # Some port scans (e.g. nmap in -sT mode) have been known
            # to cause do_handshake to raise EBADF and ENOTCONN, so make
            # those errors quiet as well.
            # https://groups.google.com/forum/?fromgroups#!topic/python-tornado/ApucKJat1_0
            # Errno 0 is also possible in some cases (nc -z).
            # https://github.com/tornadoweb/tornado/issues/2504
            if self._is_connreset(err) or err.args[0] in (
                0,
                errno.EBADF,
                errno.ENOTCONN,
            ):
                return self.close(exc_info=err)
            raise
        except AttributeError as err:
            # On Linux, if the connection was reset before the call to
            # wrap_socket, do_handshake will fail with an
            # AttributeError.
            return self.close(exc_info=err)
        else:
            self._ssl_accepting = False
            # Prior to the introduction of SNI, this is where we would check
            # the server's claimed hostname.
            assert ssl.HAS_SNI
            self._finish_ssl_connect()

    def _finish_ssl_connect(self) -> None:
        # Resolve the pending wait_for_handshake future, if any.
        if self._ssl_connect_future is not None:
            future = self._ssl_connect_future
            self._ssl_connect_future = None
            future_set_result_unless_cancelled(future, self)

    def _handle_read(self) -> None:
        # Read events during the handshake drive the handshake, not reads.
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super()._handle_read()

    def _handle_write(self) -> None:
        # Write events during the handshake drive the handshake, not writes.
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super()._handle_write()

    def connect(
        self, address: Tuple, server_hostname: Optional[str] = None
    ) -> "Future[SSLIOStream]":
        self._server_hostname = server_hostname
        # Ignore the result of connect(). If it fails,
        # wait_for_handshake will raise an error too. This is
        # necessary for the old semantics of the connect callback
        # (which takes no arguments). In 6.0 this can be refactored to
        # be a regular coroutine.
        # TODO: This is trickier than it looks, since if write()
        # is called with a connect() pending, we want the connect
        # to resolve before the write. Or do we care about this?
        # (There's a test for it, but I think in practice users
        # either wait for the connect before performing a write or
        # they don't care about the connect Future at all)
        fut = super().connect(address)
        fut.add_done_callback(lambda f: f.exception())
        return self.wait_for_handshake()

    def _handle_connect(self) -> None:
        # Call the superclass method to check for errors.
        super()._handle_connect()
        if self.closed():
            return
        # When the connection is complete, wrap the socket for SSL
        # traffic.  Note that we do this by overriding _handle_connect
        # instead of by passing a callback to super().connect because
        # user callbacks are enqueued asynchronously on the IOLoop,
        # but since _handle_events calls _handle_connect immediately
        # followed by _handle_write we need this to be synchronous.
        #
        # The IOLoop will get confused if we swap out self.socket while the
        # fd is registered, so remove it now and re-register after
        # wrap_socket().
        self.io_loop.remove_handler(self.socket)
        old_state = self._state
        assert old_state is not None
        self._state = None
        self.socket = ssl_wrap_socket(
            self.socket,
            self._ssl_options,
            server_hostname=self._server_hostname,
            do_handshake_on_connect=False,
            server_side=False,
        )
        self._add_io_state(old_state)

    def wait_for_handshake(self) -> "Future[SSLIOStream]":
        """Wait for the initial SSL handshake to complete.

        This method returns a `.Future` which will resolve to the
        stream itself after the handshake is complete.

        Once the handshake is complete, information such as
        the peer's certificate and NPN/ALPN selections may be
        accessed on ``self.socket``.

        This method is intended for use on server-side streams
        or after using `IOStream.start_tls`; it should not be used
        with `IOStream.connect` (which already waits for the
        handshake to complete). It may only be called once per stream.

        .. versionadded:: 4.2

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        if self._ssl_connect_future is not None:
            raise RuntimeError("Already waiting")
        future = self._ssl_connect_future = Future()
        if not self._ssl_accepting:
            # Handshake already finished; resolve immediately.
            self._finish_ssl_connect()
        return future

    def write_to_fd(self, data: memoryview) -> int:
        # clip buffer size at 1GB since SSL sockets only support upto 2GB
        # this change in behaviour is transparent, since the function is
        # already expected to (possibly) write less than the provided buffer
        if len(data) >> 30:
            data = memoryview(data)[: 1 << 30]
        try:
            return self.socket.send(data)  # type: ignore
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
                # the socket is not writeable; we need to transform this into
                # an EWOULDBLOCK socket.error or a zero return value,
                # either of which will be recognized by the caller of this
                # method. Prior to Python 3.5, an unwriteable socket would
                # simply return 0 bytes written.
                return 0
            raise
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        try:
            if self._ssl_accepting:
                # If the handshake hasn't finished yet, there can't be anything
                # to read (attempting to read may or may not raise an exception
                # depending on the SSL version)
                return None
            # clip buffer size at 1GB since SSL sockets only support upto 2GB
            # this change in behaviour is transparent, since the function is
            # already expected to (possibly) read less than the provided buffer
            if len(buf) >> 30:
                buf = memoryview(buf)[: 1 << 30]
            try:
                return self.socket.recv_into(buf, len(buf))
            except ssl.SSLError as e:
                # SSLError is a subclass of socket.error, so this except
                # block must come first.
                if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                    return None
                else:
                    raise
            except BlockingIOError:
                return None
        finally:
            del buf

    def _is_connreset(self, e: BaseException) -> bool:
        # An SSL-level EOF is treated the same as an OS-level connection
        # reset: the peer went away mid-stream.
        if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
            return True
        return super()._is_connreset(e)

1569 

1570 

class PipeIOStream(BaseIOStream):
    """Pipe-based `IOStream` implementation.

    The constructor takes an integer file descriptor (such as one returned
    by `os.pipe`) rather than an open file object.  Pipes are generally
    one-way, so a `PipeIOStream` can be used for reading or writing but not
    both.

    ``PipeIOStream`` is only available on Unix-based platforms.
    """

    def __init__(self, fd: int, *args: Any, **kwargs: Any) -> None:
        self.fd = fd
        # Wrap the raw fd in a FileIO so readinto() is available; it is
        # closed (along with the fd) in close_fd().
        self._fio = io.FileIO(self.fd, "r+")
        if sys.platform == "win32":
            # The form and placement of this assertion is important to mypy.
            # A plain assert statement isn't recognized here. If the assertion
            # were earlier it would worry that the attributes of self aren't
            # set on windows. If it were missing it would complain about
            # the absence of the set_blocking function.
            raise AssertionError("PipeIOStream is not supported on Windows")
        os.set_blocking(fd, False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> int:
        return self.fd

    def close_fd(self) -> None:
        # Closing the FileIO also closes the underlying descriptor.
        self._fio.close()

    def write_to_fd(self, data: memoryview) -> int:
        try:
            return os.write(self.fd, data)  # type: ignore
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        try:
            return self._fio.readinto(buf)  # type: ignore
        except (IOError, OSError) as e:
            if errno_from_exception(e) == errno.EBADF:
                # If the writing half of a pipe is closed, select will
                # report it as readable but reads will fail with EBADF.
                self.close(exc_info=e)
                return None
            else:
                raise
        finally:
            # Break the reference to buf (may be a memoryview).
            del buf

1622 

1623 

def doctests() -> Any:
    """Build and return a unittest suite running this module's doctests."""
    import doctest

    suite = doctest.DocTestSuite()
    return suite