Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/iostream.py: 14%
752 statements
« prev ^ index » next — coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
1#
2# Copyright 2009 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""Utility classes to write to and read from non-blocking files and sockets.
18Contents:
20* `BaseIOStream`: Generic interface for reading and writing.
21* `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
22* `SSLIOStream`: SSL-aware version of IOStream.
23* `PipeIOStream`: Pipe-based IOStream implementation.
24"""
26import asyncio
27import collections
28import errno
29import io
30import numbers
31import os
32import socket
33import ssl
34import sys
35import re
37from tornado.concurrent import Future, future_set_result_unless_cancelled
38from tornado import ioloop
39from tornado.log import gen_log
40from tornado.netutil import ssl_wrap_socket, _client_ssl_defaults, _server_ssl_defaults
41from tornado.util import errno_from_exception
43import typing
44from typing import (
45 Union,
46 Optional,
47 Awaitable,
48 Callable,
49 Pattern,
50 Any,
51 Dict,
52 TypeVar,
53 Tuple,
54)
55from types import TracebackType
57if typing.TYPE_CHECKING:
58 from typing import Deque, List, Type # noqa: F401
# Type variable used for methods that return values typed as the concrete
# IOStream subclass they are called on (e.g. ``connect``).
_IOStreamType = TypeVar("_IOStreamType", bound="IOStream")

# These errnos indicate that a connection has been abruptly terminated.
# They should be caught and handled less noisily than other errors.
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE, errno.ETIMEDOUT)

# The WSAE* constants exist only on Windows; add their connection-reset
# equivalents when present.
if hasattr(errno, "WSAECONNRESET"):
    _ERRNO_CONNRESET += (  # type: ignore
        errno.WSAECONNRESET,  # type: ignore
        errno.WSAECONNABORTED,  # type: ignore
        errno.WSAETIMEDOUT,  # type: ignore
    )

if sys.platform == "darwin":
    # OSX appears to have a race condition that causes send(2) to return
    # EPROTOTYPE if called while a socket is being torn down:
    # http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
    # Since the socket is being closed anyway, treat this as an ECONNRESET
    # instead of an unexpected error.
    _ERRNO_CONNRESET += (errno.EPROTOTYPE,)  # type: ignore

# True when running on any Windows platform (used to cap write sizes).
_WINDOWS = sys.platform.startswith("win")
class StreamClosedError(IOError):
    """Raised by `IOStream` methods when operating on a closed stream.

    Because the close callback is scheduled to run *after* all other
    callbacks on the stream (so buffered data can still be processed),
    this error may be observed before the close callback fires.

    The underlying cause of the close, if any, is available as the
    ``real_error`` attribute.

    .. versionchanged:: 4.3
       Added the ``real_error`` attribute.
    """

    def __init__(self, real_error: Optional[BaseException] = None) -> None:
        super().__init__("Stream is closed")
        # The exception that caused the stream to close, or None.
        self.real_error = real_error
class UnsatisfiableReadError(Exception):
    """Raised when a pending read can never be satisfied.

    Raised by ``read_until`` and ``read_until_regex`` when more than
    ``max_bytes`` have been read without finding the delimiter/pattern.
    """

    pass
class StreamBufferFullError(Exception):
    """Raised by `IOStream` methods when a read or write buffer is full."""
117class _StreamBuffer(object):
118 """
119 A specialized buffer that tries to avoid copies when large pieces
120 of data are encountered.
121 """
123 def __init__(self) -> None:
124 # A sequence of (False, bytearray) and (True, memoryview) objects
125 self._buffers = (
126 collections.deque()
127 ) # type: Deque[Tuple[bool, Union[bytearray, memoryview]]]
128 # Position in the first buffer
129 self._first_pos = 0
130 self._size = 0
132 def __len__(self) -> int:
133 return self._size
135 # Data above this size will be appended separately instead
136 # of extending an existing bytearray
137 _large_buf_threshold = 2048
139 def append(self, data: Union[bytes, bytearray, memoryview]) -> None:
140 """
141 Append the given piece of data (should be a buffer-compatible object).
142 """
143 size = len(data)
144 if size > self._large_buf_threshold:
145 if not isinstance(data, memoryview):
146 data = memoryview(data)
147 self._buffers.append((True, data))
148 elif size > 0:
149 if self._buffers:
150 is_memview, b = self._buffers[-1]
151 new_buf = is_memview or len(b) >= self._large_buf_threshold
152 else:
153 new_buf = True
154 if new_buf:
155 self._buffers.append((False, bytearray(data)))
156 else:
157 b += data # type: ignore
159 self._size += size
161 def peek(self, size: int) -> memoryview:
162 """
163 Get a view over at most ``size`` bytes (possibly fewer) at the
164 current buffer position.
165 """
166 assert size > 0
167 try:
168 is_memview, b = self._buffers[0]
169 except IndexError:
170 return memoryview(b"")
172 pos = self._first_pos
173 if is_memview:
174 return typing.cast(memoryview, b[pos : pos + size])
175 else:
176 return memoryview(b)[pos : pos + size]
178 def advance(self, size: int) -> None:
179 """
180 Advance the current buffer position by ``size`` bytes.
181 """
182 assert 0 < size <= self._size
183 self._size -= size
184 pos = self._first_pos
186 buffers = self._buffers
187 while buffers and size > 0:
188 is_large, b = buffers[0]
189 b_remain = len(b) - size - pos
190 if b_remain <= 0:
191 buffers.popleft()
192 size -= len(b) - pos
193 pos = 0
194 elif is_large:
195 pos += size
196 size = 0
197 else:
198 # Amortized O(1) shrink for Python 2
199 pos += size
200 if len(b) <= 2 * pos:
201 del typing.cast(bytearray, b)[:pos]
202 pos = 0
203 size = 0
205 assert size == 0
206 self._first_pos = pos
209class BaseIOStream(object):
210 """A utility class to write to and read from a non-blocking file or socket.
212 We support a non-blocking ``write()`` and a family of ``read_*()``
213 methods. When the operation completes, the ``Awaitable`` will resolve
214 with the data read (or ``None`` for ``write()``). All outstanding
215 ``Awaitables`` will resolve with a `StreamClosedError` when the
216 stream is closed; `.BaseIOStream.set_close_callback` can also be used
217 to be notified of a closed stream.
219 When a stream is closed due to an error, the IOStream's ``error``
220 attribute contains the exception object.
222 Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
223 `read_from_fd`, and optionally `get_fd_error`.
225 """
    def __init__(
        self,
        max_buffer_size: Optional[int] = None,
        read_chunk_size: Optional[int] = None,
        max_write_buffer_size: Optional[int] = None,
    ) -> None:
        """`BaseIOStream` constructor.

        :arg max_buffer_size: Maximum amount of incoming data to buffer;
            defaults to 100MB.
        :arg read_chunk_size: Amount of data to read at one time from the
            underlying transport; defaults to 64KB.
        :arg max_write_buffer_size: Amount of outgoing data to buffer;
            defaults to unlimited.

        .. versionchanged:: 4.0
           Add the ``max_write_buffer_size`` parameter.  Changed default
           ``read_chunk_size`` to 64KB.
        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been
           removed.
        """
        self.io_loop = ioloop.IOLoop.current()
        self.max_buffer_size = max_buffer_size or 104857600
        # A chunk size that is too close to max_buffer_size can cause
        # spurious failures.
        self.read_chunk_size = min(read_chunk_size or 65536, self.max_buffer_size // 2)
        self.max_write_buffer_size = max_write_buffer_size
        # Exception that caused the stream to close, if any.
        self.error = None  # type: Optional[BaseException]
        # Incoming data: a bytearray plus a consumed-prefix offset and a
        # live-byte count, so consumed data need not be copied out eagerly.
        self._read_buffer = bytearray()
        self._read_buffer_pos = 0
        self._read_buffer_size = 0
        # True while a read_into() caller's buffer is temporarily installed
        # as self._read_buffer; _after_user_read_buffer holds any leftover
        # internal buffer to restore afterwards.
        self._user_read_buffer = False
        self._after_user_read_buffer = None  # type: Optional[bytearray]
        self._write_buffer = _StreamBuffer()
        # Monotonic byte counters: total accepted by write() and total
        # flushed to the fd; used to resolve write futures in order.
        self._total_write_index = 0
        self._total_write_done_index = 0
        # Parameters of the currently-pending read, if any (at most one of
        # delimiter/regex/bytes is active at a time).
        self._read_delimiter = None  # type: Optional[bytes]
        self._read_regex = None  # type: Optional[Pattern]
        self._read_max_bytes = None  # type: Optional[int]
        self._read_bytes = None  # type: Optional[int]
        self._read_partial = False
        self._read_until_close = False
        self._read_future = None  # type: Optional[Future]
        # (total_write_index, future) pairs, resolved as writes complete.
        self._write_futures = (
            collections.deque()
        )  # type: Deque[Tuple[int, Future[None]]]
        self._close_callback = None  # type: Optional[Callable[[], None]]
        self._connect_future = None  # type: Optional[Future[IOStream]]
        # _ssl_connect_future should be defined in SSLIOStream
        # but it's here so we can clean it up in _signal_closed
        # TODO: refactor that so subclasses can add additional futures
        # to be cancelled.
        self._ssl_connect_future = None  # type: Optional[Future[SSLIOStream]]
        self._connecting = False
        # IOLoop event mask currently registered for our fd; None when no
        # handler is registered.
        self._state = None  # type: Optional[int]
        self._closed = False
285 def fileno(self) -> Union[int, ioloop._Selectable]:
286 """Returns the file descriptor for this stream."""
287 raise NotImplementedError()
289 def close_fd(self) -> None:
290 """Closes the file underlying this stream.
292 ``close_fd`` is called by `BaseIOStream` and should not be called
293 elsewhere; other users should call `close` instead.
294 """
295 raise NotImplementedError()
297 def write_to_fd(self, data: memoryview) -> int:
298 """Attempts to write ``data`` to the underlying file.
300 Returns the number of bytes written.
301 """
302 raise NotImplementedError()
304 def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
305 """Attempts to read from the underlying file.
307 Reads up to ``len(buf)`` bytes, storing them in the buffer.
308 Returns the number of bytes read. Returns None if there was
309 nothing to read (the socket returned `~errno.EWOULDBLOCK` or
310 equivalent), and zero on EOF.
312 .. versionchanged:: 5.0
314 Interface redesigned to take a buffer and return a number
315 of bytes instead of a freshly-allocated object.
316 """
317 raise NotImplementedError()
319 def get_fd_error(self) -> Optional[Exception]:
320 """Returns information about any error on the underlying file.
322 This method is called after the `.IOLoop` has signaled an error on the
323 file descriptor, and should return an Exception (such as `socket.error`
324 with additional information, or None if no such information is
325 available.
326 """
327 return None
329 def read_until_regex(
330 self, regex: bytes, max_bytes: Optional[int] = None
331 ) -> Awaitable[bytes]:
332 """Asynchronously read until we have matched the given regex.
334 The result includes the data that matches the regex and anything
335 that came before it.
337 If ``max_bytes`` is not None, the connection will be closed
338 if more than ``max_bytes`` bytes have been read and the regex is
339 not satisfied.
341 .. versionchanged:: 4.0
342 Added the ``max_bytes`` argument. The ``callback`` argument is
343 now optional and a `.Future` will be returned if it is omitted.
345 .. versionchanged:: 6.0
347 The ``callback`` argument was removed. Use the returned
348 `.Future` instead.
350 """
351 future = self._start_read()
352 self._read_regex = re.compile(regex)
353 self._read_max_bytes = max_bytes
354 try:
355 self._try_inline_read()
356 except UnsatisfiableReadError as e:
357 # Handle this the same way as in _handle_events.
358 gen_log.info("Unsatisfiable read, closing connection: %s" % e)
359 self.close(exc_info=e)
360 return future
361 except:
362 # Ensure that the future doesn't log an error because its
363 # failure was never examined.
364 future.add_done_callback(lambda f: f.exception())
365 raise
366 return future
368 def read_until(
369 self, delimiter: bytes, max_bytes: Optional[int] = None
370 ) -> Awaitable[bytes]:
371 """Asynchronously read until we have found the given delimiter.
373 The result includes all the data read including the delimiter.
375 If ``max_bytes`` is not None, the connection will be closed
376 if more than ``max_bytes`` bytes have been read and the delimiter
377 is not found.
379 .. versionchanged:: 4.0
380 Added the ``max_bytes`` argument. The ``callback`` argument is
381 now optional and a `.Future` will be returned if it is omitted.
383 .. versionchanged:: 6.0
385 The ``callback`` argument was removed. Use the returned
386 `.Future` instead.
387 """
388 future = self._start_read()
389 self._read_delimiter = delimiter
390 self._read_max_bytes = max_bytes
391 try:
392 self._try_inline_read()
393 except UnsatisfiableReadError as e:
394 # Handle this the same way as in _handle_events.
395 gen_log.info("Unsatisfiable read, closing connection: %s" % e)
396 self.close(exc_info=e)
397 return future
398 except:
399 future.add_done_callback(lambda f: f.exception())
400 raise
401 return future
403 def read_bytes(self, num_bytes: int, partial: bool = False) -> Awaitable[bytes]:
404 """Asynchronously read a number of bytes.
406 If ``partial`` is true, data is returned as soon as we have
407 any bytes to return (but never more than ``num_bytes``)
409 .. versionchanged:: 4.0
410 Added the ``partial`` argument. The callback argument is now
411 optional and a `.Future` will be returned if it is omitted.
413 .. versionchanged:: 6.0
415 The ``callback`` and ``streaming_callback`` arguments have
416 been removed. Use the returned `.Future` (and
417 ``partial=True`` for ``streaming_callback``) instead.
419 """
420 future = self._start_read()
421 assert isinstance(num_bytes, numbers.Integral)
422 self._read_bytes = num_bytes
423 self._read_partial = partial
424 try:
425 self._try_inline_read()
426 except:
427 future.add_done_callback(lambda f: f.exception())
428 raise
429 return future
    def read_into(self, buf: bytearray, partial: bool = False) -> Awaitable[int]:
        """Asynchronously read a number of bytes into ``buf``.

        ``buf`` must be a writable buffer into which data will be read.

        If ``partial`` is true, the future resolves as soon as any bytes
        have been read.  Otherwise, it resolves when ``buf`` has been
        entirely filled with read data.  The result is the number of
        bytes read.

        .. versionadded:: 5.0

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        future = self._start_read()

        # First copy data already in read buffer
        available_bytes = self._read_buffer_size
        n = len(buf)
        if available_bytes >= n:
            # Internal buffer alone can satisfy the read; fill buf and
            # stash the remaining internal buffer for restoration in
            # _finish_read.
            end = self._read_buffer_pos + n
            buf[:] = memoryview(self._read_buffer)[self._read_buffer_pos : end]
            del self._read_buffer[:end]
            self._after_user_read_buffer = self._read_buffer
        elif available_bytes > 0:
            # Partially fill buf with whatever is already buffered.
            buf[:available_bytes] = memoryview(self._read_buffer)[
                self._read_buffer_pos :
            ]

        # Set up the supplied buffer as our temporary read buffer.
        # The original (if it had any data remaining) has been
        # saved for later.
        self._user_read_buffer = True
        self._read_buffer = buf
        self._read_buffer_pos = 0
        self._read_buffer_size = available_bytes
        self._read_bytes = n
        self._read_partial = partial

        try:
            self._try_inline_read()
        except:
            # Keep the future from logging an unexamined-failure warning.
            future.add_done_callback(lambda f: f.exception())
            raise
        return future
480 def read_until_close(self) -> Awaitable[bytes]:
481 """Asynchronously reads all data from the socket until it is closed.
483 This will buffer all available data until ``max_buffer_size``
484 is reached. If flow control or cancellation are desired, use a
485 loop with `read_bytes(partial=True) <.read_bytes>` instead.
487 .. versionchanged:: 4.0
488 The callback argument is now optional and a `.Future` will
489 be returned if it is omitted.
491 .. versionchanged:: 6.0
493 The ``callback`` and ``streaming_callback`` arguments have
494 been removed. Use the returned `.Future` (and `read_bytes`
495 with ``partial=True`` for ``streaming_callback``) instead.
497 """
498 future = self._start_read()
499 if self.closed():
500 self._finish_read(self._read_buffer_size)
501 return future
502 self._read_until_close = True
503 try:
504 self._try_inline_read()
505 except:
506 future.add_done_callback(lambda f: f.exception())
507 raise
508 return future
    def write(self, data: Union[bytes, memoryview]) -> "Future[None]":
        """Asynchronously write the given data to this stream.

        This method returns a `.Future` that resolves (with a result
        of ``None``) when the write has been completed.

        The ``data`` argument may be of type `bytes` or `memoryview`.

        .. versionchanged:: 4.0
            Now returns a `.Future` if no callback is given.

        .. versionchanged:: 4.5
            Added support for `memoryview` arguments.

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        self._check_closed()
        if data:
            if isinstance(data, memoryview):
                # Make sure that ``len(data) == data.nbytes``
                data = memoryview(data).cast("B")
            if (
                self.max_write_buffer_size is not None
                and len(self._write_buffer) + len(data) > self.max_write_buffer_size
            ):
                raise StreamBufferFullError("Reached maximum write buffer size")
            self._write_buffer.append(data)
            self._total_write_index += len(data)
        # A future is created even for empty writes; it resolves once all
        # bytes accepted so far have been flushed.
        future = Future()  # type: Future[None]
        future.add_done_callback(lambda f: f.exception())
        self._write_futures.append((self._total_write_index, future))
        if not self._connecting:
            self._handle_write()
            if self._write_buffer:
                self._add_io_state(self.io_loop.WRITE)
            self._maybe_add_error_listener()
        return future
552 def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
553 """Call the given callback when the stream is closed.
555 This mostly is not necessary for applications that use the
556 `.Future` interface; all outstanding ``Futures`` will resolve
557 with a `StreamClosedError` when the stream is closed. However,
558 it is still useful as a way to signal that the stream has been
559 closed while no other read or write is in progress.
561 Unlike other callback-based interfaces, ``set_close_callback``
562 was not removed in Tornado 6.0.
563 """
564 self._close_callback = callback
565 self._maybe_add_error_listener()
    def close(
        self,
        exc_info: Union[
            None,
            bool,
            BaseException,
            Tuple[
                "Optional[Type[BaseException]]",
                Optional[BaseException],
                Optional[TracebackType],
            ],
        ] = False,
    ) -> None:
        """Close this stream.

        If ``exc_info`` is true, set the ``error`` attribute to the current
        exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
        use that instead of `sys.exc_info`).
        """
        if not self.closed():
            if exc_info:
                # Accept a tuple, a bare exception, or a truthy flag
                # meaning "take the currently-handled exception".
                if isinstance(exc_info, tuple):
                    self.error = exc_info[1]
                elif isinstance(exc_info, BaseException):
                    self.error = exc_info
                else:
                    exc_info = sys.exc_info()
                    if any(exc_info):
                        self.error = exc_info[1]
            if self._read_until_close:
                # read_until_close is satisfied by the close itself.
                self._read_until_close = False
                self._finish_read(self._read_buffer_size)
            elif self._read_future is not None:
                # resolve reads that are pending and ready to complete
                try:
                    pos = self._find_read_pos()
                except UnsatisfiableReadError:
                    pass
                else:
                    if pos is not None:
                        self._read_from_buffer(pos)
            if self._state is not None:
                # Unregister from the IOLoop before closing the fd.
                self.io_loop.remove_handler(self.fileno())
                self._state = None
            self.close_fd()
            self._closed = True
        self._signal_closed()
    def _signal_closed(self) -> None:
        """Resolve all outstanding futures with StreamClosedError and
        schedule the close callback.  Safe to call more than once."""
        futures = []  # type: List[Future]
        if self._read_future is not None:
            futures.append(self._read_future)
            self._read_future = None
        futures += [future for _, future in self._write_futures]
        self._write_futures.clear()
        if self._connect_future is not None:
            futures.append(self._connect_future)
            self._connect_future = None
        for future in futures:
            if not future.done():
                future.set_exception(StreamClosedError(real_error=self.error))
            # Reference the exception to silence warnings. Annoyingly,
            # this raises if the future was cancelled, but just
            # returns any other error.
            try:
                future.exception()
            except asyncio.CancelledError:
                pass
        if self._ssl_connect_future is not None:
            # _ssl_connect_future expects to see the real exception (typically
            # an ssl.SSLError), not just StreamClosedError.
            if not self._ssl_connect_future.done():
                if self.error is not None:
                    self._ssl_connect_future.set_exception(self.error)
                else:
                    self._ssl_connect_future.set_exception(StreamClosedError())
                self._ssl_connect_future.exception()
            self._ssl_connect_future = None
        if self._close_callback is not None:
            # Run the callback on the next IOLoop iteration so queued
            # read/write callbacks get to run first.
            cb = self._close_callback
            self._close_callback = None
            self.io_loop.add_callback(cb)
        # Clear the buffers so they can be cleared immediately even
        # if the IOStream object is kept alive by a reference cycle.
        # TODO: Clear the read buffer too; it currently breaks some tests.
        self._write_buffer = None  # type: ignore
654 def reading(self) -> bool:
655 """Returns ``True`` if we are currently reading from the stream."""
656 return self._read_future is not None
658 def writing(self) -> bool:
659 """Returns ``True`` if we are currently writing to the stream."""
660 return bool(self._write_buffer)
662 def closed(self) -> bool:
663 """Returns ``True`` if the stream has been closed."""
664 return self._closed
666 def set_nodelay(self, value: bool) -> None:
667 """Sets the no-delay flag for this stream.
669 By default, data written to TCP streams may be held for a time
670 to make the most efficient use of bandwidth (according to
671 Nagle's algorithm). The no-delay flag requests that data be
672 written as soon as possible, even if doing so would consume
673 additional bandwidth.
675 This flag is currently defined only for TCP-based ``IOStreams``.
677 .. versionadded:: 3.1
678 """
679 pass
681 def _handle_connect(self) -> None:
682 raise NotImplementedError()
    def _handle_events(self, fd: Union[int, ioloop._Selectable], events: int) -> None:
        """IOLoop callback: dispatch connect/read/write/error events for
        ``fd`` and recompute the event mask we listen for."""
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            if self._connecting:
                # Most IOLoops will report a write failed connect
                # with the WRITE event, but SelectIOLoop reports a
                # READ as well so we must check for connecting before
                # either.
                self._handle_connect()
            if self.closed():
                return
            if events & self.io_loop.READ:
                self._handle_read()
            if self.closed():
                return
            if events & self.io_loop.WRITE:
                self._handle_write()
            if self.closed():
                return
            if events & self.io_loop.ERROR:
                self.error = self.get_fd_error()
                # We may have queued up a user callback in _handle_read or
                # _handle_write, so don't close the IOStream until those
                # callbacks have had a chance to run.
                self.io_loop.add_callback(self.close)
                return
            # Rebuild the event mask from what we're currently doing.
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # If the connection is idle, listen for reads too so
                # we can tell if the connection is closed.  If there is
                # data in the read buffer we won't run the close callback
                # yet anyway, so we don't need to listen in this case.
                state |= self.io_loop.READ
            if state != self._state:
                assert (
                    self._state is not None
                ), "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=e)
        except Exception as e:
            gen_log.error("Uncaught exception, closing connection.", exc_info=True)
            self.close(exc_info=e)
            raise
    def _read_to_buffer_loop(self) -> Optional[int]:
        """Read repeatedly from the fd until blocked, closed, or the
        pending read is satisfiable.

        Returns the buffer position that satisfies the pending read, or
        None if it cannot be satisfied yet.  Called from _handle_read
        and _try_inline_read.
        """
        # Upper bound on how much we want buffered before stopping.
        if self._read_bytes is not None:
            target_bytes = self._read_bytes  # type: Optional[int]
        elif self._read_max_bytes is not None:
            target_bytes = self._read_max_bytes
        elif self.reading():
            # For read_until without max_bytes, or
            # read_until_close, read as much as we can before
            # scanning for the delimiter.
            target_bytes = None
        else:
            target_bytes = 0
        next_find_pos = 0
        while not self.closed():
            # Read from the socket until we get EWOULDBLOCK or equivalent.
            # SSL sockets do some internal buffering, and if the data is
            # sitting in the SSL object's buffer select() and friends
            # can't see it; the only way to find out if it's there is to
            # try to read it.
            if self._read_to_buffer() == 0:
                break

            # If we've read all the bytes we can use, break out of
            # this loop.

            # If we've reached target_bytes, we know we're done.
            if target_bytes is not None and self._read_buffer_size >= target_bytes:
                break

            # Otherwise, we need to call the more expensive find_read_pos.
            # It's inefficient to do this on every read, so instead
            # do it on the first read and whenever the read buffer
            # size has doubled.
            if self._read_buffer_size >= next_find_pos:
                pos = self._find_read_pos()
                if pos is not None:
                    return pos
                next_find_pos = self._read_buffer_size * 2
        return self._find_read_pos()
778 def _handle_read(self) -> None:
779 try:
780 pos = self._read_to_buffer_loop()
781 except UnsatisfiableReadError:
782 raise
783 except asyncio.CancelledError:
784 raise
785 except Exception as e:
786 gen_log.warning("error on read: %s" % e)
787 self.close(exc_info=e)
788 return
789 if pos is not None:
790 self._read_from_buffer(pos)
    def _start_read(self) -> Future:
        """Create and install the Future for a new read operation.

        Raises StreamClosedError or AssertionError if a read is already
        in progress.
        """
        if self._read_future is not None:
            # It is an error to start a read while a prior read is unresolved.
            # However, if the prior read is unresolved because the stream was
            # closed without satisfying it, it's better to raise
            # StreamClosedError instead of AssertionError. In particular, this
            # situation occurs in harmless situations in http1connection.py and
            # an AssertionError would be logged noisily.
            #
            # On the other hand, it is legal to start a new read while the
            # stream is closed, in case the read can be satisfied from the
            # read buffer. So we only want to check the closed status of the
            # stream if we need to decide what kind of error to raise for
            # "already reading".
            #
            # These conditions have proven difficult to test; we have no
            # unittests that reliably verify this behavior so be careful
            # when making changes here. See #2651 and #2719.
            self._check_closed()
            assert self._read_future is None, "Already reading"
        self._read_future = Future()
        return self._read_future
    def _finish_read(self, size: int) -> None:
        """Resolve the pending read future with ``size`` bytes.

        For read_into (user-supplied buffer) the result is the byte
        count; otherwise the consumed bytes themselves.
        """
        if self._user_read_buffer:
            # Swap the caller's buffer back out and restore our own
            # (saved in read_into), so subsequent reads work normally.
            self._read_buffer = self._after_user_read_buffer or bytearray()
            self._after_user_read_buffer = None
            self._read_buffer_pos = 0
            self._read_buffer_size = len(self._read_buffer)
            self._user_read_buffer = False
            result = size  # type: Union[int, bytes]
        else:
            result = self._consume(size)
        if self._read_future is not None:
            future = self._read_future
            self._read_future = None
            future_set_result_unless_cancelled(future, result)
        self._maybe_add_error_listener()
    def _try_inline_read(self) -> None:
        """Attempt to complete the current read operation from buffered data.

        If the read can be completed without blocking, schedules the
        read callback on the next IOLoop iteration; otherwise starts
        listening for reads on the socket.
        """
        # See if we've already got the data from a previous read
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._check_closed()
        # Pull in whatever the fd has available right now.
        pos = self._read_to_buffer_loop()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # We couldn't satisfy the read inline, so make sure we're
        # listening for new data unless the stream is closed.
        if not self.closed():
            self._add_io_state(ioloop.IOLoop.READ)
    def _read_to_buffer(self) -> Optional[int]:
        """Reads from the socket and appends the result to the read buffer.

        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        try:
            while True:
                try:
                    if self._user_read_buffer:
                        # read_into: read directly into the tail of the
                        # caller's buffer (zero-copy).
                        buf = memoryview(self._read_buffer)[
                            self._read_buffer_size :
                        ]  # type: Union[memoryview, bytearray]
                    else:
                        buf = bytearray(self.read_chunk_size)
                    bytes_read = self.read_from_fd(buf)
                except (socket.error, IOError, OSError) as e:
                    # ssl.SSLError is a subclass of socket.error
                    if self._is_connreset(e):
                        # Treat ECONNRESET as a connection close rather than
                        # an error to minimize log spam  (the exception
                        # will be available on self.error for apps that care).
                        self.close(exc_info=e)
                        return None
                    self.close(exc_info=e)
                    raise
                break
            if bytes_read is None:
                # EWOULDBLOCK or equivalent: nothing available right now.
                return 0
            elif bytes_read == 0:
                # EOF: remote end closed the connection.
                self.close()
                return 0
            if not self._user_read_buffer:
                self._read_buffer += memoryview(buf)[:bytes_read]
            self._read_buffer_size += bytes_read
        finally:
            # Break the reference to buf so we don't waste a chunk's worth of
            # memory in case an exception hangs on to our stack frame.
            del buf
        if self._read_buffer_size > self.max_buffer_size:
            gen_log.error("Reached maximum read buffer size")
            self.close()
            raise StreamBufferFullError("Reached maximum read buffer size")
        return bytes_read
899 def _read_from_buffer(self, pos: int) -> None:
900 """Attempts to complete the currently-pending read from the buffer.
902 The argument is either a position in the read buffer or None,
903 as returned by _find_read_pos.
904 """
905 self._read_bytes = self._read_delimiter = self._read_regex = None
906 self._read_partial = False
907 self._finish_read(pos)
    def _find_read_pos(self) -> Optional[int]:
        """Attempts to find a position in the read buffer that satisfies
        the currently-pending read.

        Returns a position in the buffer if the current read can be satisfied,
        or None if it cannot.  May raise UnsatisfiableReadError (via
        _check_max_bytes) when ``max_bytes`` has been exceeded.
        """
        if self._read_bytes is not None and (
            self._read_buffer_size >= self._read_bytes
            or (self._read_partial and self._read_buffer_size > 0)
        ):
            num_bytes = min(self._read_bytes, self._read_buffer_size)
            return num_bytes
        elif self._read_delimiter is not None:
            # Multi-byte delimiters (e.g. '\r\n') may straddle two
            # chunks in the read buffer, so we can't easily find them
            # without collapsing the buffer.  However, since protocols
            # using delimited reads (as opposed to reads of a known
            # length) tend to be "line" oriented, the delimiter is likely
            # to be in the first few chunks.  Merge the buffer gradually
            # since large merges are relatively expensive and get undone in
            # _consume().
            if self._read_buffer:
                loc = self._read_buffer.find(
                    self._read_delimiter, self._read_buffer_pos
                )
                if loc != -1:
                    # Convert to a position relative to the unconsumed data.
                    loc -= self._read_buffer_pos
                    delimiter_len = len(self._read_delimiter)
                    self._check_max_bytes(self._read_delimiter, loc + delimiter_len)
                    return loc + delimiter_len
            self._check_max_bytes(self._read_delimiter, self._read_buffer_size)
        elif self._read_regex is not None:
            if self._read_buffer:
                m = self._read_regex.search(self._read_buffer, self._read_buffer_pos)
                if m is not None:
                    loc = m.end() - self._read_buffer_pos
                    self._check_max_bytes(self._read_regex, loc)
                    return loc
            self._check_max_bytes(self._read_regex, self._read_buffer_size)
        return None
951 def _check_max_bytes(self, delimiter: Union[bytes, Pattern], size: int) -> None:
952 if self._read_max_bytes is not None and size > self._read_max_bytes:
953 raise UnsatisfiableReadError(
954 "delimiter %r not found within %d bytes"
955 % (delimiter, self._read_max_bytes)
956 )
    def _handle_write(self) -> None:
        """Drain as much of the write buffer as the fd will accept.

        Runs from the IOLoop write event (and after new writes are
        queued).  After writing, resolves any write futures whose data
        has been fully passed to ``write_to_fd``.
        """
        while True:
            size = len(self._write_buffer)
            if not size:
                break
            assert size > 0
            try:
                if _WINDOWS:
                    # On windows, socket.send blows up if given a
                    # write buffer that's too large, instead of just
                    # returning the number of bytes it was able to
                    # process. Therefore we must not call socket.send
                    # with more than 128KB at a time.
                    size = 128 * 1024
                num_bytes = self.write_to_fd(self._write_buffer.peek(size))
                if num_bytes == 0:
                    # write_to_fd may report an unwritable fd as a zero
                    # return; wait for the next write event.
                    break
                self._write_buffer.advance(num_bytes)
                self._total_write_done_index += num_bytes
            except BlockingIOError:
                # fd is not currently writable; try again on the next event.
                break
            except (socket.error, IOError, OSError) as e:
                if not self._is_connreset(e):
                    # Broken pipe errors are usually caused by connection
                    # reset, and its better to not log EPIPE errors to
                    # minimize log spam
                    gen_log.warning("Write error on %s: %s", self.fileno(), e)
                self.close(exc_info=e)
                return
        # Resolve completed write futures in FIFO order, stopping at the
        # first future whose data has not yet been fully written.
        while self._write_futures:
            index, future = self._write_futures[0]
            if index > self._total_write_done_index:
                break
            self._write_futures.popleft()
            future_set_result_unless_cancelled(future, None)
996 def _consume(self, loc: int) -> bytes:
997 # Consume loc bytes from the read buffer and return them
998 if loc == 0:
999 return b""
1000 assert loc <= self._read_buffer_size
1001 # Slice the bytearray buffer into bytes, without intermediate copying
1002 b = (
1003 memoryview(self._read_buffer)[
1004 self._read_buffer_pos : self._read_buffer_pos + loc
1005 ]
1006 ).tobytes()
1007 self._read_buffer_pos += loc
1008 self._read_buffer_size -= loc
1009 # Amortized O(1) shrink
1010 # (this heuristic is implemented natively in Python 3.4+
1011 # but is replicated here for Python 2)
1012 if self._read_buffer_pos > self._read_buffer_size:
1013 del self._read_buffer[: self._read_buffer_pos]
1014 self._read_buffer_pos = 0
1015 return b
1017 def _check_closed(self) -> None:
1018 if self.closed():
1019 raise StreamClosedError(real_error=self.error)
1021 def _maybe_add_error_listener(self) -> None:
1022 # This method is part of an optimization: to detect a connection that
1023 # is closed when we're not actively reading or writing, we must listen
1024 # for read events. However, it is inefficient to do this when the
1025 # connection is first established because we are going to read or write
1026 # immediately anyway. Instead, we insert checks at various times to
1027 # see if the connection is idle and add the read listener then.
1028 if self._state is None or self._state == ioloop.IOLoop.ERROR:
1029 if (
1030 not self.closed()
1031 and self._read_buffer_size == 0
1032 and self._close_callback is not None
1033 ):
1034 self._add_io_state(ioloop.IOLoop.READ)
1036 def _add_io_state(self, state: int) -> None:
1037 """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
1039 Implementation notes: Reads and writes have a fast path and a
1040 slow path. The fast path reads synchronously from socket
1041 buffers, while the slow path uses `_add_io_state` to schedule
1042 an IOLoop callback.
1044 To detect closed connections, we must have called
1045 `_add_io_state` at some point, but we want to delay this as
1046 much as possible so we don't have to set an `IOLoop.ERROR`
1047 listener that will be overwritten by the next slow-path
1048 operation. If a sequence of fast-path ops do not end in a
1049 slow-path op, (e.g. for an @asynchronous long-poll request),
1050 we must add the error handler.
1052 TODO: reevaluate this now that callbacks are gone.
1054 """
1055 if self.closed():
1056 # connection has been closed, so there can be no future events
1057 return
1058 if self._state is None:
1059 self._state = ioloop.IOLoop.ERROR | state
1060 self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
1061 elif not self._state & state:
1062 self._state = self._state | state
1063 self.io_loop.update_handler(self.fileno(), self._state)
1065 def _is_connreset(self, exc: BaseException) -> bool:
1066 """Return ``True`` if exc is ECONNRESET or equivalent.
1068 May be overridden in subclasses.
1069 """
1070 return (
1071 isinstance(exc, (socket.error, IOError))
1072 and errno_from_exception(exc) in _ERRNO_CONNRESET
1073 )
class IOStream(BaseIOStream):
    r"""Socket-based `IOStream` implementation.

    This class supports the read and write methods from `BaseIOStream`
    plus a `connect` method.

    The ``socket`` parameter may either be connected or unconnected.
    For server operations the socket is the result of calling
    `socket.accept <socket.socket.accept>`.  For client operations the
    socket is created with `socket.socket`, and may either be
    connected before passing it to the `IOStream` or connected with
    `IOStream.connect`.

    A very simple (and broken) HTTP client using this class:

    .. testcode::

        import tornado.ioloop
        import tornado.iostream
        import socket

        async def main():
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            stream = tornado.iostream.IOStream(s)
            await stream.connect(("friendfeed.com", 80))
            await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
            header_data = await stream.read_until(b"\r\n\r\n")
            headers = {}
            for line in header_data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
            print(body_data)
            stream.close()

        if __name__ == '__main__':
            asyncio.run(main())

    .. testoutput::
       :hide:

    """

    def __init__(self, socket: socket.socket, *args: Any, **kwargs: Any) -> None:
        self.socket = socket
        # All I/O is driven by the IOLoop, so the socket must never block.
        self.socket.setblocking(False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> Union[int, ioloop._Selectable]:
        # The socket object itself serves as the IOLoop handler key.
        return self.socket

    def close_fd(self) -> None:
        # Called from BaseIOStream.close(); drop the reference so any
        # later use of the closed stream fails fast.
        self.socket.close()
        self.socket = None  # type: ignore

    def get_fd_error(self) -> Optional[Exception]:
        # SO_ERROR reports (and clears) a pending asynchronous error on
        # the socket, e.g. from a failed non-blocking connect.
        errno = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        return socket.error(errno, os.strerror(errno))

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        try:
            return self.socket.recv_into(buf, len(buf))
        except BlockingIOError:
            # No data available right now; None tells the caller to
            # wait for the next read event.
            return None
        finally:
            del buf

    def write_to_fd(self, data: memoryview) -> int:
        try:
            return self.socket.send(data)  # type: ignore
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def connect(
        self: _IOStreamType, address: Any, server_hostname: Optional[str] = None
    ) -> "Future[_IOStreamType]":
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for `socket.connect <socket.socket.connect>` for
        the type of socket passed to the IOStream constructor,
        e.g. an ``(ip, port)`` tuple.  Hostnames are accepted here,
        but will be resolved synchronously and block the IOLoop.
        If you have a hostname instead of an IP address, the `.TCPClient`
        class is recommended instead of calling this method directly.
        `.TCPClient` will do asynchronous DNS resolution and handle
        both IPv4 and IPv6.

        This method returns a `.Future` whose result after a successful
        connection will be the stream itself.

        In SSL mode, the ``server_hostname`` parameter will be used
        for certificate validation (unless disabled in the
        ``ssl_options``) and SNI.

        Note that it is safe to call `IOStream.write
        <BaseIOStream.write>` while the connection is pending, in
        which case the data will be written as soon as the connection
        is ready.  Calling `IOStream` read methods before the socket is
        connected works on some platforms but is non-portable.

        .. versionchanged:: 4.0
            If no callback is given, returns a `.Future`.

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to the
           `SSLIOStream` constructor to disable.

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        self._connecting = True
        future = Future()  # type: Future[_IOStreamType]
        self._connect_future = typing.cast("Future[IOStream]", future)
        try:
            self.socket.connect(address)
        except BlockingIOError:
            # In non-blocking mode we expect connect() to raise an
            # exception with EINPROGRESS or EWOULDBLOCK.
            pass
        except socket.error as e:
            # On freebsd, other errors such as ECONNREFUSED may be
            # returned immediately when attempting to connect to
            # localhost, so handle them the same way as an error
            # reported later in _handle_connect.
            if future is None:
                # NOTE(review): ``future`` is assigned unconditionally
                # above, so this branch appears unreachable and the
                # warning is never logged — confirm before relying on it.
                gen_log.warning("Connect error on fd %s: %s", self.socket.fileno(), e)
            self.close(exc_info=e)
            return future
        # A non-blocking connect signals completion (or failure) by the
        # socket becoming writable; _handle_connect runs then.
        self._add_io_state(self.io_loop.WRITE)
        return future

    def start_tls(
        self,
        server_side: bool,
        ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
        server_hostname: Optional[str] = None,
    ) -> Awaitable["SSLIOStream"]:
        """Convert this `IOStream` to an `SSLIOStream`.

        This enables protocols that begin in clear-text mode and
        switch to SSL after some initial negotiation (such as the
        ``STARTTLS`` extension to SMTP and IMAP).

        This method cannot be used if there are outstanding reads
        or writes on the stream, or if there is any data in the
        IOStream's buffer (data in the operating system's socket
        buffer is allowed).  This means it must generally be used
        immediately after reading or writing the last clear-text
        data.  It can also be used immediately after connecting,
        before any reads or writes.

        The ``ssl_options`` argument may be either an `ssl.SSLContext`
        object or a dictionary of keyword arguments for the
        `ssl.wrap_socket` function.  The ``server_hostname`` argument
        will be used for certificate validation unless disabled
        in the ``ssl_options``.

        This method returns a `.Future` whose result is the new
        `SSLIOStream`.  After this method has been called,
        any other operation on the original stream is undefined.

        If a close callback is defined on this stream, it will be
        transferred to the new stream.

        .. versionadded:: 4.0

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to disable.
        """
        if (
            self._read_future
            or self._write_futures
            or self._connect_future
            or self._closed
            or self._read_buffer
            or self._write_buffer
        ):
            raise ValueError("IOStream is not idle; cannot convert to SSL")
        if ssl_options is None:
            if server_side:
                ssl_options = _server_ssl_defaults
            else:
                ssl_options = _client_ssl_defaults
        # Detach the fd from the IOLoop before swapping in the wrapped
        # socket; the new SSLIOStream re-registers it.
        socket = self.socket
        self.io_loop.remove_handler(socket)
        self.socket = None  # type: ignore
        socket = ssl_wrap_socket(
            socket,
            ssl_options,
            server_hostname=server_hostname,
            server_side=server_side,
            do_handshake_on_connect=False,
        )
        # Transfer the close callback (if any) to the new stream.
        orig_close_callback = self._close_callback
        self._close_callback = None

        future = Future()  # type: Future[SSLIOStream]
        ssl_stream = SSLIOStream(socket, ssl_options=ssl_options)
        ssl_stream.set_close_callback(orig_close_callback)
        # Resolved by SSLIOStream._finish_ssl_connect once the handshake
        # completes.
        ssl_stream._ssl_connect_future = future
        ssl_stream.max_buffer_size = self.max_buffer_size
        ssl_stream.read_chunk_size = self.read_chunk_size
        return future

    def _handle_connect(self) -> None:
        # Finish a non-blocking connect once the socket reports writable.
        try:
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        except socket.error as e:
            # Hurd doesn't allow SO_ERROR for loopback sockets because all
            # errors for such sockets are reported synchronously.
            if errno_from_exception(e) == errno.ENOPROTOOPT:
                err = 0
            # NOTE(review): if getsockopt fails with any other errno,
            # ``err`` is left unbound and the check below raises
            # NameError — confirm whether that can occur in practice.
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning(
                    "Connect error on fd %s: %s",
                    self.socket.fileno(),
                    errno.errorcode[err],
                )
            self.close()
            return
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future_set_result_unless_cancelled(future, self)
        self._connecting = False

    def set_nodelay(self, value: bool) -> None:
        # Toggle TCP_NODELAY (Nagle's algorithm) on TCP sockets only.
        if self.socket is not None and self.socket.family in (
            socket.AF_INET,
            socket.AF_INET6,
        ):
            try:
                self.socket.setsockopt(
                    socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 if value else 0
                )
            except socket.error as e:
                # Sometimes setsockopt will fail if the socket is closed
                # at the wrong time.  This can happen with HTTPServer
                # resetting the value to ``False`` between requests.
                if e.errno != errno.EINVAL and not self._is_connreset(e):
                    raise
class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking SSL socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with::

        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

    before constructing the `SSLIOStream`.  Unconnected sockets will be
    wrapped when `IOStream.connect` is finished.
    """

    # Narrow the inherited attribute's type for the type checker.
    socket = None  # type: ssl.SSLSocket

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """The ``ssl_options`` keyword argument may either be an
        `ssl.SSLContext` object or a dictionary of keywords arguments
        for `ssl.wrap_socket`
        """
        self._ssl_options = kwargs.pop("ssl_options", _client_ssl_defaults)
        super().__init__(*args, **kwargs)
        # True until do_handshake() completes successfully.
        self._ssl_accepting = True
        # Whether the in-progress handshake is waiting for a read/write
        # event (SSL_ERROR_WANT_READ / SSL_ERROR_WANT_WRITE).
        self._handshake_reading = False
        self._handshake_writing = False
        self._server_hostname = None  # type: Optional[str]

        # If the socket is already connected, attempt to start the handshake.
        try:
            self.socket.getpeername()
        except socket.error:
            pass
        else:
            # Indirectly start the handshake, which will run on the next
            # IOLoop iteration and then the real IO state will be set in
            # _handle_events.
            self._add_io_state(self.io_loop.WRITE)

    def reading(self) -> bool:
        # A handshake in progress may need read events even when no user
        # read is pending.
        return self._handshake_reading or super().reading()

    def writing(self) -> bool:
        # Likewise, the handshake may need write events.
        return self._handshake_writing or super().writing()

    def _do_ssl_handshake(self) -> None:
        # Advance the SSL handshake by one step, recording whether it is
        # now waiting on a read or write event.
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                return self.close(exc_info=err)
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                try:
                    peer = self.socket.getpeername()
                except Exception:
                    peer = "(not connected)"
                gen_log.warning(
                    "SSL Error on %s %s: %s", self.socket.fileno(), peer, err
                )
                return self.close(exc_info=err)
            raise
        except ssl.CertificateError as err:
            # CertificateError can happen during handshake (hostname
            # verification) and should be passed to user. Starting
            # in Python 3.7, this error is a subclass of SSLError
            # and will be handled by the previous block instead.
            return self.close(exc_info=err)
        except socket.error as err:
            # Some port scans (e.g. nmap in -sT mode) have been known
            # to cause do_handshake to raise EBADF and ENOTCONN, so make
            # those errors quiet as well.
            # https://groups.google.com/forum/?fromgroups#!topic/python-tornado/ApucKJat1_0
            # Errno 0 is also possible in some cases (nc -z).
            # https://github.com/tornadoweb/tornado/issues/2504
            if self._is_connreset(err) or err.args[0] in (
                0,
                errno.EBADF,
                errno.ENOTCONN,
            ):
                return self.close(exc_info=err)
            raise
        except AttributeError as err:
            # On Linux, if the connection was reset before the call to
            # wrap_socket, do_handshake will fail with an
            # AttributeError.
            return self.close(exc_info=err)
        else:
            # Handshake completed; verify the peer certificate, then
            # resolve any pending wait_for_handshake future.
            self._ssl_accepting = False
            if not self._verify_cert(self.socket.getpeercert()):
                self.close()
                return
            self._finish_ssl_connect()

    def _finish_ssl_connect(self) -> None:
        # Resolve the wait_for_handshake/connect future, if one is pending.
        if self._ssl_connect_future is not None:
            future = self._ssl_connect_future
            self._ssl_connect_future = None
            future_set_result_unless_cancelled(future, self)

    def _verify_cert(self, peercert: Any) -> bool:
        """Returns ``True`` if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get("cert_reqs", ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        # NOTE(review): if ``_ssl_options`` is neither a dict nor an
        # SSLContext, ``verify_mode`` is unbound here and the assert
        # raises NameError — confirm callers never pass other types.
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            # NOTE(review): ssl.match_hostname is deprecated since Python
            # 3.7 and removed in 3.12 — verify against the interpreter
            # versions this file is expected to support.
            ssl.match_hostname(peercert, self._server_hostname)
        except ssl.CertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True

    def _handle_read(self) -> None:
        # Until the handshake finishes, read events drive the handshake
        # rather than user reads.
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super()._handle_read()

    def _handle_write(self) -> None:
        # Likewise for write events during the handshake.
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super()._handle_write()

    def connect(
        self, address: Tuple, server_hostname: Optional[str] = None
    ) -> "Future[SSLIOStream]":
        self._server_hostname = server_hostname
        # Ignore the result of connect(). If it fails,
        # wait_for_handshake will raise an error too. This is
        # necessary for the old semantics of the connect callback
        # (which takes no arguments). In 6.0 this can be refactored to
        # be a regular coroutine.
        # TODO: This is trickier than it looks, since if write()
        # is called with a connect() pending, we want the connect
        # to resolve before the write. Or do we care about this?
        # (There's a test for it, but I think in practice users
        # either wait for the connect before performing a write or
        # they don't care about the connect Future at all)
        fut = super().connect(address)
        fut.add_done_callback(lambda f: f.exception())
        return self.wait_for_handshake()

    def _handle_connect(self) -> None:
        # Call the superclass method to check for errors.
        super()._handle_connect()
        if self.closed():
            return
        # When the connection is complete, wrap the socket for SSL
        # traffic.  Note that we do this by overriding _handle_connect
        # instead of by passing a callback to super().connect because
        # user callbacks are enqueued asynchronously on the IOLoop,
        # but since _handle_events calls _handle_connect immediately
        # followed by _handle_write we need this to be synchronous.
        #
        # The IOLoop will get confused if we swap out self.socket while the
        # fd is registered, so remove it now and re-register after
        # wrap_socket().
        self.io_loop.remove_handler(self.socket)
        old_state = self._state
        assert old_state is not None
        self._state = None
        self.socket = ssl_wrap_socket(
            self.socket,
            self._ssl_options,
            server_hostname=self._server_hostname,
            do_handshake_on_connect=False,
            server_side=False,
        )
        self._add_io_state(old_state)

    def wait_for_handshake(self) -> "Future[SSLIOStream]":
        """Wait for the initial SSL handshake to complete.

        This method returns a `.Future` which will resolve to the
        stream itself after the handshake is complete.

        Once the handshake is complete, information such as
        the peer's certificate and NPN/ALPN selections may be
        accessed on ``self.socket``.

        This method is intended for use on server-side streams
        or after using `IOStream.start_tls`; it should not be used
        with `IOStream.connect` (which already waits for the
        handshake to complete).  It may only be called once per stream.

        .. versionadded:: 4.2

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        if self._ssl_connect_future is not None:
            raise RuntimeError("Already waiting")
        future = self._ssl_connect_future = Future()
        if not self._ssl_accepting:
            # Handshake already done; resolve immediately.
            self._finish_ssl_connect()
        return future

    def write_to_fd(self, data: memoryview) -> int:
        # clip buffer size at 1GB since SSL sockets only support upto 2GB
        # this change in behaviour is transparent, since the function is
        # already expected to (possibly) write less than the provided buffer
        if len(data) >> 30:
            data = memoryview(data)[: 1 << 30]
        try:
            return self.socket.send(data)  # type: ignore
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
                # the socket is not writeable; we need to transform this into
                # an EWOULDBLOCK socket.error or a zero return value,
                # either of which will be recognized by the caller of this
                # method. Prior to Python 3.5, an unwriteable socket would
                # simply return 0 bytes written.
                return 0
            raise
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        try:
            if self._ssl_accepting:
                # If the handshake hasn't finished yet, there can't be anything
                # to read (attempting to read may or may not raise an exception
                # depending on the SSL version)
                return None
            # clip buffer size at 1GB since SSL sockets only support upto 2GB
            # this change in behaviour is transparent, since the function is
            # already expected to (possibly) read less than the provided buffer
            if len(buf) >> 30:
                buf = memoryview(buf)[: 1 << 30]
            try:
                return self.socket.recv_into(buf, len(buf))
            except ssl.SSLError as e:
                # SSLError is a subclass of socket.error, so this except
                # block must come first.
                if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                    return None
                else:
                    raise
            except BlockingIOError:
                return None
        finally:
            del buf

    def _is_connreset(self, e: BaseException) -> bool:
        # An abrupt EOF during SSL I/O is treated as a connection reset.
        if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
            return True
        return super()._is_connreset(e)
class PipeIOStream(BaseIOStream):
    """Pipe-based `IOStream` implementation.

    The constructor takes an integer file descriptor (such as one returned
    by `os.pipe`) rather than an open file object.  Pipes are generally
    one-way, so a `PipeIOStream` can be used for reading or writing but not
    both.

    ``PipeIOStream`` is only available on Unix-based platforms.
    """

    def __init__(self, fd: int, *args: Any, **kwargs: Any) -> None:
        self.fd = fd
        # FileIO wraps the raw fd; closing the wrapper (see close_fd)
        # also closes the fd.
        self._fio = io.FileIO(self.fd, "r+")
        if sys.platform == "win32":
            # The form and placement of this assertion is important to mypy.
            # A plain assert statement isn't recognized here. If the assertion
            # were earlier it would worry that the attributes of self aren't
            # set on windows. If it were missing it would complain about
            # the absence of the set_blocking function.
            raise AssertionError("PipeIOStream is not supported on Windows")
        os.set_blocking(fd, False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> int:
        # The raw descriptor is used as the IOLoop handler key.
        return self.fd

    def close_fd(self) -> None:
        # Closing the FileIO wrapper closes the underlying fd as well.
        self._fio.close()

    def write_to_fd(self, data: memoryview) -> int:
        try:
            return os.write(self.fd, data)  # type: ignore
        finally:
            # Avoid keeping to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        try:
            return self._fio.readinto(buf)  # type: ignore
        except (IOError, OSError) as e:
            if errno_from_exception(e) == errno.EBADF:
                # If the writing half of a pipe is closed, select will
                # report it as readable but reads will fail with EBADF.
                self.close(exc_info=e)
                return None
            else:
                raise
        finally:
            del buf
def doctests() -> Any:
    """Build a unittest-compatible suite from this module's doctests."""
    import doctest

    return doctest.DocTestSuite()