Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/asyncio/selector_events.py: 14%
750 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
1"""Event loop using a selector and related classes.
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
7__all__ = 'BaseSelectorEventLoop',
9import collections
10import errno
11import functools
12import selectors
13import socket
14import warnings
15import weakref
16try:
17 import ssl
18except ImportError: # pragma: no cover
19 ssl = None
21from . import base_events
22from . import constants
23from . import events
24from . import futures
25from . import protocols
26from . import sslproto
27from . import transports
28from . import trsock
29from .log import logger
32def _test_selector_event(selector, fd, event):
33 # Test if the selector is monitoring 'event' events
34 # for the file descriptor 'fd'.
35 try:
36 key = selector.get_key(fd)
37 except KeyError:
38 return False
39 else:
40 return bool(key.events & event)
43def _check_ssl_socket(sock):
44 if ssl is not None and isinstance(sock, ssl.SSLSocket):
45 raise TypeError("Socket cannot be of type SSLSocket")
48class BaseSelectorEventLoop(base_events.BaseEventLoop):
49 """Selector event loop.
51 See events.EventLoop for API specification.
52 """
54 def __init__(self, selector=None):
55 super().__init__()
57 if selector is None:
58 selector = selectors.DefaultSelector()
59 logger.debug('Using selector: %s', selector.__class__.__name__)
60 self._selector = selector
61 self._make_self_pipe()
62 self._transports = weakref.WeakValueDictionary()
64 def _make_socket_transport(self, sock, protocol, waiter=None, *,
65 extra=None, server=None):
66 return _SelectorSocketTransport(self, sock, protocol, waiter,
67 extra, server)
69 def _make_ssl_transport(
70 self, rawsock, protocol, sslcontext, waiter=None,
71 *, server_side=False, server_hostname=None,
72 extra=None, server=None,
73 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
74 ssl_protocol = sslproto.SSLProtocol(
75 self, protocol, sslcontext, waiter,
76 server_side, server_hostname,
77 ssl_handshake_timeout=ssl_handshake_timeout)
78 _SelectorSocketTransport(self, rawsock, ssl_protocol,
79 extra=extra, server=server)
80 return ssl_protocol._app_transport
82 def _make_datagram_transport(self, sock, protocol,
83 address=None, waiter=None, extra=None):
84 return _SelectorDatagramTransport(self, sock, protocol,
85 address, waiter, extra)
87 def close(self):
88 if self.is_running():
89 raise RuntimeError("Cannot close a running event loop")
90 if self.is_closed():
91 return
92 self._close_self_pipe()
93 super().close()
94 if self._selector is not None:
95 self._selector.close()
96 self._selector = None
98 def _close_self_pipe(self):
99 self._remove_reader(self._ssock.fileno())
100 self._ssock.close()
101 self._ssock = None
102 self._csock.close()
103 self._csock = None
104 self._internal_fds -= 1
106 def _make_self_pipe(self):
107 # A self-socket, really. :-)
108 self._ssock, self._csock = socket.socketpair()
109 self._ssock.setblocking(False)
110 self._csock.setblocking(False)
111 self._internal_fds += 1
112 self._add_reader(self._ssock.fileno(), self._read_from_self)
114 def _process_self_data(self, data):
115 pass
117 def _read_from_self(self):
118 while True:
119 try:
120 data = self._ssock.recv(4096)
121 if not data:
122 break
123 self._process_self_data(data)
124 except InterruptedError:
125 continue
126 except BlockingIOError:
127 break
129 def _write_to_self(self):
130 # This may be called from a different thread, possibly after
131 # _close_self_pipe() has been called or even while it is
132 # running. Guard for self._csock being None or closed. When
133 # a socket is closed, send() raises OSError (with errno set to
134 # EBADF, but let's not rely on the exact error code).
135 csock = self._csock
136 if csock is None:
137 return
139 try:
140 csock.send(b'\0')
141 except OSError:
142 if self._debug:
143 logger.debug("Fail to write a null byte into the "
144 "self-pipe socket",
145 exc_info=True)
147 def _start_serving(self, protocol_factory, sock,
148 sslcontext=None, server=None, backlog=100,
149 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
150 self._add_reader(sock.fileno(), self._accept_connection,
151 protocol_factory, sock, sslcontext, server, backlog,
152 ssl_handshake_timeout)
154 def _accept_connection(
155 self, protocol_factory, sock,
156 sslcontext=None, server=None, backlog=100,
157 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
158 # This method is only called once for each event loop tick where the
159 # listening socket has triggered an EVENT_READ. There may be multiple
160 # connections waiting for an .accept() so it is called in a loop.
161 # See https://bugs.python.org/issue27906 for more details.
162 for _ in range(backlog):
163 try:
164 conn, addr = sock.accept()
165 if self._debug:
166 logger.debug("%r got a new connection from %r: %r",
167 server, addr, conn)
168 conn.setblocking(False)
169 except (BlockingIOError, InterruptedError, ConnectionAbortedError):
170 # Early exit because the socket accept buffer is empty.
171 return None
172 except OSError as exc:
173 # There's nowhere to send the error, so just log it.
174 if exc.errno in (errno.EMFILE, errno.ENFILE,
175 errno.ENOBUFS, errno.ENOMEM):
176 # Some platforms (e.g. Linux keep reporting the FD as
177 # ready, so we remove the read handler temporarily.
178 # We'll try again in a while.
179 self.call_exception_handler({
180 'message': 'socket.accept() out of system resource',
181 'exception': exc,
182 'socket': trsock.TransportSocket(sock),
183 })
184 self._remove_reader(sock.fileno())
185 self.call_later(constants.ACCEPT_RETRY_DELAY,
186 self._start_serving,
187 protocol_factory, sock, sslcontext, server,
188 backlog, ssl_handshake_timeout)
189 else:
190 raise # The event loop will catch, log and ignore it.
191 else:
192 extra = {'peername': addr}
193 accept = self._accept_connection2(
194 protocol_factory, conn, extra, sslcontext, server,
195 ssl_handshake_timeout)
196 self.create_task(accept)
198 async def _accept_connection2(
199 self, protocol_factory, conn, extra,
200 sslcontext=None, server=None,
201 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
202 protocol = None
203 transport = None
204 try:
205 protocol = protocol_factory()
206 waiter = self.create_future()
207 if sslcontext:
208 transport = self._make_ssl_transport(
209 conn, protocol, sslcontext, waiter=waiter,
210 server_side=True, extra=extra, server=server,
211 ssl_handshake_timeout=ssl_handshake_timeout)
212 else:
213 transport = self._make_socket_transport(
214 conn, protocol, waiter=waiter, extra=extra,
215 server=server)
217 try:
218 await waiter
219 except BaseException:
220 transport.close()
221 raise
222 # It's now up to the protocol to handle the connection.
224 except (SystemExit, KeyboardInterrupt):
225 raise
226 except BaseException as exc:
227 if self._debug:
228 context = {
229 'message':
230 'Error on transport creation for incoming connection',
231 'exception': exc,
232 }
233 if protocol is not None:
234 context['protocol'] = protocol
235 if transport is not None:
236 context['transport'] = transport
237 self.call_exception_handler(context)
239 def _ensure_fd_no_transport(self, fd):
240 fileno = fd
241 if not isinstance(fileno, int):
242 try:
243 fileno = int(fileno.fileno())
244 except (AttributeError, TypeError, ValueError):
245 # This code matches selectors._fileobj_to_fd function.
246 raise ValueError(f"Invalid file object: {fd!r}") from None
247 try:
248 transport = self._transports[fileno]
249 except KeyError:
250 pass
251 else:
252 if not transport.is_closing():
253 raise RuntimeError(
254 f'File descriptor {fd!r} is used by transport '
255 f'{transport!r}')
257 def _add_reader(self, fd, callback, *args):
258 self._check_closed()
259 handle = events.Handle(callback, args, self, None)
260 try:
261 key = self._selector.get_key(fd)
262 except KeyError:
263 self._selector.register(fd, selectors.EVENT_READ,
264 (handle, None))
265 else:
266 mask, (reader, writer) = key.events, key.data
267 self._selector.modify(fd, mask | selectors.EVENT_READ,
268 (handle, writer))
269 if reader is not None:
270 reader.cancel()
271 return handle
273 def _remove_reader(self, fd):
274 if self.is_closed():
275 return False
276 try:
277 key = self._selector.get_key(fd)
278 except KeyError:
279 return False
280 else:
281 mask, (reader, writer) = key.events, key.data
282 mask &= ~selectors.EVENT_READ
283 if not mask:
284 self._selector.unregister(fd)
285 else:
286 self._selector.modify(fd, mask, (None, writer))
288 if reader is not None:
289 reader.cancel()
290 return True
291 else:
292 return False
294 def _add_writer(self, fd, callback, *args):
295 self._check_closed()
296 handle = events.Handle(callback, args, self, None)
297 try:
298 key = self._selector.get_key(fd)
299 except KeyError:
300 self._selector.register(fd, selectors.EVENT_WRITE,
301 (None, handle))
302 else:
303 mask, (reader, writer) = key.events, key.data
304 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
305 (reader, handle))
306 if writer is not None:
307 writer.cancel()
308 return handle
310 def _remove_writer(self, fd):
311 """Remove a writer callback."""
312 if self.is_closed():
313 return False
314 try:
315 key = self._selector.get_key(fd)
316 except KeyError:
317 return False
318 else:
319 mask, (reader, writer) = key.events, key.data
320 # Remove both writer and connector.
321 mask &= ~selectors.EVENT_WRITE
322 if not mask:
323 self._selector.unregister(fd)
324 else:
325 self._selector.modify(fd, mask, (reader, None))
327 if writer is not None:
328 writer.cancel()
329 return True
330 else:
331 return False
333 def add_reader(self, fd, callback, *args):
334 """Add a reader callback."""
335 self._ensure_fd_no_transport(fd)
336 self._add_reader(fd, callback, *args)
338 def remove_reader(self, fd):
339 """Remove a reader callback."""
340 self._ensure_fd_no_transport(fd)
341 return self._remove_reader(fd)
343 def add_writer(self, fd, callback, *args):
344 """Add a writer callback.."""
345 self._ensure_fd_no_transport(fd)
346 self._add_writer(fd, callback, *args)
348 def remove_writer(self, fd):
349 """Remove a writer callback."""
350 self._ensure_fd_no_transport(fd)
351 return self._remove_writer(fd)
353 async def sock_recv(self, sock, n):
354 """Receive data from the socket.
356 The return value is a bytes object representing the data received.
357 The maximum amount of data to be received at once is specified by
358 nbytes.
359 """
360 _check_ssl_socket(sock)
361 if self._debug and sock.gettimeout() != 0:
362 raise ValueError("the socket must be non-blocking")
363 try:
364 return sock.recv(n)
365 except (BlockingIOError, InterruptedError):
366 pass
367 fut = self.create_future()
368 fd = sock.fileno()
369 self._ensure_fd_no_transport(fd)
370 handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
371 fut.add_done_callback(
372 functools.partial(self._sock_read_done, fd, handle=handle))
373 return await fut
375 def _sock_read_done(self, fd, fut, handle=None):
376 if handle is None or not handle.cancelled():
377 self.remove_reader(fd)
379 def _sock_recv(self, fut, sock, n):
380 # _sock_recv() can add itself as an I/O callback if the operation can't
381 # be done immediately. Don't use it directly, call sock_recv().
382 if fut.done():
383 return
384 try:
385 data = sock.recv(n)
386 except (BlockingIOError, InterruptedError):
387 return # try again next time
388 except (SystemExit, KeyboardInterrupt):
389 raise
390 except BaseException as exc:
391 fut.set_exception(exc)
392 else:
393 fut.set_result(data)
395 async def sock_recv_into(self, sock, buf):
396 """Receive data from the socket.
398 The received data is written into *buf* (a writable buffer).
399 The return value is the number of bytes written.
400 """
401 _check_ssl_socket(sock)
402 if self._debug and sock.gettimeout() != 0:
403 raise ValueError("the socket must be non-blocking")
404 try:
405 return sock.recv_into(buf)
406 except (BlockingIOError, InterruptedError):
407 pass
408 fut = self.create_future()
409 fd = sock.fileno()
410 self._ensure_fd_no_transport(fd)
411 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
412 fut.add_done_callback(
413 functools.partial(self._sock_read_done, fd, handle=handle))
414 return await fut
416 def _sock_recv_into(self, fut, sock, buf):
417 # _sock_recv_into() can add itself as an I/O callback if the operation
418 # can't be done immediately. Don't use it directly, call
419 # sock_recv_into().
420 if fut.done():
421 return
422 try:
423 nbytes = sock.recv_into(buf)
424 except (BlockingIOError, InterruptedError):
425 return # try again next time
426 except (SystemExit, KeyboardInterrupt):
427 raise
428 except BaseException as exc:
429 fut.set_exception(exc)
430 else:
431 fut.set_result(nbytes)
433 async def sock_sendall(self, sock, data):
434 """Send data to the socket.
436 The socket must be connected to a remote socket. This method continues
437 to send data from data until either all data has been sent or an
438 error occurs. None is returned on success. On error, an exception is
439 raised, and there is no way to determine how much data, if any, was
440 successfully processed by the receiving end of the connection.
441 """
442 _check_ssl_socket(sock)
443 if self._debug and sock.gettimeout() != 0:
444 raise ValueError("the socket must be non-blocking")
445 try:
446 n = sock.send(data)
447 except (BlockingIOError, InterruptedError):
448 n = 0
450 if n == len(data):
451 # all data sent
452 return
454 fut = self.create_future()
455 fd = sock.fileno()
456 self._ensure_fd_no_transport(fd)
457 # use a trick with a list in closure to store a mutable state
458 handle = self._add_writer(fd, self._sock_sendall, fut, sock,
459 memoryview(data), [n])
460 fut.add_done_callback(
461 functools.partial(self._sock_write_done, fd, handle=handle))
462 return await fut
464 def _sock_sendall(self, fut, sock, view, pos):
465 if fut.done():
466 # Future cancellation can be scheduled on previous loop iteration
467 return
468 start = pos[0]
469 try:
470 n = sock.send(view[start:])
471 except (BlockingIOError, InterruptedError):
472 return
473 except (SystemExit, KeyboardInterrupt):
474 raise
475 except BaseException as exc:
476 fut.set_exception(exc)
477 return
479 start += n
481 if start == len(view):
482 fut.set_result(None)
483 else:
484 pos[0] = start
486 async def sock_connect(self, sock, address):
487 """Connect to a remote socket at address.
489 This method is a coroutine.
490 """
491 _check_ssl_socket(sock)
492 if self._debug and sock.gettimeout() != 0:
493 raise ValueError("the socket must be non-blocking")
495 if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
496 resolved = await self._ensure_resolved(
497 address, family=sock.family, proto=sock.proto, loop=self)
498 _, _, _, _, address = resolved[0]
500 fut = self.create_future()
501 self._sock_connect(fut, sock, address)
502 return await fut
504 def _sock_connect(self, fut, sock, address):
505 fd = sock.fileno()
506 try:
507 sock.connect(address)
508 except (BlockingIOError, InterruptedError):
509 # Issue #23618: When the C function connect() fails with EINTR, the
510 # connection runs in background. We have to wait until the socket
511 # becomes writable to be notified when the connection succeed or
512 # fails.
513 self._ensure_fd_no_transport(fd)
514 handle = self._add_writer(
515 fd, self._sock_connect_cb, fut, sock, address)
516 fut.add_done_callback(
517 functools.partial(self._sock_write_done, fd, handle=handle))
518 except (SystemExit, KeyboardInterrupt):
519 raise
520 except BaseException as exc:
521 fut.set_exception(exc)
522 else:
523 fut.set_result(None)
525 def _sock_write_done(self, fd, fut, handle=None):
526 if handle is None or not handle.cancelled():
527 self.remove_writer(fd)
529 def _sock_connect_cb(self, fut, sock, address):
530 if fut.done():
531 return
533 try:
534 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
535 if err != 0:
536 # Jump to any except clause below.
537 raise OSError(err, f'Connect call failed {address}')
538 except (BlockingIOError, InterruptedError):
539 # socket is still registered, the callback will be retried later
540 pass
541 except (SystemExit, KeyboardInterrupt):
542 raise
543 except BaseException as exc:
544 fut.set_exception(exc)
545 else:
546 fut.set_result(None)
548 async def sock_accept(self, sock):
549 """Accept a connection.
551 The socket must be bound to an address and listening for connections.
552 The return value is a pair (conn, address) where conn is a new socket
553 object usable to send and receive data on the connection, and address
554 is the address bound to the socket on the other end of the connection.
555 """
556 _check_ssl_socket(sock)
557 if self._debug and sock.gettimeout() != 0:
558 raise ValueError("the socket must be non-blocking")
559 fut = self.create_future()
560 self._sock_accept(fut, sock)
561 return await fut
563 def _sock_accept(self, fut, sock):
564 fd = sock.fileno()
565 try:
566 conn, address = sock.accept()
567 conn.setblocking(False)
568 except (BlockingIOError, InterruptedError):
569 self._ensure_fd_no_transport(fd)
570 handle = self._add_reader(fd, self._sock_accept, fut, sock)
571 fut.add_done_callback(
572 functools.partial(self._sock_read_done, fd, handle=handle))
573 except (SystemExit, KeyboardInterrupt):
574 raise
575 except BaseException as exc:
576 fut.set_exception(exc)
577 else:
578 fut.set_result((conn, address))
580 async def _sendfile_native(self, transp, file, offset, count):
581 del self._transports[transp._sock_fd]
582 resume_reading = transp.is_reading()
583 transp.pause_reading()
584 await transp._make_empty_waiter()
585 try:
586 return await self.sock_sendfile(transp._sock, file, offset, count,
587 fallback=False)
588 finally:
589 transp._reset_empty_waiter()
590 if resume_reading:
591 transp.resume_reading()
592 self._transports[transp._sock_fd] = transp
594 def _process_events(self, event_list):
595 for key, mask in event_list:
596 fileobj, (reader, writer) = key.fileobj, key.data
597 if mask & selectors.EVENT_READ and reader is not None:
598 if reader._cancelled:
599 self._remove_reader(fileobj)
600 else:
601 self._add_callback(reader)
602 if mask & selectors.EVENT_WRITE and writer is not None:
603 if writer._cancelled:
604 self._remove_writer(fileobj)
605 else:
606 self._add_callback(writer)
608 def _stop_serving(self, sock):
609 self._remove_reader(sock.fileno())
610 sock.close()
613class _SelectorTransport(transports._FlowControlMixin,
614 transports.Transport):
616 max_size = 256 * 1024 # Buffer size passed to recv().
618 _buffer_factory = bytearray # Constructs initial value for self._buffer.
620 # Attribute used in the destructor: it must be set even if the constructor
621 # is not called (see _SelectorSslTransport which may start by raising an
622 # exception)
623 _sock = None
625 def __init__(self, loop, sock, protocol, extra=None, server=None):
626 super().__init__(extra, loop)
627 self._extra['socket'] = trsock.TransportSocket(sock)
628 try:
629 self._extra['sockname'] = sock.getsockname()
630 except OSError:
631 self._extra['sockname'] = None
632 if 'peername' not in self._extra:
633 try:
634 self._extra['peername'] = sock.getpeername()
635 except socket.error:
636 self._extra['peername'] = None
637 self._sock = sock
638 self._sock_fd = sock.fileno()
640 self._protocol_connected = False
641 self.set_protocol(protocol)
643 self._server = server
644 self._buffer = self._buffer_factory()
645 self._conn_lost = 0 # Set when call to connection_lost scheduled.
646 self._closing = False # Set when close() called.
647 if self._server is not None:
648 self._server._attach()
649 loop._transports[self._sock_fd] = self
651 def __repr__(self):
652 info = [self.__class__.__name__]
653 if self._sock is None:
654 info.append('closed')
655 elif self._closing:
656 info.append('closing')
657 info.append(f'fd={self._sock_fd}')
658 # test if the transport was closed
659 if self._loop is not None and not self._loop.is_closed():
660 polling = _test_selector_event(self._loop._selector,
661 self._sock_fd, selectors.EVENT_READ)
662 if polling:
663 info.append('read=polling')
664 else:
665 info.append('read=idle')
667 polling = _test_selector_event(self._loop._selector,
668 self._sock_fd,
669 selectors.EVENT_WRITE)
670 if polling:
671 state = 'polling'
672 else:
673 state = 'idle'
675 bufsize = self.get_write_buffer_size()
676 info.append(f'write=<{state}, bufsize={bufsize}>')
677 return '<{}>'.format(' '.join(info))
679 def abort(self):
680 self._force_close(None)
682 def set_protocol(self, protocol):
683 self._protocol = protocol
684 self._protocol_connected = True
686 def get_protocol(self):
687 return self._protocol
689 def is_closing(self):
690 return self._closing
692 def close(self):
693 if self._closing:
694 return
695 self._closing = True
696 self._loop._remove_reader(self._sock_fd)
697 if not self._buffer:
698 self._conn_lost += 1
699 self._loop._remove_writer(self._sock_fd)
700 self._loop.call_soon(self._call_connection_lost, None)
702 def __del__(self, _warn=warnings.warn):
703 if self._sock is not None:
704 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
705 self._sock.close()
707 def _fatal_error(self, exc, message='Fatal error on transport'):
708 # Should be called from exception handler only.
709 if isinstance(exc, OSError):
710 if self._loop.get_debug():
711 logger.debug("%r: %s", self, message, exc_info=True)
712 else:
713 self._loop.call_exception_handler({
714 'message': message,
715 'exception': exc,
716 'transport': self,
717 'protocol': self._protocol,
718 })
719 self._force_close(exc)
721 def _force_close(self, exc):
722 if self._conn_lost:
723 return
724 if self._buffer:
725 self._buffer.clear()
726 self._loop._remove_writer(self._sock_fd)
727 if not self._closing:
728 self._closing = True
729 self._loop._remove_reader(self._sock_fd)
730 self._conn_lost += 1
731 self._loop.call_soon(self._call_connection_lost, exc)
733 def _call_connection_lost(self, exc):
734 try:
735 if self._protocol_connected:
736 self._protocol.connection_lost(exc)
737 finally:
738 self._sock.close()
739 self._sock = None
740 self._protocol = None
741 self._loop = None
742 server = self._server
743 if server is not None:
744 server._detach()
745 self._server = None
747 def get_write_buffer_size(self):
748 return len(self._buffer)
750 def _add_reader(self, fd, callback, *args):
751 if self._closing:
752 return
754 self._loop._add_reader(fd, callback, *args)
757class _SelectorSocketTransport(_SelectorTransport):
759 _start_tls_compatible = True
760 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
762 def __init__(self, loop, sock, protocol, waiter=None,
763 extra=None, server=None):
765 self._read_ready_cb = None
766 super().__init__(loop, sock, protocol, extra, server)
767 self._eof = False
768 self._paused = False
769 self._empty_waiter = None
771 # Disable the Nagle algorithm -- small writes will be
772 # sent without waiting for the TCP ACK. This generally
773 # decreases the latency (in some cases significantly.)
774 base_events._set_nodelay(self._sock)
776 self._loop.call_soon(self._protocol.connection_made, self)
777 # only start reading when connection_made() has been called
778 self._loop.call_soon(self._add_reader,
779 self._sock_fd, self._read_ready)
780 if waiter is not None:
781 # only wake up the waiter when connection_made() has been called
782 self._loop.call_soon(futures._set_result_unless_cancelled,
783 waiter, None)
785 def set_protocol(self, protocol):
786 if isinstance(protocol, protocols.BufferedProtocol):
787 self._read_ready_cb = self._read_ready__get_buffer
788 else:
789 self._read_ready_cb = self._read_ready__data_received
791 super().set_protocol(protocol)
793 def is_reading(self):
794 return not self._paused and not self._closing
796 def pause_reading(self):
797 if self._closing or self._paused:
798 return
799 self._paused = True
800 self._loop._remove_reader(self._sock_fd)
801 if self._loop.get_debug():
802 logger.debug("%r pauses reading", self)
804 def resume_reading(self):
805 if self._closing or not self._paused:
806 return
807 self._paused = False
808 self._add_reader(self._sock_fd, self._read_ready)
809 if self._loop.get_debug():
810 logger.debug("%r resumes reading", self)
812 def _read_ready(self):
813 self._read_ready_cb()
815 def _read_ready__get_buffer(self):
816 if self._conn_lost:
817 return
819 try:
820 buf = self._protocol.get_buffer(-1)
821 if not len(buf):
822 raise RuntimeError('get_buffer() returned an empty buffer')
823 except (SystemExit, KeyboardInterrupt):
824 raise
825 except BaseException as exc:
826 self._fatal_error(
827 exc, 'Fatal error: protocol.get_buffer() call failed.')
828 return
830 try:
831 nbytes = self._sock.recv_into(buf)
832 except (BlockingIOError, InterruptedError):
833 return
834 except (SystemExit, KeyboardInterrupt):
835 raise
836 except BaseException as exc:
837 self._fatal_error(exc, 'Fatal read error on socket transport')
838 return
840 if not nbytes:
841 self._read_ready__on_eof()
842 return
844 try:
845 self._protocol.buffer_updated(nbytes)
846 except (SystemExit, KeyboardInterrupt):
847 raise
848 except BaseException as exc:
849 self._fatal_error(
850 exc, 'Fatal error: protocol.buffer_updated() call failed.')
852 def _read_ready__data_received(self):
853 if self._conn_lost:
854 return
855 try:
856 data = self._sock.recv(self.max_size)
857 except (BlockingIOError, InterruptedError):
858 return
859 except (SystemExit, KeyboardInterrupt):
860 raise
861 except BaseException as exc:
862 self._fatal_error(exc, 'Fatal read error on socket transport')
863 return
865 if not data:
866 self._read_ready__on_eof()
867 return
869 try:
870 self._protocol.data_received(data)
871 except (SystemExit, KeyboardInterrupt):
872 raise
873 except BaseException as exc:
874 self._fatal_error(
875 exc, 'Fatal error: protocol.data_received() call failed.')
877 def _read_ready__on_eof(self):
878 if self._loop.get_debug():
879 logger.debug("%r received EOF", self)
881 try:
882 keep_open = self._protocol.eof_received()
883 except (SystemExit, KeyboardInterrupt):
884 raise
885 except BaseException as exc:
886 self._fatal_error(
887 exc, 'Fatal error: protocol.eof_received() call failed.')
888 return
890 if keep_open:
891 # We're keeping the connection open so the
892 # protocol can write more, but we still can't
893 # receive more, so remove the reader callback.
894 self._loop._remove_reader(self._sock_fd)
895 else:
896 self.close()
898 def write(self, data):
899 if not isinstance(data, (bytes, bytearray, memoryview)):
900 raise TypeError(f'data argument must be a bytes-like object, '
901 f'not {type(data).__name__!r}')
902 if self._eof:
903 raise RuntimeError('Cannot call write() after write_eof()')
904 if self._empty_waiter is not None:
905 raise RuntimeError('unable to write; sendfile is in progress')
906 if not data:
907 return
909 if self._conn_lost:
910 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
911 logger.warning('socket.send() raised exception.')
912 self._conn_lost += 1
913 return
915 if not self._buffer:
916 # Optimization: try to send now.
917 try:
918 n = self._sock.send(data)
919 except (BlockingIOError, InterruptedError):
920 pass
921 except (SystemExit, KeyboardInterrupt):
922 raise
923 except BaseException as exc:
924 self._fatal_error(exc, 'Fatal write error on socket transport')
925 return
926 else:
927 data = data[n:]
928 if not data:
929 return
930 # Not all was written; register write handler.
931 self._loop._add_writer(self._sock_fd, self._write_ready)
933 # Add it to the buffer.
934 self._buffer.extend(data)
935 self._maybe_pause_protocol()
937 def _write_ready(self):
938 assert self._buffer, 'Data should not be empty'
940 if self._conn_lost:
941 return
942 try:
943 n = self._sock.send(self._buffer)
944 except (BlockingIOError, InterruptedError):
945 pass
946 except (SystemExit, KeyboardInterrupt):
947 raise
948 except BaseException as exc:
949 self._loop._remove_writer(self._sock_fd)
950 self._buffer.clear()
951 self._fatal_error(exc, 'Fatal write error on socket transport')
952 if self._empty_waiter is not None:
953 self._empty_waiter.set_exception(exc)
954 else:
955 if n:
956 del self._buffer[:n]
957 self._maybe_resume_protocol() # May append to buffer.
958 if not self._buffer:
959 self._loop._remove_writer(self._sock_fd)
960 if self._empty_waiter is not None:
961 self._empty_waiter.set_result(None)
962 if self._closing:
963 self._call_connection_lost(None)
964 elif self._eof:
965 self._sock.shutdown(socket.SHUT_WR)
967 def write_eof(self):
968 if self._closing or self._eof:
969 return
970 self._eof = True
971 if not self._buffer:
972 self._sock.shutdown(socket.SHUT_WR)
974 def can_write_eof(self):
975 return True
977 def _call_connection_lost(self, exc):
978 super()._call_connection_lost(exc)
979 if self._empty_waiter is not None:
980 self._empty_waiter.set_exception(
981 ConnectionError("Connection is closed by peer"))
983 def _make_empty_waiter(self):
984 if self._empty_waiter is not None:
985 raise RuntimeError("Empty waiter is already set")
986 self._empty_waiter = self._loop.create_future()
987 if not self._buffer:
988 self._empty_waiter.set_result(None)
989 return self._empty_waiter
991 def _reset_empty_waiter(self):
992 self._empty_waiter = None
995class _SelectorDatagramTransport(_SelectorTransport):
997 _buffer_factory = collections.deque
999 def __init__(self, loop, sock, protocol, address=None,
1000 waiter=None, extra=None):
1001 super().__init__(loop, sock, protocol, extra)
1002 self._address = address
1003 self._loop.call_soon(self._protocol.connection_made, self)
1004 # only start reading when connection_made() has been called
1005 self._loop.call_soon(self._add_reader,
1006 self._sock_fd, self._read_ready)
1007 if waiter is not None:
1008 # only wake up the waiter when connection_made() has been called
1009 self._loop.call_soon(futures._set_result_unless_cancelled,
1010 waiter, None)
1012 def get_write_buffer_size(self):
1013 return sum(len(data) for data, _ in self._buffer)
1015 def _read_ready(self):
1016 if self._conn_lost:
1017 return
1018 try:
1019 data, addr = self._sock.recvfrom(self.max_size)
1020 except (BlockingIOError, InterruptedError):
1021 pass
1022 except OSError as exc:
1023 self._protocol.error_received(exc)
1024 except (SystemExit, KeyboardInterrupt):
1025 raise
1026 except BaseException as exc:
1027 self._fatal_error(exc, 'Fatal read error on datagram transport')
1028 else:
1029 self._protocol.datagram_received(data, addr)
1031 def sendto(self, data, addr=None):
1032 if not isinstance(data, (bytes, bytearray, memoryview)):
1033 raise TypeError(f'data argument must be a bytes-like object, '
1034 f'not {type(data).__name__!r}')
1035 if not data:
1036 return
1038 if self._address:
1039 if addr not in (None, self._address):
1040 raise ValueError(
1041 f'Invalid address: must be None or {self._address}')
1042 addr = self._address
1044 if self._conn_lost and self._address:
1045 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1046 logger.warning('socket.send() raised exception.')
1047 self._conn_lost += 1
1048 return
1050 if not self._buffer:
1051 # Attempt to send it right away first.
1052 try:
1053 if self._extra['peername']:
1054 self._sock.send(data)
1055 else:
1056 self._sock.sendto(data, addr)
1057 return
1058 except (BlockingIOError, InterruptedError):
1059 self._loop._add_writer(self._sock_fd, self._sendto_ready)
1060 except OSError as exc:
1061 self._protocol.error_received(exc)
1062 return
1063 except (SystemExit, KeyboardInterrupt):
1064 raise
1065 except BaseException as exc:
1066 self._fatal_error(
1067 exc, 'Fatal write error on datagram transport')
1068 return
1070 # Ensure that what we buffer is immutable.
1071 self._buffer.append((bytes(data), addr))
1072 self._maybe_pause_protocol()
1074 def _sendto_ready(self):
1075 while self._buffer:
1076 data, addr = self._buffer.popleft()
1077 try:
1078 if self._extra['peername']:
1079 self._sock.send(data)
1080 else:
1081 self._sock.sendto(data, addr)
1082 except (BlockingIOError, InterruptedError):
1083 self._buffer.appendleft((data, addr)) # Try again later.
1084 break
1085 except OSError as exc:
1086 self._protocol.error_received(exc)
1087 return
1088 except (SystemExit, KeyboardInterrupt):
1089 raise
1090 except BaseException as exc:
1091 self._fatal_error(
1092 exc, 'Fatal write error on datagram transport')
1093 return
1095 self._maybe_resume_protocol() # May append to buffer.
1096 if not self._buffer:
1097 self._loop._remove_writer(self._sock_fd)
1098 if self._closing:
1099 self._call_connection_lost(None)