1#
2# Copyright 2009 Facebook
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16"""Utility classes to write to and read from non-blocking files and sockets.
17
18Contents:
19
20* `BaseIOStream`: Generic interface for reading and writing.
21* `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
22* `SSLIOStream`: SSL-aware version of IOStream.
23* `PipeIOStream`: Pipe-based IOStream implementation.
24"""
25
26import asyncio
27import collections
28import errno
29import io
30import numbers
31import os
32import socket
33import ssl
34import sys
35import re
36
37from tornado.concurrent import Future, future_set_result_unless_cancelled
38from tornado import ioloop
39from tornado.log import gen_log
40from tornado.netutil import ssl_wrap_socket, _client_ssl_defaults, _server_ssl_defaults
41from tornado.util import errno_from_exception
42
43import typing
44from typing import (
45 Union,
46 Optional,
47 Awaitable,
48 Callable,
49 Pattern,
50 Any,
51 Dict,
52 TypeVar,
53 Tuple,
54)
55from types import TracebackType
56
57if typing.TYPE_CHECKING:
58 from typing import Deque, List, Type # noqa: F401
59
# TypeVar bound to IOStream, for APIs that return the same stream subclass
# they were called on.
_IOStreamType = TypeVar("_IOStreamType", bound="IOStream")

# These errnos indicate that a connection has been abruptly terminated.
# They should be caught and handled less noisily than other errors.
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE, errno.ETIMEDOUT)

# Winsock (Windows) reports connection resets with its own errno values;
# include them when the platform defines them.
if hasattr(errno, "WSAECONNRESET"):
    _ERRNO_CONNRESET += (  # type: ignore
        errno.WSAECONNRESET,  # type: ignore
        errno.WSAECONNABORTED,  # type: ignore
        errno.WSAETIMEDOUT,  # type: ignore
    )

if sys.platform == "darwin":
    # OSX appears to have a race condition that causes send(2) to return
    # EPROTOTYPE if called while a socket is being torn down:
    # http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
    # Since the socket is being closed anyway, treat this as an ECONNRESET
    # instead of an unexpected error.
    _ERRNO_CONNRESET += (errno.EPROTOTYPE,)  # type: ignore

# True when running on Windows; used to work around platform-specific
# socket behavior.
_WINDOWS = sys.platform.startswith("win")
82
83
class StreamClosedError(IOError):
    """Raised by `IOStream` operations when the stream has been closed.

    Because the close callback is scheduled to run *after* other
    callbacks on the stream (so that buffered data can still be
    processed), this error may be observed before the close callback
    fires.

    The underlying error that caused the stream to close (if any) is
    available as the ``real_error`` attribute.

    .. versionchanged:: 4.3
        Added the ``real_error`` attribute.
    """

    def __init__(self, real_error: Optional[BaseException] = None) -> None:
        self.real_error = real_error
        super().__init__("Stream is closed")
101
102
class UnsatisfiableReadError(Exception):
    """Raised when a read cannot possibly be satisfied.

    ``read_until`` and ``read_until_regex`` raise this when called with
    a ``max_bytes`` argument that is exceeded before the read completes.
    """
111
112
class StreamBufferFullError(Exception):
    """Raised by `IOStream` methods when a buffer limit has been reached."""
