Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/iostream.py: 15%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Copyright 2009 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""Utility classes to write to and read from non-blocking files and sockets.
18Contents:
20* `BaseIOStream`: Generic interface for reading and writing.
21* `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
22* `SSLIOStream`: SSL-aware version of IOStream.
23* `PipeIOStream`: Pipe-based IOStream implementation.
24"""
26import asyncio
27import collections
28import errno
29import io
30import numbers
31import os
32import socket
33import ssl
34import sys
35import re
37from tornado.concurrent import Future, future_set_result_unless_cancelled
38from tornado import ioloop
39from tornado.log import gen_log
40from tornado.netutil import ssl_wrap_socket, _client_ssl_defaults, _server_ssl_defaults
41from tornado.util import errno_from_exception
43import typing
44from typing import (
45 Union,
46 Optional,
47 Awaitable,
48 Callable,
49 Pattern,
50 Any,
51 Dict,
52 TypeVar,
53 Tuple,
54)
55from types import TracebackType
57if typing.TYPE_CHECKING:
58 from typing import Deque, List, Type # noqa: F401
# Type variable for methods that return the concrete stream subclass; the
# bound is given by name because ``IOStream`` is defined further down.
_IOStreamType = TypeVar("_IOStreamType", bound="IOStream")

# errno values meaning the peer went away abruptly. Errors carrying one of
# these are caught and reported less noisily than other failures.
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE, errno.ETIMEDOUT)

if hasattr(errno, "WSAECONNRESET"):
    # The WSA* variants are only added where the errno module defines them.
    _ERRNO_CONNRESET = _ERRNO_CONNRESET + (  # type: ignore
        errno.WSAECONNRESET,  # type: ignore
        errno.WSAECONNABORTED,  # type: ignore
        errno.WSAETIMEDOUT,  # type: ignore
    )

if sys.platform == "darwin":
    # OSX appears to have a race condition in which send(2) returns
    # EPROTOTYPE when called on a socket that is being torn down:
    # http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
    # The socket is closing anyway, so classify it with ECONNRESET rather
    # than treating it as an unexpected error.
    _ERRNO_CONNRESET = _ERRNO_CONNRESET + (errno.EPROTOTYPE,)  # type: ignore

# True when running on a Windows platform.
_WINDOWS = sys.platform.startswith("win")
class StreamClosedError(IOError):
    """Error raised by `IOStream` methods once the stream has been closed.

    The close callback is scheduled to run *after* the other callbacks on
    the stream (so that buffered data can still be processed), which means
    this error may be observed before the close callback fires.

    ``real_error`` holds the underlying exception that caused the stream
    to close, or ``None`` when there was none.

    .. versionchanged:: 4.3
       Added the ``real_error`` attribute.
    """

    def __init__(self, real_error: Optional[BaseException] = None) -> None:
        # The message is fixed; the cause travels separately in real_error.
        self.real_error = real_error
        super().__init__("Stream is closed")
class UnsatisfiableReadError(Exception):
    """Signals that a pending read can no longer be satisfied.

    ``read_until`` and ``read_until_regex`` raise it when they were given a
    ``max_bytes`` argument and that limit is exceeded.
    """
class StreamBufferFullError(Exception):
    """Raised by `IOStream` methods when a buffer has reached its size limit."""
class _StreamBuffer(object):
    """Byte queue that tries not to copy large pieces of data.

    Pieces longer than ``_large_buf_threshold`` are queued as `memoryview`
    objects; shorter ones are copied into a `bytearray`, extending the
    trailing bytearray when it is still small.
    """

    # Pieces above this size get their own queue entry instead of being
    # merged into an existing bytearray.
    _large_buf_threshold = 2048

    def __init__(self) -> None:
        # Queue entries are (True, memoryview) or (False, bytearray) pairs.
        self._buffers = (
            collections.deque()
        )  # type: Deque[Tuple[bool, Union[bytearray, memoryview]]]
        # Read offset inside the first queue entry.
        self._first_pos = 0
        # Number of bytes appended and not yet advanced past.
        self._size = 0

    def __len__(self) -> int:
        """Return the number of unconsumed bytes."""
        return self._size

    def append(self, data: Union[bytes, bytearray, memoryview]) -> None:
        """Queue ``data`` (a buffer-compatible object) at the end."""
        nbytes = len(data)
        if nbytes > self._large_buf_threshold:
            # Large piece: keep a view of it rather than copying.
            view = data if isinstance(data, memoryview) else memoryview(data)
            self._buffers.append((True, view))
        elif nbytes > 0:
            tail = self._buffers[-1] if self._buffers else None
            if (
                tail is None
                or tail[0]
                or len(tail[1]) >= self._large_buf_threshold
            ):
                # Nothing suitable to extend; start a fresh bytearray.
                self._buffers.append((False, bytearray(data)))
            else:
                chunk = tail[1]
                chunk += data  # type: ignore  # in-place extend of the tail

        self._size += nbytes

    def peek(self, size: int) -> memoryview:
        """Return a view of up to ``size`` bytes at the current position.

        Only the first queue entry is examined, so fewer than ``size``
        bytes may be returned; an empty view is returned when the queue is
        empty.
        """
        assert size > 0
        if not self._buffers:
            return memoryview(b"")

        is_memview, chunk = self._buffers[0]
        view = chunk if is_memview else memoryview(chunk)
        start = self._first_pos
        return typing.cast(memoryview, view[start : start + size])

    def advance(self, size: int) -> None:
        """Move the current position forward by ``size`` bytes."""
        assert 0 < size <= self._size
        self._size -= size

        offset = self._first_pos
        queue = self._buffers
        remaining = size
        while queue and remaining > 0:
            is_large, chunk = queue[0]
            unread = len(chunk) - offset
            if unread <= remaining:
                # The whole entry has been consumed; drop it.
                queue.popleft()
                remaining -= unread
                offset = 0
            elif is_large:
                # memoryview entries are skipped over by offset only.
                offset += remaining
                remaining = 0
            else:
                # bytearray entries are trimmed in place.
                offset += remaining
                del typing.cast(bytearray, chunk)[:offset]
                offset = 0
                remaining = 0

        assert remaining == 0
        self._first_pos = offset
207class BaseIOStream(object):
208 """A utility class to write to and read from a non-blocking file or socket.
210 We support a non-blocking ``write()`` and a family of ``read_*()``
211 methods. When the operation completes, the ``Awaitable`` will resolve
212 with the data read (or ``None`` for ``write()``). All outstanding
213 ``Awaitables`` will resolve with a `StreamClosedError` when the
214 stream is closed; `.BaseIOStream.set_close_callback` can also be used
215 to be notified of a closed stream.
217 When a stream is closed due to an error, the IOStream's ``error``
218 attribute contains the exception object.
220 Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
221 `read_from_fd`, and optionally `get_fd_error`.
223 """
def __init__(
    self,
    max_buffer_size: Optional[int] = None,
    read_chunk_size: Optional[int] = None,
    max_write_buffer_size: Optional[int] = None,
) -> None:
    """`BaseIOStream` constructor.

    :arg max_buffer_size: Upper bound on buffered incoming data;
        100MB when omitted.
    :arg read_chunk_size: How much to read from the underlying transport
        in one go; 64KB when omitted.
    :arg max_write_buffer_size: Upper bound on buffered outgoing data;
        unlimited when omitted.

    .. versionchanged:: 4.0
       Add the ``max_write_buffer_size`` parameter.  Changed default
       ``read_chunk_size`` to 64KB.
    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been
       removed.
    """
    self.io_loop = ioloop.IOLoop.current()

    # Size limits. The chunk size is capped at half of max_buffer_size
    # because a chunk size too close to it can cause spurious failures.
    self.max_buffer_size = max_buffer_size or 104857600
    requested_chunk = read_chunk_size or 65536
    self.read_chunk_size = min(requested_chunk, self.max_buffer_size // 2)
    self.max_write_buffer_size = max_write_buffer_size

    # Exception that caused the stream to close, if any.
    self.error = None  # type: Optional[BaseException]

    # Read-side buffering.
    self._read_buffer = bytearray()
    self._read_buffer_size = 0
    self._user_read_buffer = False
    self._after_user_read_buffer = None  # type: Optional[bytearray]

    # Write-side buffering and progress counters.
    self._write_buffer = _StreamBuffer()
    self._total_write_index = 0
    self._total_write_done_index = 0

    # Parameters of the read currently in progress.
    self._read_delimiter = None  # type: Optional[bytes]
    self._read_regex = None  # type: Optional[Pattern]
    self._read_max_bytes = None  # type: Optional[int]
    self._read_bytes = None  # type: Optional[int]
    self._read_partial = False
    self._read_until_close = False

    # Futures handed out to callers.
    self._read_future = None  # type: Optional[Future]
    self._write_futures = (
        collections.deque()
    )  # type: Deque[Tuple[int, Future[None]]]
    self._close_callback = None  # type: Optional[Callable[[], None]]
    self._connect_future = None  # type: Optional[Future[IOStream]]
    # _ssl_connect_future belongs to SSLIOStream, but it lives here so
    # that _signal_closed can clean it up.
    # TODO: refactor that so subclasses can add additional futures
    # to be cancelled.
    self._ssl_connect_future = None  # type: Optional[Future[SSLIOStream]]

    # Connection / IOLoop registration state.
    self._connecting = False
    self._state = None  # type: Optional[int]
    self._closed = False
def fileno(self) -> Union[int, ioloop._Selectable]:
    """Return this stream's file descriptor; subclasses must override."""
    raise NotImplementedError
def close_fd(self) -> None:
    """Close the file underlying this stream; subclasses must override.

    `BaseIOStream` itself invokes ``close_fd``; it should not be called
    from anywhere else. Other users should call `close` instead.
    """
    raise NotImplementedError
def write_to_fd(self, data: memoryview) -> int:
    """Try to write ``data`` to the underlying file; subclasses must override.

    The return value is the number of bytes that were written.
    """
    raise NotImplementedError
def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
    """Try to read from the underlying file; subclasses must override.

    At most ``len(buf)`` bytes are read and stored into ``buf``. The
    return value is the number of bytes read, ``None`` when there was
    nothing to read (the socket returned `~errno.EWOULDBLOCK` or
    equivalent), or zero on EOF.

    .. versionchanged:: 5.0

       Interface redesigned to take a buffer and return a number
       of bytes instead of a freshly-allocated object.
    """
    raise NotImplementedError
def get_fd_error(self) -> Optional[Exception]:
    """Return details about an error on the underlying file, if known.

    Called after the `.IOLoop` has signaled an error on the file
    descriptor. Overrides should return an Exception (such as
    `socket.error` with additional information), or ``None`` when no such
    information is available. This base implementation always returns
    ``None``.
    """
    no_details = None
    return no_details
def read_until_regex(
    self, regex: bytes, max_bytes: Optional[int] = None
) -> Awaitable[bytes]:
    """Asynchronously read until we have matched the given regex.

    The result includes the data that matches the regex and anything
    that came before it.

    If ``max_bytes`` is not None, the connection will be closed
    if more than ``max_bytes`` bytes have been read and the regex is
    not satisfied.

    :arg regex: pattern passed to `re.compile`.
    :arg max_bytes: optional limit on the bytes read before a match.
    :returns: the future created by ``_start_read`` for this read.
    :raises re.error: if ``regex`` is not a valid pattern; no read is
        started in that case.

    .. versionchanged:: 4.0
        Added the ``max_bytes`` argument.  The ``callback`` argument is
        now optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    # Compile before registering the read: if the pattern is invalid the
    # exception must propagate without leaving a read future behind,
    # otherwise every later read would fail with "Already reading".
    pattern = re.compile(regex)
    future = self._start_read()
    self._read_regex = pattern
    self._read_max_bytes = max_bytes
    try:
        self._try_inline_read()
    except UnsatisfiableReadError as e:
        # Handle this the same way as in _handle_events.
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
        return future
    except:
        # Ensure that the future doesn't log an error because its
        # failure was never examined.
        future.add_done_callback(lambda f: f.exception())
        raise
    return future
def read_until(
    self, delimiter: bytes, max_bytes: Optional[int] = None
) -> Awaitable[bytes]:
    """Asynchronously read until the given delimiter has been found.

    The result contains everything read, delimiter included.

    When ``max_bytes`` is not None, the connection is closed if more
    than ``max_bytes`` bytes have been read without finding the
    delimiter.

    .. versionchanged:: 4.0
        Added the ``max_bytes`` argument.  The ``callback`` argument is
        now optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.
    """
    pending = self._start_read()
    self._read_delimiter = delimiter
    self._read_max_bytes = max_bytes
    try:
        self._try_inline_read()
    except UnsatisfiableReadError as e:
        # Same treatment as in _handle_events: log, close, and let the
        # caller observe the outcome through the future.
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
    except BaseException:
        # Mark the future's exception as retrieved before re-raising.
        pending.add_done_callback(lambda f: f.exception())
        raise
    return pending
def read_bytes(self, num_bytes: int, partial: bool = False) -> Awaitable[bytes]:
    """Asynchronously read a number of bytes.

    If ``partial`` is true, data is returned as soon as we have
    any bytes to return (but never more than ``num_bytes``)

    :arg num_bytes: number of bytes to read; must be an integral number.
    :arg partial: whether a short read may complete the operation.
    :returns: the future created by ``_start_read`` for this read.

    .. versionchanged:: 4.0
        Added the ``partial`` argument.  The callback argument is now
        optional and a `.Future` will be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` and ``streaming_callback`` arguments have
       been removed. Use the returned `.Future` (and
       ``partial=True`` for ``streaming_callback``) instead.

    """
    # Validate before registering the read: a failed check must not leave
    # a read future behind, or every later read would report
    # "Already reading".
    assert isinstance(num_bytes, numbers.Integral)
    future = self._start_read()
    self._read_bytes = num_bytes
    self._read_partial = partial
    try:
        self._try_inline_read()
    except:
        # Mark the future's exception as retrieved so it is not logged as
        # never examined, then let the error propagate.
        future.add_done_callback(lambda f: f.exception())
        raise
    return future
def read_into(self, buf: bytearray, partial: bool = False) -> Awaitable[int]:
    """Asynchronously read a number of bytes into a caller-supplied buffer.

    ``buf`` must be a writable buffer; its length is the number of bytes
    requested.

    With ``partial`` true the read completes as soon as any bytes are
    available; otherwise it completes once ``buf`` is entirely filled.

    .. versionadded:: 5.0

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    pending = self._start_read()

    # Start with whatever is already sitting in the read buffer.
    buffered = self._read_buffer_size
    wanted = len(buf)
    if buffered >= wanted:
        # Enough data already: fill buf and keep the leftover bytes so
        # they can be restored once this read finishes. The view is kept
        # as a temporary so the bytearray can be resized right after.
        buf[:] = memoryview(self._read_buffer)[:wanted]
        del self._read_buffer[:wanted]
        self._after_user_read_buffer = self._read_buffer
    elif buffered > 0:
        buf[:buffered] = memoryview(self._read_buffer)[:]

    # From here on the caller's buffer acts as the read buffer.
    self._user_read_buffer = True
    self._read_buffer = buf
    self._read_buffer_size = buffered
    self._read_bytes = wanted
    self._read_partial = partial

    try:
        self._try_inline_read()
    except BaseException:
        # Mark the future's exception as retrieved before re-raising.
        pending.add_done_callback(lambda f: f.exception())
        raise
    return pending
def read_until_close(self) -> Awaitable[bytes]:
    """Asynchronously read everything from the socket until it is closed.

    All available data is buffered until ``max_buffer_size`` is reached.
    When flow control or cancellation is needed, loop over
    `read_bytes(partial=True) <.read_bytes>` instead.

    .. versionchanged:: 4.0
        The callback argument is now optional and a `.Future` will
        be returned if it is omitted.

    .. versionchanged:: 6.0

       The ``callback`` and ``streaming_callback`` arguments have
       been removed. Use the returned `.Future` (and `read_bytes`
       with ``partial=True`` for ``streaming_callback``) instead.

    """
    pending = self._start_read()
    if not self.closed():
        self._read_until_close = True
        try:
            self._try_inline_read()
        except BaseException:
            # Mark the future's exception as retrieved before re-raising.
            pending.add_done_callback(lambda f: f.exception())
            raise
    else:
        # Already closed: complete right away with whatever is buffered.
        self._finish_read(self._read_buffer_size)
    return pending
def write(self, data: Union[bytes, memoryview]) -> "Future[None]":
    """Asynchronously write the given data to this stream.

    Returns a `.Future` that resolves (with a result of ``None``) once
    the write has completed.

    ``data`` may be of type `bytes` or `memoryview`.

    Raises `StreamBufferFullError` when ``max_write_buffer_size`` is set
    and queuing ``data`` would exceed it.

    .. versionchanged:: 4.0
        Now returns a `.Future` if no callback is given.

    .. versionchanged:: 4.5
        Added support for `memoryview` arguments.

    .. versionchanged:: 6.0

       The ``callback`` argument was removed. Use the returned
       `.Future` instead.

    """
    self._check_closed()
    if data:
        if isinstance(data, memoryview):
            # Cast to bytes so that ``len(data) == data.nbytes``.
            data = memoryview(data).cast("B")
        limit = self.max_write_buffer_size
        if limit is not None and len(self._write_buffer) + len(data) > limit:
            raise StreamBufferFullError("Reached maximum write buffer size")
        self._write_buffer.append(data)
        self._total_write_index += len(data)

    # The future is resolved once the bytes written reach the index
    # recorded alongside it.
    pending = Future()  # type: Future[None]
    pending.add_done_callback(lambda f: f.exception())
    self._write_futures.append((self._total_write_index, pending))

    if not self._connecting:
        self._handle_write()
        if self._write_buffer:
            # Data is still queued; wait for the fd to become writable.
            self._add_io_state(self.io_loop.WRITE)
        self._maybe_add_error_listener()
    return pending