115
116
117class _StreamBuffer:
118 """
119 A specialized buffer that tries to avoid copies when large pieces
120 of data are encountered.
121 """
122
123 def __init__(self) -> None:
124 # A sequence of (False, bytearray) and (True, memoryview) objects
125 self._buffers = (
126 collections.deque()
127 ) # type: Deque[Tuple[bool, Union[bytearray, memoryview]]]
128 # Position in the first buffer
129 self._first_pos = 0
130 self._size = 0
131
132 def __len__(self) -> int:
133 return self._size
134
135 # Data above this size will be appended separately instead
136 # of extending an existing bytearray
137 _large_buf_threshold = 2048
138
139 def append(self, data: Union[bytes, bytearray, memoryview]) -> None:
140 """
141 Append the given piece of data (should be a buffer-compatible object).
142 """
143 size = len(data)
144 if size > self._large_buf_threshold:
145 if not isinstance(data, memoryview):
146 data = memoryview(data)
147 self._buffers.append((True, data))
148 elif size > 0:
149 if self._buffers:
150 is_memview, b = self._buffers[-1]
151 new_buf = is_memview or len(b) >= self._large_buf_threshold
152 else:
153 new_buf = True
154 if new_buf:
155 self._buffers.append((False, bytearray(data)))
156 else:
157 b += data # type: ignore
158
159 self._size += size
160
161 def peek(self, size: int) -> memoryview:
162 """
163 Get a view over at most ``size`` bytes (possibly fewer) at the
164 current buffer position.
165 """
166 assert size > 0
167 try:
168 is_memview, b = self._buffers[0]
169 except IndexError:
170 return memoryview(b"")
171
172 pos = self._first_pos
173 if is_memview:
174 return typing.cast(memoryview, b[pos : pos + size])
175 else:
176 return memoryview(b)[pos : pos + size]
177
178 def advance(self, size: int) -> None:
179 """
180 Advance the current buffer position by ``size`` bytes.
181 """
182 assert 0 < size <= self._size
183 self._size -= size
184 pos = self._first_pos
185
186 buffers = self._buffers
187 while buffers and size > 0:
188 is_large, b = buffers[0]
189 b_remain = len(b) - size - pos
190 if b_remain <= 0:
191 buffers.popleft()
192 size -= len(b) - pos
193 pos = 0
194 elif is_large:
195 pos += size
196 size = 0
197 else:
198 pos += size
199 del typing.cast(bytearray, b)[:pos]
200 pos = 0
201 size = 0
202
203 assert size == 0
204 self._first_pos = pos
205
206
207class BaseIOStream:
208 """A utility class to write to and read from a non-blocking file or socket.
209
210 We support a non-blocking ``write()`` and a family of ``read_*()``
211 methods. When the operation completes, the ``Awaitable`` will resolve
212 with the data read (or ``None`` for ``write()``). All outstanding
213 ``Awaitables`` will resolve with a `StreamClosedError` when the
214 stream is closed; `.BaseIOStream.set_close_callback` can also be used
215 to be notified of a closed stream.
216
217 When a stream is closed due to an error, the IOStream's ``error``
218 attribute contains the exception object.
219
220 Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
221 `read_from_fd`, and optionally `get_fd_error`.
222
223 """
224
    def __init__(
        self,
        max_buffer_size: Optional[int] = None,
        read_chunk_size: Optional[int] = None,
        max_write_buffer_size: Optional[int] = None,
    ) -> None:
        """`BaseIOStream` constructor.

        :arg max_buffer_size: Maximum amount of incoming data to buffer;
            defaults to 100MB.
        :arg read_chunk_size: Amount of data to read at one time from the
            underlying transport; defaults to 64KB.
        :arg max_write_buffer_size: Amount of outgoing data to buffer;
            defaults to unlimited.

        .. versionchanged:: 4.0
           Add the ``max_write_buffer_size`` parameter.  Changed default
           ``read_chunk_size`` to 64KB.
        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been
           removed.
        """
        self.io_loop = ioloop.IOLoop.current()
        self.max_buffer_size = max_buffer_size or 104857600
        # A chunk size that is too close to max_buffer_size can cause
        # spurious failures.
        self.read_chunk_size = min(read_chunk_size or 65536, self.max_buffer_size // 2)
        self.max_write_buffer_size = max_write_buffer_size
        # Exception that caused the stream to close, if any.
        self.error = None  # type: Optional[BaseException]
        # Incoming data not yet delivered to a reader; swapped for the
        # user's buffer while a read_into() is in progress.
        self._read_buffer = bytearray()
        self._read_buffer_size = 0
        self._user_read_buffer = False
        self._after_user_read_buffer = None  # type: Optional[bytearray]
        # Outgoing data not yet written to the fd, plus running byte
        # counters used to resolve write futures in order.
        self._write_buffer = _StreamBuffer()
        self._total_write_index = 0
        self._total_write_done_index = 0
        # State for the (single) pending read operation, if any.
        self._read_delimiter = None  # type: Optional[bytes]
        self._read_regex = None  # type: Optional[Pattern]
        self._read_max_bytes = None  # type: Optional[int]
        self._read_bytes = None  # type: Optional[int]
        self._read_partial = False
        self._read_until_close = False
        self._read_future = None  # type: Optional[Future]
        self._write_futures = (
            collections.deque()
        )  # type: Deque[Tuple[int, Future[None]]]
        self._close_callback = None  # type: Optional[Callable[[], None]]
        self._connect_future = None  # type: Optional[Future[IOStream]]
        # _ssl_connect_future should be defined in SSLIOStream
        # but it's here so we can clean it up in _signal_closed
        # TODO: refactor that so subclasses can add additional futures
        # to be cancelled.
        self._ssl_connect_future = None  # type: Optional[Future[SSLIOStream]]
        self._connecting = False
        # IOLoop event mask we are currently registered for (None when
        # not registered).
        self._state = None  # type: Optional[int]
        self._closed = False
281
282 def fileno(self) -> Union[int, ioloop._Selectable]:
283 """Returns the file descriptor for this stream."""
284 raise NotImplementedError()
285
286 def close_fd(self) -> None:
287 """Closes the file underlying this stream.
288
289 ``close_fd`` is called by `BaseIOStream` and should not be called
290 elsewhere; other users should call `close` instead.
291 """
292 raise NotImplementedError()
293
294 def write_to_fd(self, data: memoryview) -> int:
295 """Attempts to write ``data`` to the underlying file.
296
297 Returns the number of bytes written.
298 """
299 raise NotImplementedError()
300
301 def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
302 """Attempts to read from the underlying file.
303
304 Reads up to ``len(buf)`` bytes, storing them in the buffer.
305 Returns the number of bytes read. Returns None if there was
306 nothing to read (the socket returned `~errno.EWOULDBLOCK` or
307 equivalent), and zero on EOF.
308
309 .. versionchanged:: 5.0
310
311 Interface redesigned to take a buffer and return a number
312 of bytes instead of a freshly-allocated object.
313 """
314 raise NotImplementedError()
315
316 def get_fd_error(self) -> Optional[Exception]:
317 """Returns information about any error on the underlying file.
318
319 This method is called after the `.IOLoop` has signaled an error on the
320 file descriptor, and should return an Exception (such as `socket.error`
321 with additional information, or None if no such information is
322 available.
323 """
324 return None
325
326 def read_until_regex(
327 self, regex: bytes, max_bytes: Optional[int] = None
328 ) -> Awaitable[bytes]:
329 """Asynchronously read until we have matched the given regex.
330
331 The result includes the data that matches the regex and anything
332 that came before it.
333
334 If ``max_bytes`` is not None, the connection will be closed
335 if more than ``max_bytes`` bytes have been read and the regex is
336 not satisfied.
337
338 .. versionchanged:: 4.0
339 Added the ``max_bytes`` argument. The ``callback`` argument is
340 now optional and a `.Future` will be returned if it is omitted.
341
342 .. versionchanged:: 6.0
343
344 The ``callback`` argument was removed. Use the returned
345 `.Future` instead.
346
347 """
348 future = self._start_read()
349 self._read_regex = re.compile(regex)
350 self._read_max_bytes = max_bytes
351 try:
352 self._try_inline_read()
353 except UnsatisfiableReadError as e:
354 # Handle this the same way as in _handle_events.
355 gen_log.info("Unsatisfiable read, closing connection: %s" % e)
356 self.close(exc_info=e)
357 return future
358 except:
359 # Ensure that the future doesn't log an error because its
360 # failure was never examined.
361 future.add_done_callback(lambda f: f.exception())
362 raise
363 return future
364
365 def read_until(
366 self, delimiter: bytes, max_bytes: Optional[int] = None
367 ) -> Awaitable[bytes]:
368 """Asynchronously read until we have found the given delimiter.
369
370 The result includes all the data read including the delimiter.
371
372 If ``max_bytes`` is not None, the connection will be closed
373 if more than ``max_bytes`` bytes have been read and the delimiter
374 is not found.
375
376 .. versionchanged:: 4.0
377 Added the ``max_bytes`` argument. The ``callback`` argument is
378 now optional and a `.Future` will be returned if it is omitted.
379
380 .. versionchanged:: 6.0
381
382 The ``callback`` argument was removed. Use the returned
383 `.Future` instead.
384 """
385 future = self._start_read()
386 self._read_delimiter = delimiter
387 self._read_max_bytes = max_bytes
388 try:
389 self._try_inline_read()
390 except UnsatisfiableReadError as e:
391 # Handle this the same way as in _handle_events.
392 gen_log.info("Unsatisfiable read, closing connection: %s" % e)
393 self.close(exc_info=e)
394 return future
395 except:
396 future.add_done_callback(lambda f: f.exception())
397 raise
398 return future
399
400 def read_bytes(self, num_bytes: int, partial: bool = False) -> Awaitable[bytes]:
401 """Asynchronously read a number of bytes.
402
403 If ``partial`` is true, data is returned as soon as we have
404 any bytes to return (but never more than ``num_bytes``)
405
406 .. versionchanged:: 4.0
407 Added the ``partial`` argument. The callback argument is now
408 optional and a `.Future` will be returned if it is omitted.
409
410 .. versionchanged:: 6.0
411
412 The ``callback`` and ``streaming_callback`` arguments have
413 been removed. Use the returned `.Future` (and
414 ``partial=True`` for ``streaming_callback``) instead.
415
416 """
417 future = self._start_read()
418 assert isinstance(num_bytes, numbers.Integral)
419 self._read_bytes = num_bytes
420 self._read_partial = partial
421 try:
422 self._try_inline_read()
423 except:
424 future.add_done_callback(lambda f: f.exception())
425 raise
426 return future
427
    def read_into(self, buf: bytearray, partial: bool = False) -> Awaitable[int]:
        """Asynchronously read a number of bytes.

        ``buf`` must be a writable buffer into which data will be read.

        If ``partial`` is true, the callback is run as soon as any bytes
        have been read.  Otherwise, it is run when the ``buf`` has been
        entirely filled with read data. The future resolves with the
        number of bytes read.

        .. versionadded:: 5.0

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        future = self._start_read()

        # First copy data already in read buffer
        available_bytes = self._read_buffer_size
        n = len(buf)
        if available_bytes >= n:
            # The read can be satisfied entirely from buffered data.
            buf[:] = memoryview(self._read_buffer)[:n]
            del self._read_buffer[:n]
            # Save the remainder so _finish_read can restore it as our
            # internal buffer after the user's buffer is handed back.
            self._after_user_read_buffer = self._read_buffer
        elif available_bytes > 0:
            # Partial data: copy all of it; the old internal buffer is
            # now empty of useful data and can be discarded below.
            buf[:available_bytes] = memoryview(self._read_buffer)[:]

        # Set up the supplied buffer as our temporary read buffer.
        # The original (if it had any data remaining) has been
        # saved for later.
        self._user_read_buffer = True
        self._read_buffer = buf
        self._read_buffer_size = available_bytes
        self._read_bytes = n
        self._read_partial = partial

        try:
            self._try_inline_read()
        except:
            # Ensure that the future doesn't log an error because its
            # failure was never examined.
            future.add_done_callback(lambda f: f.exception())
            raise
        return future
472
473 def read_until_close(self) -> Awaitable[bytes]:
474 """Asynchronously reads all data from the socket until it is closed.
475
476 This will buffer all available data until ``max_buffer_size``
477 is reached. If flow control or cancellation are desired, use a
478 loop with `read_bytes(partial=True) <.read_bytes>` instead.
479
480 .. versionchanged:: 4.0
481 The callback argument is now optional and a `.Future` will
482 be returned if it is omitted.
483
484 .. versionchanged:: 6.0
485
486 The ``callback`` and ``streaming_callback`` arguments have
487 been removed. Use the returned `.Future` (and `read_bytes`
488 with ``partial=True`` for ``streaming_callback``) instead.
489
490 """
491 future = self._start_read()
492 if self.closed():
493 self._finish_read(self._read_buffer_size)
494 return future
495 self._read_until_close = True
496 try:
497 self._try_inline_read()
498 except:
499 future.add_done_callback(lambda f: f.exception())
500 raise
501 return future
502
503 def write(self, data: Union[bytes, memoryview]) -> "Future[None]":
504 """Asynchronously write the given data to this stream.
505
506 This method returns a `.Future` that resolves (with a result
507 of ``None``) when the write has been completed.
508
509 The ``data`` argument may be of type `bytes` or `memoryview`.
510
511 .. versionchanged:: 4.0
512 Now returns a `.Future` if no callback is given.
513
514 .. versionchanged:: 4.5
515 Added support for `memoryview` arguments.
516
517 .. versionchanged:: 6.0
518
519 The ``callback`` argument was removed. Use the returned
520 `.Future` instead.
521
522 """
523 self._check_closed()
524 if data:
525 if isinstance(data, memoryview):
526 # Make sure that ``len(data) == data.nbytes``
527 data = memoryview(data).cast("B")
528 if (
529 self.max_write_buffer_size is not None
530 and len(self._write_buffer) + len(data) > self.max_write_buffer_size
531 ):
532 raise StreamBufferFullError("Reached maximum write buffer size")
533 self._write_buffer.append(data)
534 self._total_write_index += len(data)
535 future = Future() # type: Future[None]
536 future.add_done_callback(lambda f: f.exception())
537 self._write_futures.append((self._total_write_index, future))
538 if not self._connecting:
539 self._handle_write()
540 if self._write_buffer:
541 self._add_io_state(self.io_loop.WRITE)
542 self._maybe_add_error_listener()
543 return future
544
545 def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
546 """Call the given callback when the stream is closed.
547
548 This mostly is not necessary for applications that use the
549 `.Future` interface; all outstanding ``Futures`` will resolve
550 with a `StreamClosedError` when the stream is closed. However,
551 it is still useful as a way to signal that the stream has been
552 closed while no other read or write is in progress.
553
554 Unlike other callback-based interfaces, ``set_close_callback``
555 was not removed in Tornado 6.0.
556 """
557 self._close_callback = callback
558 self._maybe_add_error_listener()
559
    def close(
        self,
        exc_info: Union[
            None,
            bool,
            BaseException,
            Tuple[
                "Optional[Type[BaseException]]",
                Optional[BaseException],
                Optional[TracebackType],
            ],
        ] = False,
    ) -> None:
        """Close this stream.

        If ``exc_info`` is true, set the ``error`` attribute to the current
        exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
        use that instead of `sys.exc_info`). Idempotent: does nothing if
        the stream is already closed.
        """
        if not self.closed():
            if exc_info:
                # Accept an exception instance, an exc_info tuple, or a
                # truthy flag meaning "use the current exception".
                if isinstance(exc_info, tuple):
                    self.error = exc_info[1]
                elif isinstance(exc_info, BaseException):
                    self.error = exc_info
                else:
                    exc_info = sys.exc_info()
                    if any(exc_info):
                        self.error = exc_info[1]
            if self._read_until_close:
                # Deliver everything buffered to the pending
                # read_until_close future before shutting down.
                self._read_until_close = False
                self._finish_read(self._read_buffer_size)
            elif self._read_future is not None:
                # resolve reads that are pending and ready to complete
                try:
                    pos = self._find_read_pos()
                except UnsatisfiableReadError:
                    pass
                else:
                    if pos is not None:
                        self._read_from_buffer(pos)
            if self._state is not None:
                # Unregister from the IOLoop before closing the fd.
                self.io_loop.remove_handler(self.fileno())
                self._state = None
            self.close_fd()
            self._closed = True
        self._signal_closed()
607
    def _signal_closed(self) -> None:
        # Resolve all outstanding futures with StreamClosedError and
        # schedule the close callback (called from close()).
        futures = []  # type: List[Future]
        if self._read_future is not None:
            futures.append(self._read_future)
            self._read_future = None
        futures += [future for _, future in self._write_futures]
        self._write_futures.clear()
        if self._connect_future is not None:
            futures.append(self._connect_future)
            self._connect_future = None
        for future in futures:
            if not future.done():
                future.set_exception(StreamClosedError(real_error=self.error))
            # Reference the exception to silence warnings. Annoyingly,
            # this raises if the future was cancelled, but just
            # returns any other error.
            try:
                future.exception()
            except asyncio.CancelledError:
                pass
        if self._ssl_connect_future is not None:
            # _ssl_connect_future expects to see the real exception (typically
            # an ssl.SSLError), not just StreamClosedError.
            if not self._ssl_connect_future.done():
                if self.error is not None:
                    self._ssl_connect_future.set_exception(self.error)
                else:
                    self._ssl_connect_future.set_exception(StreamClosedError())
            self._ssl_connect_future.exception()
            self._ssl_connect_future = None
        if self._close_callback is not None:
            # Run the callback on the next IOLoop iteration, after any
            # callbacks queued by the futures resolved above.
            cb = self._close_callback
            self._close_callback = None
            self.io_loop.add_callback(cb)
        # Clear the buffers so they can be cleared immediately even
        # if the IOStream object is kept alive by a reference cycle.
        # TODO: Clear the read buffer too; it currently breaks some tests.
        self._write_buffer = None  # type: ignore
646
647 def reading(self) -> bool:
648 """Returns ``True`` if we are currently reading from the stream."""
649 return self._read_future is not None
650
651 def writing(self) -> bool:
652 """Returns ``True`` if we are currently writing to the stream."""
653 return bool(self._write_buffer)
654
655 def closed(self) -> bool:
656 """Returns ``True`` if the stream has been closed."""
657 return self._closed
658
659 def set_nodelay(self, value: bool) -> None:
660 """Sets the no-delay flag for this stream.
661
662 By default, data written to TCP streams may be held for a time
663 to make the most efficient use of bandwidth (according to
664 Nagle's algorithm). The no-delay flag requests that data be
665 written as soon as possible, even if doing so would consume
666 additional bandwidth.
667
668 This flag is currently defined only for TCP-based ``IOStreams``.
669
670 .. versionadded:: 3.1
671 """
672 pass
673
674 def _handle_connect(self) -> None:
675 raise NotImplementedError()
676
    def _handle_events(self, fd: Union[int, ioloop._Selectable], events: int) -> None:
        # IOLoop callback: dispatch the reported READ/WRITE/ERROR events,
        # then recompute the event mask we should stay registered for.
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            if self._connecting:
                # Most IOLoops will report a write failed connect
                # with the WRITE event, but SelectIOLoop reports a
                # READ as well so we must check for connecting before
                # either.
                self._handle_connect()
            if self.closed():
                return
            if events & self.io_loop.READ:
                self._handle_read()
            if self.closed():
                return
            if events & self.io_loop.WRITE:
                self._handle_write()
            if self.closed():
                return
            if events & self.io_loop.ERROR:
                self.error = self.get_fd_error()
                # We may have queued up a user callback in _handle_read or
                # _handle_write, so don't close the IOStream until those
                # callbacks have had a chance to run.
                self.io_loop.add_callback(self.close)
                return
            # Recompute the interest mask from current read/write state.
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # If the connection is idle, listen for reads too so
                # we can tell if the connection is closed.  If there is
                # data in the read buffer we won't run the close callback
                # yet anyway, so we don't need to listen in this case.
                state |= self.io_loop.READ
            if state != self._state:
                assert (
                    self._state is not None
                ), "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=e)
        except Exception as e:
            gen_log.error("Uncaught exception, closing connection.", exc_info=True)
            self.close(exc_info=e)
            raise
729
    def _read_to_buffer_loop(self) -> Optional[int]:
        """Read from the fd until EWOULDBLOCK (or close) or until the
        pending read can be satisfied; return the position that satisfies
        the pending read, or None.
        """
        # This method is called from _handle_read and _try_inline_read.
        if self._read_bytes is not None:
            target_bytes = self._read_bytes  # type: Optional[int]
        elif self._read_max_bytes is not None:
            target_bytes = self._read_max_bytes
        elif self.reading():
            # For read_until without max_bytes, or
            # read_until_close, read as much as we can before
            # scanning for the delimiter.
            target_bytes = None
        else:
            target_bytes = 0
        next_find_pos = 0
        while not self.closed():
            # Read from the socket until we get EWOULDBLOCK or equivalent.
            # SSL sockets do some internal buffering, and if the data is
            # sitting in the SSL object's buffer select() and friends
            # can't see it; the only way to find out if it's there is to
            # try to read it.
            if self._read_to_buffer() == 0:
                break

            # If we've read all the bytes we can use, break out of
            # this loop.

            # If we've reached target_bytes, we know we're done.
            if target_bytes is not None and self._read_buffer_size >= target_bytes:
                break

            # Otherwise, we need to call the more expensive find_read_pos.
            # It's inefficient to do this on every read, so instead
            # do it on the first read and whenever the read buffer
            # size has doubled.
            if self._read_buffer_size >= next_find_pos:
                pos = self._find_read_pos()
                if pos is not None:
                    return pos
                next_find_pos = self._read_buffer_size * 2
        return self._find_read_pos()
770
771 def _handle_read(self) -> None:
772 try:
773 pos = self._read_to_buffer_loop()
774 except UnsatisfiableReadError:
775 raise
776 except asyncio.CancelledError:
777 raise
778 except Exception as e:
779 gen_log.warning("error on read: %s" % e)
780 self.close(exc_info=e)
781 return
782 if pos is not None:
783 self._read_from_buffer(pos)
784
785 def _start_read(self) -> Future:
786 if self._read_future is not None:
787 # It is an error to start a read while a prior read is unresolved.
788 # However, if the prior read is unresolved because the stream was
789 # closed without satisfying it, it's better to raise
790 # StreamClosedError instead of AssertionError. In particular, this
791 # situation occurs in harmless situations in http1connection.py and
792 # an AssertionError would be logged noisily.
793 #
794 # On the other hand, it is legal to start a new read while the
795 # stream is closed, in case the read can be satisfied from the
796 # read buffer. So we only want to check the closed status of the
797 # stream if we need to decide what kind of error to raise for
798 # "already reading".
799 #
800 # These conditions have proven difficult to test; we have no
801 # unittests that reliably verify this behavior so be careful
802 # when making changes here. See #2651 and #2719.
803 self._check_closed()
804 assert self._read_future is None, "Already reading"
805 self._read_future = Future()
806 return self._read_future
807
    def _finish_read(self, size: int) -> None:
        # Resolve the pending read future with the first ``size`` bytes.
        if self._user_read_buffer:
            # read_into(): the caller's buffer already holds the data.
            # Restore our own buffer (saved by read_into) and resolve
            # with the byte count instead of a bytes object.
            self._read_buffer = self._after_user_read_buffer or bytearray()
            self._after_user_read_buffer = None
            self._read_buffer_size = len(self._read_buffer)
            self._user_read_buffer = False
            result = size  # type: Union[int, bytes]
        else:
            result = self._consume(size)
        if self._read_future is not None:
            future = self._read_future
            self._read_future = None
            future_set_result_unless_cancelled(future, result)
        self._maybe_add_error_listener()
822
823 def _try_inline_read(self) -> None:
824 """Attempt to complete the current read operation from buffered data.
825
826 If the read can be completed without blocking, schedules the
827 read callback on the next IOLoop iteration; otherwise starts
828 listening for reads on the socket.
829 """
830 # See if we've already got the data from a previous read
831 pos = self._find_read_pos()
832 if pos is not None:
833 self._read_from_buffer(pos)
834 return
835 self._check_closed()
836 pos = self._read_to_buffer_loop()
837 if pos is not None:
838 self._read_from_buffer(pos)
839 return
840 # We couldn't satisfy the read inline, so make sure we're
841 # listening for new data unless the stream is closed.
842 if not self.closed():
843 self._add_io_state(ioloop.IOLoop.READ)
844
    def _read_to_buffer(self) -> Optional[int]:
        """Reads from the socket and appends the result to the read buffer.

        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        try:
            while True:
                try:
                    if self._user_read_buffer:
                        # read_into(): read directly into the unfilled
                        # tail of the user-supplied buffer.
                        buf = memoryview(self._read_buffer)[
                            self._read_buffer_size :
                        ]  # type: Union[memoryview, bytearray]
                    else:
                        buf = bytearray(self.read_chunk_size)
                    bytes_read = self.read_from_fd(buf)
                except OSError as e:
                    # ssl.SSLError is a subclass of socket.error
                    if self._is_connreset(e):
                        # Treat ECONNRESET as a connection close rather than
                        # an error to minimize log spam (the exception will
                        # be available on self.error for apps that care).
                        self.close(exc_info=e)
                        return None
                    self.close(exc_info=e)
                    raise
                break
            if bytes_read is None:
                # EWOULDBLOCK or equivalent: no data available right now.
                return 0
            elif bytes_read == 0:
                # EOF: the remote end has closed the connection.
                self.close()
                return 0
            if not self._user_read_buffer:
                self._read_buffer += memoryview(buf)[:bytes_read]
            self._read_buffer_size += bytes_read
        finally:
            # Break the reference to buf so we don't waste a chunk's worth of
            # memory in case an exception hangs on to our stack frame.
            del buf
        if self._read_buffer_size > self.max_buffer_size:
            gen_log.error("Reached maximum read buffer size")
            self.close()
            raise StreamBufferFullError("Reached maximum read buffer size")
        return bytes_read
890
891 def _read_from_buffer(self, pos: int) -> None:
892 """Attempts to complete the currently-pending read from the buffer.
893
894 The argument is either a position in the read buffer or None,
895 as returned by _find_read_pos.
896 """
897 self._read_bytes = self._read_delimiter = self._read_regex = None
898 self._read_partial = False
899 self._finish_read(pos)
900
    def _find_read_pos(self) -> Optional[int]:
        """Attempts to find a position in the read buffer that satisfies
        the currently-pending read.

        Returns a position in the buffer if the current read can be satisfied,
        or None if it cannot.  Raises `UnsatisfiableReadError` (via
        `_check_max_bytes`) if the buffer already exceeds ``max_bytes``
        without a match.
        """
        if self._read_bytes is not None and (
            self._read_buffer_size >= self._read_bytes
            or (self._read_partial and self._read_buffer_size > 0)
        ):
            # read_bytes / read_into: enough data (or any data, when
            # partial) is available.
            num_bytes = min(self._read_bytes, self._read_buffer_size)
            return num_bytes
        elif self._read_delimiter is not None:
            # Multi-byte delimiters (e.g. '\r\n') may straddle two
            # chunks in the read buffer, so we can't easily find them
            # without collapsing the buffer.  However, since protocols
            # using delimited reads (as opposed to reads of a known
            # length) tend to be "line" oriented, the delimiter is likely
            # to be in the first few chunks.  Merge the buffer gradually
            # since large merges are relatively expensive and get undone in
            # _consume().
            if self._read_buffer:
                loc = self._read_buffer.find(self._read_delimiter)
                if loc != -1:
                    delimiter_len = len(self._read_delimiter)
                    self._check_max_bytes(self._read_delimiter, loc + delimiter_len)
                    return loc + delimiter_len
                self._check_max_bytes(self._read_delimiter, self._read_buffer_size)
        elif self._read_regex is not None:
            if self._read_buffer:
                m = self._read_regex.search(self._read_buffer)
                if m is not None:
                    loc = m.end()
                    self._check_max_bytes(self._read_regex, loc)
                    return loc
                self._check_max_bytes(self._read_regex, self._read_buffer_size)
        return None
939
940 def _check_max_bytes(self, delimiter: Union[bytes, Pattern], size: int) -> None:
941 if self._read_max_bytes is not None and size > self._read_max_bytes:
942 raise UnsatisfiableReadError(
943 "delimiter %r not found within %d bytes"
944 % (delimiter, self._read_max_bytes)
945 )
946
    def _handle_write(self) -> None:
        """Flush as much of the write buffer as the fd will accept.

        Loops calling ``write_to_fd`` until the buffer is empty or the
        write would block, then resolves (in order) every pending write
        future whose data has been fully written.
        """
        while True:
            size = len(self._write_buffer)
            if not size:
                break
            assert size > 0
            try:
                if _WINDOWS:
                    # On windows, socket.send blows up if given a
                    # write buffer that's too large, instead of just
                    # returning the number of bytes it was able to
                    # process. Therefore we must not call socket.send
                    # with more than 128KB at a time.
                    size = 128 * 1024

                num_bytes = self.write_to_fd(self._write_buffer.peek(size))
                if num_bytes == 0:
                    # Nothing was accepted; stop and wait for the fd to
                    # become writable again instead of spinning.
                    break
                # Account for the partial (or full) write.
                self._write_buffer.advance(num_bytes)
                self._total_write_done_index += num_bytes
            except BlockingIOError:
                break
            except OSError as e:
                if not self._is_connreset(e):
                    # Broken pipe errors are usually caused by connection
                    # reset, and it's better to not log EPIPE errors to
                    # minimize log spam
                    gen_log.warning("Write error on %s: %s", self.fileno(), e)
                self.close(exc_info=e)
                return

        # Resolve, oldest first, every write future whose data has now
        # been completely handed to the fd.
        while self._write_futures:
            index, future = self._write_futures[0]
            if index > self._total_write_done_index:
                break
            self._write_futures.popleft()
            future_set_result_unless_cancelled(future, None)
984
985 def _consume(self, loc: int) -> bytes:
986 # Consume loc bytes from the read buffer and return them
987 if loc == 0:
988 return b""
989 assert loc <= self._read_buffer_size
990 # Slice the bytearray buffer into bytes, without intermediate copying
991 b = (memoryview(self._read_buffer)[:loc]).tobytes()
992 self._read_buffer_size -= loc
993 del self._read_buffer[:loc]
994 return b
995
996 def _check_closed(self) -> None:
997 if self.closed():
998 raise StreamClosedError(real_error=self.error)
999
1000 def _maybe_add_error_listener(self) -> None:
1001 # This method is part of an optimization: to detect a connection that
1002 # is closed when we're not actively reading or writing, we must listen
1003 # for read events. However, it is inefficient to do this when the
1004 # connection is first established because we are going to read or write
1005 # immediately anyway. Instead, we insert checks at various times to
1006 # see if the connection is idle and add the read listener then.
1007 if self._state is None or self._state == ioloop.IOLoop.ERROR:
1008 if (
1009 not self.closed()
1010 and self._read_buffer_size == 0
1011 and self._close_callback is not None
1012 ):
1013 self._add_io_state(ioloop.IOLoop.READ)
1014
1015 def _add_io_state(self, state: int) -> None:
1016 """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
1017
1018 Implementation notes: Reads and writes have a fast path and a
1019 slow path. The fast path reads synchronously from socket
1020 buffers, while the slow path uses `_add_io_state` to schedule
1021 an IOLoop callback.
1022
1023 To detect closed connections, we must have called
1024 `_add_io_state` at some point, but we want to delay this as
1025 much as possible so we don't have to set an `IOLoop.ERROR`
1026 listener that will be overwritten by the next slow-path
1027 operation. If a sequence of fast-path ops do not end in a
1028 slow-path op, (e.g. for an @asynchronous long-poll request),
1029 we must add the error handler.
1030
1031 TODO: reevaluate this now that callbacks are gone.
1032
1033 """
1034 if self.closed():
1035 # connection has been closed, so there can be no future events
1036 return
1037 if self._state is None:
1038 self._state = ioloop.IOLoop.ERROR | state
1039 self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
1040 elif not self._state & state:
1041 self._state = self._state | state
1042 self.io_loop.update_handler(self.fileno(), self._state)
1043
1044 def _is_connreset(self, exc: BaseException) -> bool:
1045 """Return ``True`` if exc is ECONNRESET or equivalent.
1046
1047 May be overridden in subclasses.
1048 """
1049 return (
1050 isinstance(exc, (socket.error, IOError))
1051 and errno_from_exception(exc) in _ERRNO_CONNRESET
1052 )
1053
1054
class IOStream(BaseIOStream):
    r"""Socket-based `IOStream` implementation.

    This class supports the read and write methods from `BaseIOStream`
    plus a `connect` method.

    The ``socket`` parameter may either be connected or unconnected.
    For server operations the socket is the result of calling
    `socket.accept <socket.socket.accept>`. For client operations the
    socket is created with `socket.socket`, and may either be
    connected before passing it to the `IOStream` or connected with
    `IOStream.connect`.

    A very simple (and broken) HTTP client using this class:

    .. testcode::

        import asyncio
        import socket
        import tornado

        async def main():
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            stream = tornado.iostream.IOStream(s)
            await stream.connect(("friendfeed.com", 80))
            await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
            header_data = await stream.read_until(b"\r\n\r\n")
            headers = {}
            for line in header_data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
            print(body_data)
            stream.close()

        if __name__ == '__main__':
            asyncio.run(main())

    """

    def __init__(self, socket: socket.socket, *args: Any, **kwargs: Any) -> None:
        # Put the socket into non-blocking mode immediately; all
        # subsequent I/O is driven by the IOLoop.
        self.socket = socket
        self.socket.setblocking(False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> Union[int, ioloop._Selectable]:
        # The socket object itself (not an integer fd) is what gets
        # registered with the IOLoop; the annotation shows both forms
        # are acceptable to callers.
        return self.socket

    def close_fd(self) -> None:
        # Close the socket and drop the reference so later use of this
        # stream fails fast rather than acting on a stale socket.
        self.socket.close()
        self.socket = None  # type: ignore

    def get_fd_error(self) -> Optional[Exception]:
        # Retrieve the socket's pending error, if any, via SO_ERROR.
        # NOTE: the local name ``errno`` shadows the imported ``errno``
        # module inside this method.
        errno = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        return socket.error(errno, os.strerror(errno))

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        """Read from the socket into ``buf``.

        Returns the number of bytes read, or None if the read would
        block.
        """
        try:
            return self.socket.recv_into(buf, len(buf))
        except BlockingIOError:
            return None
        finally:
            # Avoid keeping a reference to buf, which may be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del buf

    def write_to_fd(self, data: memoryview) -> int:
        """Write ``data`` to the socket; return the number of bytes sent."""
        try:
            return self.socket.send(data)  # type: ignore
        finally:
            # Avoid keeping a reference to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def connect(
        self: _IOStreamType, address: Any, server_hostname: Optional[str] = None
    ) -> "Future[_IOStreamType]":
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected. The address parameter is in the
        same format as for `socket.connect <socket.socket.connect>` for
        the type of socket passed to the IOStream constructor,
        e.g. an ``(ip, port)`` tuple. Hostnames are accepted here,
        but will be resolved synchronously and block the IOLoop.
        If you have a hostname instead of an IP address, the `.TCPClient`
        class is recommended instead of calling this method directly.
        `.TCPClient` will do asynchronous DNS resolution and handle
        both IPv4 and IPv6.

        This method returns a `.Future` whose result after a
        successful connection will be the stream itself.

        In SSL mode, the ``server_hostname`` parameter will be used
        for certificate validation (unless disabled in the
        ``ssl_options``) and SNI.

        Note that it is safe to call `IOStream.write
        <BaseIOStream.write>` while the connection is pending, in
        which case the data will be written as soon as the connection
        is ready. Calling `IOStream` read methods before the socket is
        connected works on some platforms but is non-portable.

        .. versionchanged:: 4.0
            If no callback is given, returns a `.Future`.

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to the
           `SSLIOStream` constructor to disable.

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        self._connecting = True
        future = Future()  # type: Future[_IOStreamType]
        self._connect_future = typing.cast("Future[IOStream]", future)
        try:
            self.socket.connect(address)
        except BlockingIOError:
            # In non-blocking mode we expect connect() to raise an
            # exception with EINPROGRESS or EWOULDBLOCK.
            pass
        except OSError as e:
            # On freebsd, other errors such as ECONNREFUSED may be
            # returned immediately when attempting to connect to
            # localhost, so handle them the same way as an error
            # reported later in _handle_connect.
            if future is None:
                # NOTE(review): ``future`` is always assigned above, so
                # this warning appears unreachable; looks like a holdover
                # from the removed callback API -- confirm.
                gen_log.warning("Connect error on fd %s: %s", self.socket.fileno(), e)
            self.close(exc_info=e)
            return future
        # Wait for the socket to become writable, which is how a
        # non-blocking connect reports completion (success or failure).
        self._add_io_state(self.io_loop.WRITE)
        return future

    def start_tls(
        self,
        server_side: bool,
        ssl_options: Optional[Union[Dict[str, Any], ssl.SSLContext]] = None,
        server_hostname: Optional[str] = None,
    ) -> Awaitable["SSLIOStream"]:
        """Convert this `IOStream` to an `SSLIOStream`.

        This enables protocols that begin in clear-text mode and
        switch to SSL after some initial negotiation (such as the
        ``STARTTLS`` extension to SMTP and IMAP).

        This method cannot be used if there are outstanding reads
        or writes on the stream, or if there is any data in the
        IOStream's buffer (data in the operating system's socket
        buffer is allowed). This means it must generally be used
        immediately after reading or writing the last clear-text
        data. It can also be used immediately after connecting,
        before any reads or writes.

        The ``ssl_options`` argument may be either an `ssl.SSLContext`
        object or a dictionary of keyword arguments for the
        `ssl.SSLContext.wrap_socket` function. The ``server_hostname`` argument
        will be used for certificate validation unless disabled
        in the ``ssl_options``.

        This method returns a `.Future` whose result is the new
        `SSLIOStream`. After this method has been called,
        any other operation on the original stream is undefined.

        If a close callback is defined on this stream, it will be
        transferred to the new stream.

        .. versionadded:: 4.0

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to disable.
        """
        if (
            self._read_future
            or self._write_futures
            or self._connect_future
            or self._closed
            or self._read_buffer
            or self._write_buffer
        ):
            raise ValueError("IOStream is not idle; cannot convert to SSL")
        if ssl_options is None:
            # Fall back to the module-level defaults for the given side.
            if server_side:
                ssl_options = _server_ssl_defaults
            else:
                ssl_options = _client_ssl_defaults

        # Detach the socket from this stream before wrapping it; the
        # original stream must not be used again after this point.
        socket = self.socket
        self.io_loop.remove_handler(socket)
        self.socket = None  # type: ignore
        socket = ssl_wrap_socket(
            socket,
            ssl_options,
            server_hostname=server_hostname,
            server_side=server_side,
            do_handshake_on_connect=False,
        )
        # Transfer the close callback (if any) to the new stream.
        orig_close_callback = self._close_callback
        self._close_callback = None

        future = Future()  # type: Future[SSLIOStream]
        ssl_stream = SSLIOStream(socket, ssl_options=ssl_options)
        ssl_stream.set_close_callback(orig_close_callback)
        # The returned future resolves once the SSL handshake completes.
        ssl_stream._ssl_connect_future = future
        ssl_stream.max_buffer_size = self.max_buffer_size
        ssl_stream.read_chunk_size = self.read_chunk_size
        return future

    def _handle_connect(self) -> None:
        # Check the outcome of a non-blocking connect via SO_ERROR and
        # resolve or fail the pending connect future accordingly.
        try:
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        except OSError as e:
            # Hurd doesn't allow SO_ERROR for loopback sockets because all
            # errors for such sockets are reported synchronously.
            if errno_from_exception(e) == errno.ENOPROTOOPT:
                err = 0
            # NOTE(review): for any other errno, ``err`` is left unbound
            # and the check below raises UnboundLocalError -- confirm
            # whether that is intended.
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning(
                    "Connect error on fd %s: %s",
                    self.socket.fileno(),
                    errno.errorcode[err],
                )
            self.close()
            return
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future_set_result_unless_cancelled(future, self)
        self._connecting = False

    def set_nodelay(self, value: bool) -> None:
        # Toggle TCP_NODELAY (disable/enable Nagle's algorithm); only
        # meaningful for IP sockets.
        if self.socket is not None and self.socket.family in (
            socket.AF_INET,
            socket.AF_INET6,
        ):
            try:
                self.socket.setsockopt(
                    socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 if value else 0
                )
            except OSError as e:
                # Sometimes setsockopt will fail if the socket is closed
                # at the wrong time. This can happen with HTTPServer
                # resetting the value to ``False`` between requests.
                if e.errno != errno.EINVAL and not self._is_connreset(e):
                    raise
1313
1314
class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking SSL socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with::

        ssl.SSLContext(...).wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

    before constructing the `SSLIOStream`. Unconnected sockets will be
    wrapped when `IOStream.connect` is finished.
    """

    # Redeclared for type checkers: once wrapped, the socket is an
    # ssl.SSLSocket rather than a plain socket.socket.
    socket = None  # type: ssl.SSLSocket

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """The ``ssl_options`` keyword argument may either be an
        `ssl.SSLContext` object or a dictionary of keywords arguments
        for `ssl.SSLContext.wrap_socket`
        """
        self._ssl_options = kwargs.pop("ssl_options", _client_ssl_defaults)
        super().__init__(*args, **kwargs)
        # True until the SSL handshake has completed.
        self._ssl_accepting = True
        # True while the handshake is waiting for the socket to become
        # readable / writable, respectively.
        self._handshake_reading = False
        self._handshake_writing = False
        self._server_hostname = None  # type: Optional[str]

        # If the socket is already connected, attempt to start the handshake.
        try:
            self.socket.getpeername()
        except OSError:
            pass
        else:
            # Indirectly start the handshake, which will run on the next
            # IOLoop iteration and then the real IO state will be set in
            # _handle_events.
            self._add_io_state(self.io_loop.WRITE)

    def reading(self) -> bool:
        # A pending handshake read counts as "reading".
        return self._handshake_reading or super().reading()

    def writing(self) -> bool:
        # A pending handshake write counts as "writing".
        return self._handshake_writing or super().writing()

    def _do_ssl_handshake(self) -> None:
        """Advance the non-blocking SSL handshake by one step."""
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
                return self.close(exc_info=err)
            elif err.args[0] in (ssl.SSL_ERROR_SSL, ssl.SSL_ERROR_SYSCALL):
                try:
                    peer = self.socket.getpeername()
                except Exception:
                    peer = "(not connected)"
                gen_log.warning(
                    "SSL Error on %s %s: %s", self.socket.fileno(), peer, err
                )
                return self.close(exc_info=err)
            raise
        except OSError as err:
            # Some port scans (e.g. nmap in -sT mode) have been known
            # to cause do_handshake to raise EBADF and ENOTCONN, so make
            # those errors quiet as well.
            # https://groups.google.com/forum/?fromgroups#!topic/python-tornado/ApucKJat1_0
            # Errno 0 is also possible in some cases (nc -z).
            # https://github.com/tornadoweb/tornado/issues/2504
            if self._is_connreset(err) or err.args[0] in (
                0,
                errno.EBADF,
                errno.ENOTCONN,
            ):
                return self.close(exc_info=err)
            raise
        except AttributeError as err:
            # On Linux, if the connection was reset before the call to
            # wrap_socket, do_handshake will fail with an
            # AttributeError.
            return self.close(exc_info=err)
        else:
            self._ssl_accepting = False
            # Prior to the introduction of SNI, this is where we would check
            # the server's claimed hostname.
            assert ssl.HAS_SNI
            self._finish_ssl_connect()

    def _finish_ssl_connect(self) -> None:
        # Resolve the pending handshake future, if one is waiting.
        if self._ssl_connect_future is not None:
            future = self._ssl_connect_future
            self._ssl_connect_future = None
            future_set_result_unless_cancelled(future, self)

    def _handle_read(self) -> None:
        # Defer normal reads until the handshake has completed.
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super()._handle_read()

    def _handle_write(self) -> None:
        # Defer normal writes until the handshake has completed.
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super()._handle_write()

    def connect(
        self, address: Tuple, server_hostname: Optional[str] = None
    ) -> "Future[SSLIOStream]":
        # ``server_hostname`` is saved here and used for certificate
        # validation/SNI when the socket is wrapped in _handle_connect.
        self._server_hostname = server_hostname
        # Ignore the result of connect(). If it fails,
        # wait_for_handshake will raise an error too. This is
        # necessary for the old semantics of the connect callback
        # (which takes no arguments). In 6.0 this can be refactored to
        # be a regular coroutine.
        # TODO: This is trickier than it looks, since if write()
        # is called with a connect() pending, we want the connect
        # to resolve before the write. Or do we care about this?
        # (There's a test for it, but I think in practice users
        # either wait for the connect before performing a write or
        # they don't care about the connect Future at all)
        fut = super().connect(address)
        fut.add_done_callback(lambda f: f.exception())
        return self.wait_for_handshake()

    def _handle_connect(self) -> None:
        # Call the superclass method to check for errors.
        super()._handle_connect()
        if self.closed():
            return
        # When the connection is complete, wrap the socket for SSL
        # traffic. Note that we do this by overriding _handle_connect
        # instead of by passing a callback to super().connect because
        # user callbacks are enqueued asynchronously on the IOLoop,
        # but since _handle_events calls _handle_connect immediately
        # followed by _handle_write we need this to be synchronous.
        #
        # The IOLoop will get confused if we swap out self.socket while the
        # fd is registered, so remove it now and re-register after
        # wrap_socket().
        self.io_loop.remove_handler(self.socket)
        old_state = self._state
        assert old_state is not None
        self._state = None
        self.socket = ssl_wrap_socket(
            self.socket,
            self._ssl_options,
            server_hostname=self._server_hostname,
            do_handshake_on_connect=False,
            server_side=False,
        )
        self._add_io_state(old_state)

    def wait_for_handshake(self) -> "Future[SSLIOStream]":
        """Wait for the initial SSL handshake to complete.

        Returns a `.Future` which will resolve to the
        stream itself after the handshake is complete.

        Once the handshake is complete, information such as
        the peer's certificate and NPN/ALPN selections may be
        accessed on ``self.socket``.

        This method is intended for use on server-side streams
        or after using `IOStream.start_tls`; it should not be used
        with `IOStream.connect` (which already waits for the
        handshake to complete). It may only be called once per stream.

        .. versionadded:: 4.2

        .. versionchanged:: 6.0

           The ``callback`` argument was removed. Use the returned
           `.Future` instead.

        """
        if self._ssl_connect_future is not None:
            raise RuntimeError("Already waiting")
        future = self._ssl_connect_future = Future()
        if not self._ssl_accepting:
            # The handshake already finished; resolve immediately.
            self._finish_ssl_connect()
        return future

    def write_to_fd(self, data: memoryview) -> int:
        # clip buffer size at 1GB since SSL sockets only support up to 2GB
        # this change in behaviour is transparent, since the function is
        # already expected to (possibly) write less than the provided buffer
        if len(data) >> 30:
            data = memoryview(data)[: 1 << 30]
        try:
            return self.socket.send(data)  # type: ignore
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
                # the socket is not writeable; we need to transform this into
                # an EWOULDBLOCK socket.error or a zero return value,
                # either of which will be recognized by the caller of this
                # method. Prior to Python 3.5, an unwriteable socket would
                # simply return 0 bytes written.
                return 0
            raise
        finally:
            # Avoid keeping a reference to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        try:
            if self._ssl_accepting:
                # If the handshake hasn't finished yet, there can't be anything
                # to read (attempting to read may or may not raise an exception
                # depending on the SSL version)
                return None
            # clip buffer size at 1GB since SSL sockets only support up to 2GB
            # this change in behaviour is transparent, since the function is
            # already expected to (possibly) read less than the provided buffer
            if len(buf) >> 30:
                buf = memoryview(buf)[: 1 << 30]
            try:
                return self.socket.recv_into(buf, len(buf))
            except ssl.SSLError as e:
                # SSLError is a subclass of socket.error, so this except
                # block must come first.
                if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                    return None
                else:
                    raise
        except BlockingIOError:
            return None
        finally:
            # Avoid keeping a reference to buf, which may be a memoryview.
            del buf

    def _is_connreset(self, e: BaseException) -> bool:
        # An SSL EOF error is treated as a connection reset, in addition
        # to the errnos handled by the base class.
        if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
            return True
        return super()._is_connreset(e)
1559
1560
class PipeIOStream(BaseIOStream):
    """Pipe-based `IOStream` implementation.

    The constructor takes an integer file descriptor (such as one returned
    by `os.pipe`) rather than an open file object. Pipes are generally
    one-way, so a `PipeIOStream` can be used for reading or writing but not
    both.

    ``PipeIOStream`` is only available on Unix-based platforms.
    """

    def __init__(self, fd: int, *args: Any, **kwargs: Any) -> None:
        self.fd = fd
        # The FileIO object takes ownership of fd; close_fd closes it,
        # which also closes the underlying descriptor.
        self._fio = io.FileIO(self.fd, "r+")
        if sys.platform == "win32":
            # The form and placement of this assertion is important to mypy.
            # A plain assert statement isn't recognized here. If the assertion
            # were earlier it would worry that the attributes of self aren't
            # set on windows. If it were missing it would complain about
            # the absence of the set_blocking function.
            raise AssertionError("PipeIOStream is not supported on Windows")
        os.set_blocking(fd, False)
        super().__init__(*args, **kwargs)

    def fileno(self) -> int:
        # The raw descriptor is what gets registered with the IOLoop.
        return self.fd

    def close_fd(self) -> None:
        # Closing the FileIO also closes the underlying descriptor.
        self._fio.close()

    def write_to_fd(self, data: memoryview) -> int:
        """Write ``data`` to the pipe; return the number of bytes written."""
        try:
            return os.write(self.fd, data)  # type: ignore
        finally:
            # Avoid keeping a reference to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
        """Read from the pipe into ``buf``; return the byte count, or
        None if no data is available."""
        try:
            return self._fio.readinto(buf)  # type: ignore
        except OSError as e:
            if errno_from_exception(e) == errno.EBADF:
                # If the writing half of a pipe is closed, select will
                # report it as readable but reads will fail with EBADF.
                self.close(exc_info=e)
                return None
            else:
                raise
        finally:
            # Avoid keeping a reference to buf, which may be a memoryview.
            del buf
1612
1613
def doctests() -> Any:
    """Return a `unittest` suite running this module's doctests."""
    import doctest

    return doctest.DocTestSuite()