def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
    """Register ``callback`` to be called when the stream is closed.

    Applications using the `.Future` interface mostly do not need this:
    every outstanding ``Future`` resolves with a `StreamClosedError` when
    the stream is closed. It remains useful for learning that the stream
    was closed while no read or write is in progress.

    Unlike other callback-based interfaces, ``set_close_callback``
    was not removed in Tornado 6.0.
    """
    self._close_callback = callback
    self._maybe_add_error_listener()
def close(
    self,
    exc_info: Union[
        None,
        bool,
        BaseException,
        Tuple[
            "Optional[Type[BaseException]]",
            Optional[BaseException],
            Optional[TracebackType],
        ],
    ] = False,
) -> None:
    """Close this stream.

    When ``exc_info`` is true the ``error`` attribute is set: from the
    exception itself if one is passed, from the tuple if a tuple is
    passed, and otherwise from the current `sys.exc_info`.
    """
    if not self.closed():
        if exc_info:
            if isinstance(exc_info, BaseException):
                self.error = exc_info
            elif isinstance(exc_info, tuple):
                self.error = exc_info[1]
            else:
                current = sys.exc_info()
                if any(current):
                    self.error = current[1]

        if self._read_until_close:
            # A read_until_close finishes with whatever was buffered.
            self._read_until_close = False
            self._finish_read(self._read_buffer_size)
        elif self._read_future is not None:
            # Complete a pending read that the buffered data can satisfy.
            try:
                ready_pos = self._find_read_pos()
            except UnsatisfiableReadError:
                ready_pos = None
            if ready_pos is not None:
                self._read_from_buffer(ready_pos)

        if self._state is not None:
            # Stop receiving IOLoop events for this fd.
            self.io_loop.remove_handler(self.fileno())
            self._state = None
        self.close_fd()
        self._closed = True
    self._signal_closed()
def _signal_closed(self) -> None:
    """Fail outstanding futures and schedule the close callback.

    The pending read, write and connect futures that are not yet done are
    failed with `StreamClosedError` (carrying ``self.error`` as
    ``real_error``); a pending SSL connect future is failed with
    ``self.error`` itself when there is one. The close callback, if set,
    is scheduled on the IOLoop and the write buffer is released.
    """
    futures = []  # type: List[Future]
    if self._read_future is not None:
        futures.append(self._read_future)
        self._read_future = None
    futures += [future for _, future in self._write_futures]
    self._write_futures.clear()
    if self._connect_future is not None:
        futures.append(self._connect_future)
        self._connect_future = None
    for future in futures:
        if not future.done():
            future.set_exception(StreamClosedError(real_error=self.error))
        # Reference the exception to silence warnings. Annoyingly,
        # this raises if the future was cancelled, but just
        # returns any other error.
        try:
            future.exception()
        except asyncio.CancelledError:
            pass
    if self._ssl_connect_future is not None:
        # _ssl_connect_future expects to see the real exception (typically
        # an ssl.SSLError), not just StreamClosedError.
        if not self._ssl_connect_future.done():
            if self.error is not None:
                self._ssl_connect_future.set_exception(self.error)
            else:
                self._ssl_connect_future.set_exception(StreamClosedError())
        # As above, exception() raises when the future was cancelled.
        # Swallow that here too, so the close callback is still scheduled
        # and the buffers are still released.
        try:
            self._ssl_connect_future.exception()
        except asyncio.CancelledError:
            pass
        self._ssl_connect_future = None
    if self._close_callback is not None:
        cb = self._close_callback
        self._close_callback = None
        self.io_loop.add_callback(cb)
    # Clear the buffers so they can be cleared immediately even
    # if the IOStream object is kept alive by a reference cycle.
    # TODO: Clear the read buffer too; it currently breaks some tests.
    self._write_buffer = None  # type: ignore
def reading(self) -> bool:
    """Return ``True`` while a read on the stream is outstanding."""
    pending = self._read_future
    return pending is not None
def writing(self) -> bool:
    """Return ``True`` while the write buffer still holds data."""
    if self._write_buffer:
        return True
    return False
def closed(self) -> bool:
    """Return ``True`` once the stream has been closed."""
    is_closed = self._closed
    return is_closed
def set_nodelay(self, value: bool) -> None:
    """Set the no-delay flag for this stream.

    Data written to TCP streams may by default be held back for a while
    so that bandwidth is used efficiently (Nagle's algorithm). The
    no-delay flag asks for data to be written as soon as possible, even
    at the cost of additional bandwidth.

    This flag is currently defined only for TCP-based ``IOStreams``;
    this base implementation does nothing.

    .. versionadded:: 3.1
    """
    return None
def _handle_connect(self) -> None:
    """Connection hook called by ``_handle_events`` while ``_connecting`` is set.

    Must be overridden; this base implementation always raises.
    """
    raise NotImplementedError
def _handle_events(self, fd: Union[int, ioloop._Selectable], events: int) -> None:
    """Dispatch IOLoop ``events`` reported for ``fd`` to the stream handlers.

    Runs the connect, read and write handlers as indicated by ``events``,
    stopping as soon as the stream turns out to be closed, and then
    updates the event mask registered with the IOLoop. Any exception
    closes the stream; exceptions other than `UnsatisfiableReadError`
    are logged and re-raised.
    """
    if self.closed():
        gen_log.warning("Got events for closed stream %s", fd)
        return
    try:
        if self._connecting:
            # A failed connect is reported as WRITE by most IOLoops,
            # but SelectIOLoop reports READ as well, so the connecting
            # check has to come before both handlers.
            self._handle_connect()
        if self.closed():
            return
        if events & self.io_loop.READ:
            self._handle_read()
        if self.closed():
            return
        if events & self.io_loop.WRITE:
            self._handle_write()
        if self.closed():
            return
        if events & self.io_loop.ERROR:
            self.error = self.get_fd_error()
            # _handle_read or _handle_write may have queued a user
            # callback; defer the close so those get to run first.
            self.io_loop.add_callback(self.close)
            return

        # Work out which events should be listened for from now on.
        wanted = self.io_loop.ERROR
        if self.reading():
            wanted |= self.io_loop.READ
        if self.writing():
            wanted |= self.io_loop.WRITE
        if wanted == self.io_loop.ERROR and self._read_buffer_size == 0:
            # Idle connection: listen for reads as well so a remote
            # close is noticed. With data in the read buffer the close
            # callback would not run yet anyway, so no need in that case.
            wanted |= self.io_loop.READ
        if wanted != self._state:
            assert (
                self._state is not None
            ), "shouldn't happen: _handle_events without self._state"
            self._state = wanted
            self.io_loop.update_handler(self.fileno(), self._state)
    except UnsatisfiableReadError as e:
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=e)
    except Exception as e:
        gen_log.error("Uncaught exception, closing connection.", exc_info=True)
        self.close(exc_info=e)
        raise
def _read_to_buffer_loop(self) -> Optional[int]:
    """Fill the read buffer until the pending read can complete or input runs dry.

    Called from ``_handle_read`` and ``_try_inline_read``. Returns the
    position reported by ``_find_read_pos`` (``None`` when the read
    cannot be satisfied yet).
    """
    # Decide how many buffered bytes are enough to stop reading.
    if self._read_bytes is not None:
        byte_goal = self._read_bytes  # type: Optional[int]
    elif self._read_max_bytes is not None:
        byte_goal = self._read_max_bytes
    elif self.reading():
        # read_until without max_bytes, or read_until_close: take in as
        # much as possible before scanning for the delimiter.
        byte_goal = None
    else:
        byte_goal = 0

    scan_threshold = 0
    while not self.closed():
        # Keep reading until the socket reports EWOULDBLOCK or
        # equivalent. SSL sockets buffer internally, and data held in
        # the SSL object's buffer is invisible to select() and friends;
        # attempting the read is the only way to discover it.
        if self._read_to_buffer() == 0:
            break

        # Reaching the byte goal means there is nothing more to gain
        # from reading further.
        if byte_goal is not None and self._read_buffer_size >= byte_goal:
            break

        # Otherwise fall back to the costlier _find_read_pos. Running it
        # after every read would be wasteful, so it runs on the first
        # read and then each time the buffer size has doubled.
        if self._read_buffer_size >= scan_threshold:
            found = self._find_read_pos()
            if found is not None:
                return found
            scan_threshold = self._read_buffer_size * 2
    return self._find_read_pos()
def _handle_read(self) -> None:
    """Read into the buffer and complete the pending read when possible.

    `UnsatisfiableReadError` and `asyncio.CancelledError` propagate to the
    caller; any other exception is logged and closes the stream.
    """
    try:
        found = self._read_to_buffer_loop()
    except (UnsatisfiableReadError, asyncio.CancelledError):
        raise
    except Exception as e:
        gen_log.warning("error on read: %s" % e)
        self.close(exc_info=e)
        return
    if found is None:
        return
    self._read_from_buffer(found)
def _start_read(self) -> Future:
    """Create, store and return the future for a new read operation.

    Raises `StreamClosedError` (via ``_check_closed``) or `AssertionError`
    when a previous read is still unresolved.
    """
    if self._read_future is not None:
        # Starting a read while an earlier one is unresolved is an error.
        # If the earlier read is unresolved only because the stream was
        # closed before it could be satisfied, StreamClosedError is the
        # better error to raise than AssertionError: that situation comes
        # up harmlessly in http1connection.py, where an AssertionError
        # would be logged noisily.
        #
        # Starting a new read on a closed stream is otherwise legal,
        # since the read buffer may be able to satisfy it. The closed
        # status is therefore consulted only here, to choose which error
        # "already reading" should produce.
        #
        # These conditions have proven difficult to test; there are no
        # unittests that reliably verify this behavior, so be careful
        # when making changes here. See #2651 and #2719.
        self._check_closed()
        assert self._read_future is None, "Already reading"
    new_future = Future()
    self._read_future = new_future
    return self._read_future
def _finish_read(self, size: int) -> None:
    """Resolve the pending read future with ``size`` bytes worth of result.

    For a read into a caller-supplied buffer the result is ``size`` itself
    and the regular read buffer is restored; otherwise the result is the
    ``size`` bytes consumed from the read buffer.
    """
    if not self._user_read_buffer:
        result = self._consume(size)  # type: Union[int, bytes]
    else:
        # Put back the bytes saved by read_into (or start afresh).
        self._read_buffer = self._after_user_read_buffer or bytearray()
        self._after_user_read_buffer = None
        self._read_buffer_size = len(self._read_buffer)
        self._user_read_buffer = False
        result = size

    pending = self._read_future
    if pending is not None:
        self._read_future = None
        future_set_result_unless_cancelled(pending, result)
    self._maybe_add_error_listener()
def _try_inline_read(self) -> None:
    """Try to complete the current read operation right away.

    Buffered data is checked first, then a read from the underlying
    transport is attempted. If the read still cannot be completed and the
    stream is open, the stream starts listening for READ events.
    """
    # Data left over from a previous read may already be sufficient.
    pos = self._find_read_pos()
    if pos is None:
        self._check_closed()
        pos = self._read_to_buffer_loop()
    if pos is not None:
        self._read_from_buffer(pos)
        return
    # Could not finish inline: wait for more data unless the stream has
    # been closed in the meantime.
    if not self.closed():
        self._add_io_state(ioloop.IOLoop.READ)
def _read_to_buffer(self) -> Optional[int]:
    """Read once from the socket and add the data to the read buffer.

    Returns the number of bytes read, or 0 when there is nothing to read
    (the read reported EWOULDBLOCK or equivalent) or EOF was reached (the
    stream is closed in that case). A connection-reset error closes the
    stream and returns ``None``; any other error closes the stream and
    is re-raised. Exceeding ``max_buffer_size`` closes the stream and
    raises `StreamBufferFullError`.
    """
    try:
        try:
            if self._user_read_buffer:
                # Read straight into the unused tail of the caller's buffer.
                buf = memoryview(self._read_buffer)[
                    self._read_buffer_size :
                ]  # type: Union[memoryview, bytearray]
            else:
                buf = bytearray(self.read_chunk_size)
            nread = self.read_from_fd(buf)
        except (socket.error, IOError, OSError) as e:
            # ssl.SSLError is a subclass of socket.error
            was_reset = self._is_connreset(e)
            self.close(exc_info=e)
            if was_reset:
                # A reset counts as a close rather than an error, to keep
                # log spam down (apps that care can inspect self.error).
                return None
            raise

        if nread is None:
            return 0
        if nread == 0:
            self.close()
            return 0
        if not self._user_read_buffer:
            self._read_buffer += memoryview(buf)[:nread]
        self._read_buffer_size += nread
    finally:
        # Drop our reference to buf so that a whole chunk of memory is not
        # kept alive should an exception hold on to this stack frame.
        del buf

    if self._read_buffer_size > self.max_buffer_size:
        gen_log.error("Reached maximum read buffer size")
        self.close()
        raise StreamBufferFullError("Reached maximum read buffer size")
    return nread
def _read_from_buffer(self, pos: int) -> None:
    """Complete the currently-pending read with ``pos`` bytes of the buffer.

    ``pos`` is a read-buffer position as returned by ``_find_read_pos``.
    The parameters of the pending read are reset before the read is
    finished.
    """
    self._read_regex = None
    self._read_delimiter = None
    self._read_bytes = None
    self._read_partial = False
    self._finish_read(pos)
def _find_read_pos(self) -> Optional[int]:
    """Look for a read-buffer position that satisfies the pending read.

    Returns that position when the current read can be satisfied, and
    ``None`` when it cannot. ``_check_max_bytes`` is consulted for
    delimiter and regex reads.
    """
    buffered = self._read_buffer_size
    if self._read_bytes is not None and (
        buffered >= self._read_bytes or (self._read_partial and buffered > 0)
    ):
        # Fixed-size read: enough bytes, or any bytes for a partial read.
        return min(self._read_bytes, buffered)

    if self._read_delimiter is not None:
        delimiter = self._read_delimiter
        if self._read_buffer:
            idx = self._read_buffer.find(delimiter)
            if idx != -1:
                # The result includes the delimiter itself.
                end = idx + len(delimiter)
                self._check_max_bytes(delimiter, end)
                return end
            self._check_max_bytes(delimiter, buffered)
    elif self._read_regex is not None:
        pattern = self._read_regex
        if self._read_buffer:
            match = pattern.search(self._read_buffer)
            if match is not None:
                end = match.end()
                self._check_max_bytes(pattern, end)
                return end
            self._check_max_bytes(pattern, buffered)
    return None
def _check_max_bytes(self, delimiter: Union[bytes, Pattern], size: int) -> None:
    """Raise `UnsatisfiableReadError` if ``size`` exceeds ``_read_max_bytes``.

    Does nothing when no limit is set. ``delimiter`` is only used in the
    error message.
    """
    limit = self._read_max_bytes
    if limit is None or size <= limit:
        return
    raise UnsatisfiableReadError(
        "delimiter %r not found within %d bytes" % (delimiter, self._read_max_bytes)
    )
def _handle_write(self) -> None:
    """Flush as much of the write buffer as the fd will accept.

    Stops when the buffer is empty, the fd reports zero bytes written,
    or the write would block.  On any other socket/OS error the stream
    is closed and the method returns without resolving write futures.
    Afterwards, every queued write future whose index has been reached
    by ``_total_write_done_index`` is resolved with ``None``.
    """
    while True:
        pending = len(self._write_buffer)
        if not pending:
            break
        assert pending > 0
        try:
            if _WINDOWS:
                # On windows, socket.send blows up if given a
                # write buffer that's too large, instead of just
                # returning the number of bytes it was able to
                # process. Therefore we must not call socket.send
                # with more than 128KB at a time.
                pending = 128 * 1024

            written = self.write_to_fd(self._write_buffer.peek(pending))
            if written == 0:
                break
            self._write_buffer.advance(written)
            self._total_write_done_index += written
        except BlockingIOError:
            break
        except (socket.error, IOError, OSError) as e:
            if not self._is_connreset(e):
                # Broken pipe errors are usually caused by connection
                # reset, and it's better not to log EPIPE errors, to
                # minimize log spam.
                gen_log.warning("Write error on %s: %s", self.fileno(), e)
            self.close(exc_info=e)
            return

    # Resolve futures in FIFO order up to the point already written.
    queued = self._write_futures
    while queued and queued[0][0] <= self._total_write_done_index:
        _, done_future = queued.popleft()
        future_set_result_unless_cancelled(done_future, None)
def _consume(self, loc: int) -> bytes:
    """Remove the first ``loc`` bytes from the read buffer and return them.

    ``_read_buffer_size`` is decremented to match.  ``loc`` must not
    exceed the number of buffered bytes.
    """
    if not loc:
        return b""
    assert loc <= self._read_buffer_size
    # Copy straight out of the bytearray through a temporary view.  The
    # view is not bound to a name, so it does not keep an export alive
    # that would block the resize performed by the ``del`` below.
    taken = bytes(memoryview(self._read_buffer)[:loc])
    self._read_buffer_size -= loc
    del self._read_buffer[:loc]
    return taken
def _check_closed(self) -> None:
    """Raise `StreamClosedError` (carrying ``self.error``) if closed."""
    if not self.closed():
        return
    raise StreamClosedError(real_error=self.error)
def _maybe_add_error_listener(self) -> None:
    """Start listening for reads on an idle stream so closes are noticed.

    This method is part of an optimization: to detect a connection that
    is closed when we're not actively reading or writing, we must listen
    for read events. However, it is inefficient to do this when the
    connection is first established because we are going to read or write
    immediately anyway. Instead, we insert checks at various times to
    see if the connection is idle and add the read listener then.
    """
    current = self._state
    if current is not None and current != ioloop.IOLoop.ERROR:
        # Already listening for something besides errors.
        return
    if self.closed():
        return
    if self._read_buffer_size != 0:
        return
    if self._close_callback is None:
        # Nobody would be notified of the close.
        return
    self._add_io_state(ioloop.IOLoop.READ)
def _add_io_state(self, state: int) -> None:
    """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

    Implementation notes: Reads and writes have a fast path and a
    slow path.  The fast path reads synchronously from socket
    buffers, while the slow path uses `_add_io_state` to schedule
    an IOLoop callback.

    To detect closed connections, we must have called
    `_add_io_state` at some point, but we want to delay this as
    much as possible so we don't have to set an `IOLoop.ERROR`
    listener that will be overwritten by the next slow-path
    operation. If a sequence of fast-path ops do not end in a
    slow-path op, (e.g. for an @asynchronous long-poll request),
    we must add the error handler.

    TODO: reevaluate this now that callbacks are gone.

    """
    if self.closed():
        # connection has been closed, so there can be no future events
        return

    current = self._state
    if current is None:
        # First registration: always listen for errors as well.
        self._state = ioloop.IOLoop.ERROR | state
        self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
        return

    if current & state:
        # Some requested flag is already being watched; nothing to update.
        return
    self._state = current | state
    self.io_loop.update_handler(self.fileno(), self._state)
def _is_connreset(self, exc: BaseException) -> bool:
    """Return ``True`` if exc is ECONNRESET or equivalent.

    Only socket/IO errors are considered; their errno is compared with
    the module's set of connection-reset codes.

    May be overridden in subclasses.
    """
    if not isinstance(exc, (socket.error, IOError)):
        return False
    return errno_from_exception(exc) in _ERRNO_CONNRESET
class IOStream(BaseIOStream):
    r"""Socket-based `IOStream` implementation.

    This class supports the read and write methods from `BaseIOStream`
    plus a `connect` method.

    The ``socket`` parameter may either be connected or unconnected.
    For server operations the socket is the result of calling
    `socket.accept <socket.socket.accept>`.  For client operations the
    socket is created with `socket.socket`, and may either be
    connected before passing it to the `IOStream` or connected with
    `IOStream.connect`.

    A very simple (and broken) HTTP client using this class:

    .. testcode::

        import asyncio
        import socket
        import tornado.iostream

        async def main():
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            stream = tornado.iostream.IOStream(s)
            await stream.connect(("friendfeed.com", 80))
            await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
            header_data = await stream.read_until(b"\r\n\r\n")
            headers = {}
            for line in header_data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
            print(body_data)
            stream.close()

        if __name__ == '__main__':
            asyncio.run(main())

    .. testoutput::
       :hide:

    """

    def __init__(self, socket: socket.socket, *args: Any, **kwargs: Any) -> None:
        """Wrap ``socket``, switching it to non-blocking mode.

        Remaining arguments are forwarded to `BaseIOStream`.
        """
        self.socket = socket
        self.socket.setblocking(False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> Union[int, ioloop._Selectable]:
        """Return the underlying socket object (used as the IOLoop key)."""
        return self.socket

    def close_fd(self) -> None:
        """Close the socket and drop the reference to it."""
        self.socket.close()
        self.socket = None  # type: ignore

    def get_fd_error(self) -> Optional[Exception]:
        """Return the socket's pending ``SO_ERROR`` as a `socket.error`."""
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        return socket.error(err, os.strerror(err))

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        """Receive into ``buf``; return the byte count or ``None`` if the
        read would block."""
        try:
            return self.socket.recv_into(buf, len(buf))
        except BlockingIOError:
            return None
        finally:
            del buf

    def write_to_fd(self, data: memoryview) -> int:
        """Send ``data`` and return the number of bytes accepted."""
        try:
            return self.socket.send(data)  # type: ignore
        finally:
            # Avoid keeping a reference to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def connect(
        self: _IOStreamType, address: Any, server_hostname: Optional[str] = None
    ) -> "Future[_IOStreamType]":
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for `socket.connect <socket.socket.connect>` for
        the type of socket passed to the IOStream constructor,
        e.g. an ``(ip, port)`` tuple.  Hostnames are accepted here,
        but will be resolved synchronously and block the IOLoop.
        If you have a hostname instead of an IP address, the `.TCPClient`
        class is recommended instead of calling this method directly.
        `.TCPClient` will do asynchronous DNS resolution and handle
        both IPv4 and IPv6.

        This method returns a `.Future` (whose result after a successful
        connection will be the stream itself).

        In SSL mode, the ``server_hostname`` parameter will be used
        for certificate validation (unless disabled in the
        ``ssl_options``) and SNI (if supported).  This base
        implementation does not use it.

        Note that it is safe to call `IOStream.write
        <BaseIOStream.write>` while the connection is pending, in
        which case the data will be written as soon as the connection
        is ready.  Calling `IOStream` read methods before the socket is
        connected works on some platforms but is non-portable.

        .. versionchanged:: 4.0
            If no callback is given, returns a `.Future`.

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to the
           `SSLIOStream` constructor to disable.

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        self._connecting = True
        future = Future()  # type: Future[_IOStreamType]
        self._connect_future = typing.cast("Future[IOStream]", future)
        try:
            self.socket.connect(address)
        except BlockingIOError:
            # In non-blocking mode we expect connect() to raise an
            # exception with EINPROGRESS or EWOULDBLOCK.
            pass
        except socket.error as e:
            # On freebsd, other errors such as ECONNREFUSED may be
            # returned immediately when attempting to connect to
            # localhost, so handle them the same way as an error
            # reported later in _handle_connect.
            # No warning is logged here: the caller always receives
            # ``future``, which was created just above, so the former
            # ``if future is None`` logging branch could never run.
            self.close(exc_info=e)
            return future
        self._add_io_state(self.io_loop.WRITE)
        return future

    def start_tls(
        self,
        server_side: bool,
        ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
        server_hostname: Optional[str] = None,
    ) -> Awaitable["SSLIOStream"]:
        """Convert this `IOStream` to an `SSLIOStream`.

        This enables protocols that begin in clear-text mode and
        switch to SSL after some initial negotiation (such as the
        ``STARTTLS`` extension to SMTP and IMAP).

        This method cannot be used if there are outstanding reads
        or writes on the stream, or if there is any data in the
        IOStream's buffer (data in the operating system's socket
        buffer is allowed).  This means it must generally be used
        immediately after reading or writing the last clear-text
        data.  It can also be used immediately after connecting,
        before any reads or writes.

        The ``ssl_options`` argument may be either an `ssl.SSLContext`
        object or a dictionary of keyword arguments for the
        `ssl.SSLContext.wrap_socket` function.  The ``server_hostname`` argument
        will be used for certificate validation unless disabled
        in the ``ssl_options``.

        This method returns a `.Future` whose result is the new
        `SSLIOStream`.  After this method has been called,
        any other operation on the original stream is undefined.

        If a close callback is defined on this stream, it will be
        transferred to the new stream.

        Raises `ValueError` if the stream is not idle.

        .. versionadded:: 4.0

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to disable.
        """
        if (
            self._read_future
            or self._write_futures
            or self._connect_future
            or self._closed
            or self._read_buffer
            or self._write_buffer
        ):
            raise ValueError("IOStream is not idle; cannot convert to SSL")
        if ssl_options is None:
            if server_side:
                ssl_options = _server_ssl_defaults
            else:
                ssl_options = _client_ssl_defaults

        # Detach the plain socket from this stream and the IOLoop before
        # wrapping it; the new SSLIOStream takes over ownership.
        raw_socket = self.socket
        self.io_loop.remove_handler(raw_socket)
        self.socket = None  # type: ignore
        wrapped_socket = ssl_wrap_socket(
            raw_socket,
            ssl_options,
            server_hostname=server_hostname,
            server_side=server_side,
            do_handshake_on_connect=False,
        )
        # Hand the close callback over to the new stream.
        orig_close_callback = self._close_callback
        self._close_callback = None

        future = Future()  # type: Future[SSLIOStream]
        ssl_stream = SSLIOStream(wrapped_socket, ssl_options=ssl_options)
        ssl_stream.set_close_callback(orig_close_callback)
        ssl_stream._ssl_connect_future = future
        ssl_stream.max_buffer_size = self.max_buffer_size
        ssl_stream.read_chunk_size = self.read_chunk_size
        return future

    def _handle_connect(self) -> None:
        """Finish a pending non-blocking connect.

        Reads ``SO_ERROR``; on failure records the error and closes the
        stream, otherwise resolves the connect future with the stream.
        """
        try:
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        except socket.error as e:
            # Hurd doesn't allow SO_ERROR for loopback sockets because all
            # errors for such sockets are reported synchronously.
            if errno_from_exception(e) != errno.ENOPROTOOPT:
                # Any other failure gives us no error code to inspect;
                # propagate it instead of continuing with ``err`` unbound
                # (which would mask it behind an UnboundLocalError).
                raise
            err = 0
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning(
                    "Connect error on fd %s: %s",
                    self.socket.fileno(),
                    # Fall back to the raw number for codes without a
                    # symbolic name rather than failing with KeyError.
                    errno.errorcode.get(err, err),
                )
            self.close()
            return
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future_set_result_unless_cancelled(future, self)
        self._connecting = False

    def set_nodelay(self, value: bool) -> None:
        """Enable or disable ``TCP_NODELAY`` on IPv4/IPv6 sockets.

        Does nothing when the socket is gone or is of another family.
        """
        if self.socket is not None and self.socket.family in (
            socket.AF_INET,
            socket.AF_INET6,
        ):
            try:
                self.socket.setsockopt(
                    socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 if value else 0
                )
            except socket.error as e:
                # Sometimes setsockopt will fail if the socket is closed
                # at the wrong time.  This can happen with HTTPServer
                # resetting the value to ``False`` between requests.
                if e.errno != errno.EINVAL and not self._is_connreset(e):
                    raise
class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking SSL socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with::

        ssl.SSLContext(...).wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

    before constructing the `SSLIOStream`.  Unconnected sockets will be
    wrapped when `IOStream.connect` is finished.
    """

    socket = None  # type: ssl.SSLSocket

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """The ``ssl_options`` keyword argument may either be an
        `ssl.SSLContext` object or a dictionary of keywords arguments
        for `ssl.SSLContext.wrap_socket`
        """
        self._ssl_options = kwargs.pop("ssl_options", _client_ssl_defaults)
        super().__init__(*args, **kwargs)
        self._ssl_accepting = True
        self._handshake_reading = False
        self._handshake_writing = False
        self._server_hostname = None  # type: Optional[str]

        # Probe whether the socket is already connected.
        try:
            self.socket.getpeername()
        except socket.error:
            # Not connected yet; the handshake starts after connect().
            return
        # Indirectly start the handshake, which will run on the next
        # IOLoop iteration and then the real IO state will be set in
        # _handle_events.
        self._add_io_state(self.io_loop.WRITE)

    def reading(self) -> bool:
        """True while the handshake wants to read or a read is pending."""
        return self._handshake_reading or super().reading()

    def writing(self) -> bool:
        """True while the handshake wants to write or a write is pending."""
        return self._handshake_writing or super().writing()

    def _do_ssl_handshake(self) -> None:
        """Advance the TLS handshake by one step.

        Records whether the handshake is waiting for readability or
        writability, closes the stream on the failures handled below,
        and completes the connect future once the handshake succeeds.
        """
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError as err:
            code = err.args[0]
            if code == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            if code == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            if code in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                return self.close(exc_info=err)
            if code in (ssl.SSL_ERROR_SSL, ssl.SSL_ERROR_SYSCALL):
                try:
                    peer = self.socket.getpeername()
                except Exception:
                    peer = "(not connected)"
                gen_log.warning(
                    "SSL Error on %s %s: %s", self.socket.fileno(), peer, err
                )
                return self.close(exc_info=err)
            raise
        except ssl.CertificateError as err:
            # CertificateError can happen during handshake (hostname
            # verification) and should be passed to user.  Starting
            # in Python 3.7, this error is a subclass of SSLError
            # and will be handled by the previous block instead.
            return self.close(exc_info=err)
        except socket.error as err:
            # Some port scans (e.g. nmap in -sT mode) have been known
            # to cause do_handshake to raise EBADF and ENOTCONN, so make
            # those errors quiet as well.
            # https://groups.google.com/forum/?fromgroups#!topic/python-tornado/ApucKJat1_0
            # Errno 0 is also possible in some cases (nc -z).
            # https://github.com/tornadoweb/tornado/issues/2504
            quiet = self._is_connreset(err) or err.args[0] in (
                0,
                errno.EBADF,
                errno.ENOTCONN,
            )
            if quiet:
                return self.close(exc_info=err)
            raise
        except AttributeError as err:
            # On Linux, if the connection was reset before the call to
            # wrap_socket, do_handshake will fail with an
            # AttributeError.
            return self.close(exc_info=err)
        else:
            self._ssl_accepting = False
            # Prior to the introduction of SNI, this is where we would check
            # the server's claimed hostname.
            assert ssl.HAS_SNI
            self._finish_ssl_connect()

    def _finish_ssl_connect(self) -> None:
        """Resolve the pending handshake future, if any, with this stream."""
        pending = self._ssl_connect_future
        if pending is None:
            return
        self._ssl_connect_future = None
        future_set_result_unless_cancelled(pending, self)

    def _handle_read(self) -> None:
        """Drive the handshake while it is in progress, else read normally."""
        if self._ssl_accepting:
            self._do_ssl_handshake()
        else:
            super()._handle_read()

    def _handle_write(self) -> None:
        """Drive the handshake while it is in progress, else write normally."""
        if self._ssl_accepting:
            self._do_ssl_handshake()
        else:
            super()._handle_write()

    def connect(
        self, address: Tuple, server_hostname: Optional[str] = None
    ) -> "Future[SSLIOStream]":
        """Connect, then return a future resolved after the TLS handshake.

        ``server_hostname`` is remembered and passed on when the socket
        is wrapped for SSL in ``_handle_connect``.
        """
        self._server_hostname = server_hostname
        # Ignore the result of connect(). If it fails,
        # wait_for_handshake will raise an error too.  This is
        # necessary for the old semantics of the connect callback
        # (which takes no arguments).  In 6.0 this can be refactored to
        # be a regular coroutine.
        # TODO: This is trickier than it looks, since if write()
        # is called with a connect() pending, we want the connect
        # to resolve before the write.  Or do we care about this?
        # (There's a test for it, but I think in practice users
        # either wait for the connect before performing a write or
        # they don't care about the connect Future at all)
        plain_connect = super().connect(address)
        # Retrieve the exception so it is not reported as never retrieved.
        plain_connect.add_done_callback(lambda f: f.exception())
        return self.wait_for_handshake()

    def _handle_connect(self) -> None:
        """Complete the TCP connect and wrap the socket for SSL traffic."""
        # Call the superclass method to check for errors.
        super()._handle_connect()
        if self.closed():
            return
        # When the connection is complete, wrap the socket for SSL
        # traffic.  Note that we do this by overriding _handle_connect
        # instead of by passing a callback to super().connect because
        # user callbacks are enqueued asynchronously on the IOLoop,
        # but since _handle_events calls _handle_connect immediately
        # followed by _handle_write we need this to be synchronous.
        #
        # The IOLoop will get confused if we swap out self.socket while the
        # fd is registered, so remove it now and re-register after
        # wrap_socket().
        self.io_loop.remove_handler(self.socket)
        previous_state = self._state
        assert previous_state is not None
        self._state = None
        self.socket = ssl_wrap_socket(
            self.socket,
            self._ssl_options,
            server_hostname=self._server_hostname,
            do_handshake_on_connect=False,
            server_side=False,
        )
        self._add_io_state(previous_state)

    def wait_for_handshake(self) -> "Future[SSLIOStream]":
        """Wait for the initial SSL handshake to complete.

        Returns a `.Future` which will resolve to the stream itself
        after the handshake is complete.

        Once the handshake is complete, information such as
        the peer's certificate and NPN/ALPN selections may be
        accessed on ``self.socket``.

        This method is intended for use on server-side streams
        or after using `IOStream.start_tls`; it should not be used
        with `IOStream.connect` (which already waits for the
        handshake to complete).  It may only be called once per stream.

        Raises `RuntimeError` if a handshake future is already pending.

        .. versionadded:: 4.2

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        if self._ssl_connect_future is not None:
            raise RuntimeError("Already waiting")
        handshake_future = Future()
        self._ssl_connect_future = handshake_future
        if not self._ssl_accepting:
            # Handshake already finished: resolve immediately.
            self._finish_ssl_connect()
        return handshake_future

    def write_to_fd(self, data: memoryview) -> int:
        """Send ``data`` over the SSL socket; return the bytes written.

        Returns 0 when the SSL layer reports that it cannot write yet.
        """
        # clip buffer size at 1GB since SSL sockets only support upto 2GB
        # this change in behaviour is transparent, since the function is
        # already expected to (possibly) write less than the provided buffer
        one_gib = 1 << 30
        if len(data) >= one_gib:
            data = memoryview(data)[:one_gib]
        try:
            return self.socket.send(data)  # type: ignore
        except ssl.SSLError as e:
            if e.args[0] != ssl.SSL_ERROR_WANT_WRITE:
                raise
            # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
            # the socket is not writeable; we need to transform this into
            # an EWOULDBLOCK socket.error or a zero return value,
            # either of which will be recognized by the caller of this
            # method.  Prior to Python 3.5, an unwriteable socket would
            # simply return 0 bytes written.
            return 0
        finally:
            # Avoid keeping a reference to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        """Receive into ``buf``; return the byte count or ``None`` when
        nothing can be read right now (including during the handshake)."""
        try:
            if self._ssl_accepting:
                # If the handshake hasn't finished yet, there can't be anything
                # to read (attempting to read may or may not raise an exception
                # depending on the SSL version)
                return None
            # clip buffer size at 1GB since SSL sockets only support upto 2GB
            # this change in behaviour is transparent, since the function is
            # already expected to (possibly) read less than the provided buffer
            one_gib = 1 << 30
            if len(buf) >= one_gib:
                buf = memoryview(buf)[:one_gib]
            try:
                return self.socket.recv_into(buf, len(buf))
            except ssl.SSLError as e:
                # SSLError is a subclass of socket.error, so this except
                # block must come first.
                if e.args[0] != ssl.SSL_ERROR_WANT_READ:
                    raise
                return None
            except BlockingIOError:
                return None
        finally:
            del buf

    def _is_connreset(self, e: BaseException) -> bool:
        """Treat an SSL EOF error as a connection reset as well."""
        ssl_eof = isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF
        if ssl_eof:
            return True
        return super()._is_connreset(e)
class PipeIOStream(BaseIOStream):
    """Pipe-based `IOStream` implementation.

    The constructor takes an integer file descriptor (such as one returned
    by `os.pipe`) rather than an open file object.  Pipes are generally
    one-way, so a `PipeIOStream` can be used for reading or writing but not
    both.

    ``PipeIOStream`` is only available on Unix-based platforms.
    """

    def __init__(self, fd: int, *args: Any, **kwargs: Any) -> None:
        """Wrap ``fd`` and switch it to non-blocking mode.

        Remaining arguments are forwarded to `BaseIOStream`.
        """
        self.fd = fd
        self._fio = io.FileIO(self.fd, "r+")
        if sys.platform == "win32":
            # The form and placement of this assertion is important to mypy.
            # A plain assert statement isn't recognized here. If the assertion
            # were earlier it would worry that the attributes of self aren't
            # set on windows. If it were missing it would complain about
            # the absence of the set_blocking function.
            raise AssertionError("PipeIOStream is not supported on Windows")
        os.set_blocking(fd, False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> int:
        """Return the integer file descriptor."""
        return self.fd

    def close_fd(self) -> None:
        """Close the `io.FileIO` wrapper around the descriptor."""
        self._fio.close()

    def write_to_fd(self, data: memoryview) -> int:
        """Write ``data`` to the descriptor; return the bytes written."""
        try:
            return os.write(self.fd, data)  # type: ignore
        finally:
            # Avoid keeping a reference to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        """Read into ``buf``; return whatever ``readinto`` reports.

        An ``EBADF`` failure closes the stream and yields ``None``; any
        other OS error propagates.
        """
        try:
            return self._fio.readinto(buf)  # type: ignore
        except OSError as e:
            if errno_from_exception(e) != errno.EBADF:
                raise
            # If the writing half of a pipe is closed, select will
            # report it as readable but reads will fail with EBADF.
            self.close(exc_info=e)
            return None
        finally:
            del buf
def doctests() -> Any:
    """Return a test suite built from this module's docstring examples."""
    from doctest import DocTestSuite

    return DocTestSuite()