Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/selector_events.py: 14%
743 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1"""Event loop using a selector and related classes.
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
7__all__ = 'BaseSelectorEventLoop',
9import collections
10import errno
11import functools
12import selectors
13import socket
14import warnings
15import weakref
16try:
17 import ssl
18except ImportError: # pragma: no cover
19 ssl = None
21from . import base_events
22from . import constants
23from . import events
24from . import futures
25from . import protocols
26from . import sslproto
27from . import transports
28from . import trsock
29from .log import logger
32def _test_selector_event(selector, fd, event):
33 # Test if the selector is monitoring 'event' events
34 # for the file descriptor 'fd'.
35 try:
36 key = selector.get_key(fd)
37 except KeyError:
38 return False
39 else:
40 return bool(key.events & event)
43def _check_ssl_socket(sock):
44 if ssl is not None and isinstance(sock, ssl.SSLSocket):
45 raise TypeError("Socket cannot be of type SSLSocket")
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Create the loop; *selector* defaults to selectors.DefaultSelector()."""
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # fd -> transport; weak values so the mapping does not keep
        # transports alive.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Return a new stream transport wrapping *sock*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Return the application-side transport of a new SSL connection.

        An SSLProtocol is layered over a plain socket transport created for
        *rawsock*; the SSLProtocol's ``_app_transport`` is what is returned.
        """
        ssl_protocol = sslproto.SSLProtocol(
                self, protocol, sslcontext, waiter,
                server_side, server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout)
        # The socket transport is created for its side effects only.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Return a new datagram transport wrapping *sock*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running; does nothing if the loop
        is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe socket pair."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking socket pair used to wake up the loop."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe; no-op here."""
        pass

    def _read_from_self(self):
        """Drain the self-pipe, passing each chunk to _process_self_data()."""
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write a null byte to the self-pipe, ignoring OSError."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is not None:
            try:
                csock.send(b'\0')
            except OSError:
                if self._debug:
                    logger.debug("Fail to write a null byte into the "
                                 "self-pipe socket",
                                 exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Register _accept_connection() as the reader of listening *sock*."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        Each accepted connection is handed to a new task running
        _accept_connection2().
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    # NOTE(review): there is no return here, so the loop
                    # goes on to its next iteration after scheduling the
                    # retry.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout)
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Create the protocol and transport for accepted socket *conn*.

        Waits until the transport's waiter completes.  Errors other than
        SystemExit/KeyboardInterrupt are swallowed; they are reported to the
        loop's exception handler only when the loop is in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                transport.close()
                raise
                # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Raise RuntimeError if *fd* belongs to a transport that is not closing.

        *fd* may be an int or an object with a fileno() method; anything else
        raises ValueError.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Register *callback* as the reader of *fd* and return its Handle.

        A previously registered reader for the same fd is replaced and its
        handle is cancelled.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _remove_reader(self, fd):
        """Unregister the reader of *fd*.

        Returns True if a reader handle was registered (it is cancelled),
        False otherwise or when the loop is closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register *callback* as the writer of *fd* and return its Handle.

        A previously registered writer for the same fd is replaced and its
        handle is cancelled.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()
        return handle

    def _remove_writer(self, fd):
        """Remove a writer callback."""
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Remove both writer and connector.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        # The handle returned by _add_reader() is deliberately not exposed:
        # this method keeps returning None.
        self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        # The handle returned by _add_writer() is deliberately not exposed:
        # this method keeps returning None.
        self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv(n)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_read_done(self, fd, fut, handle=None):
        """Done-callback: remove the reader registered for a sock_*() call.

        If *handle* has already been cancelled, the reader was replaced by a
        newer registration for the same fd, which must be left in place.
        """
        if handle is None or not handle.cancelled():
            self.remove_reader(fd)

    def _sock_recv(self, fut, sock, n):
        """Reader callback for sock_recv(): try recv() and complete *fut*."""
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if fut.done():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_recv_into(self, fut, sock, buf):
        """Reader callback for sock_recv_into(): complete *fut* when possible."""
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if fut.done():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0

        if n == len(data):
            # all data sent
            return

        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        # use a trick with a list in closure to store a mutable state
        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
                                  memoryview(data), [n])
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
        return await fut

    def _sock_sendall(self, fut, sock, view, pos):
        """Writer callback for sock_sendall().

        *pos* is a one-element list holding the offset into *view* of the
        first byte not sent yet.
        """
        if fut.done():
            # Future cancellation can be scheduled on previous loop iteration
            return
        start = pos[0]
        try:
            n = sock.send(view[start:])
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
            return

        start += n

        if start == len(view):
            fut.set_result(None)
        else:
            pos[0] = start

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = await self._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock*; complete *fut* now or via a writer."""
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            self._ensure_fd_no_transport(fd)
            handle = self._add_writer(
                fd, self._sock_connect_cb, fut, sock, address)
            fut.add_done_callback(
                functools.partial(self._sock_write_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_write_done(self, fd, fut, handle=None):
        """Done-callback: remove the writer registered for a sock_*() call.

        If *handle* has already been cancelled, the writer was replaced by a
        newer registration for the same fd, which must be left in place.
        """
        if handle is None or not handle.cancelled():
            self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback for sock_connect(): read SO_ERROR, complete *fut*."""
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, False, sock)
        return await fut

    def _sock_accept(self, fut, registered, sock):
        """Try to accept on *sock*, re-registering itself as reader if needed.

        *registered* is True when this call comes from the reader callback,
        in which case the reader is removed first.
        """
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.done():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the socket of *transp* using sock_sendfile().

        The transport is taken out of self._transports and its reading is
        paused for the duration of the call; both are restored afterwards.
        """
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Schedule the reader/writer handles of the ready file objects.

        Handles that were cancelled are unregistered instead of scheduled.
        """
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Unregister listening socket *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()
603class _SelectorTransport(transports._FlowControlMixin,
604 transports.Transport):
606 max_size = 256 * 1024 # Buffer size passed to recv().
608 _buffer_factory = bytearray # Constructs initial value for self._buffer.
610 # Attribute used in the destructor: it must be set even if the constructor
611 # is not called (see _SelectorSslTransport which may start by raising an
612 # exception)
613 _sock = None
615 def __init__(self, loop, sock, protocol, extra=None, server=None):
616 super().__init__(extra, loop)
617 self._extra['socket'] = trsock.TransportSocket(sock)
618 try:
619 self._extra['sockname'] = sock.getsockname()
620 except OSError:
621 self._extra['sockname'] = None
622 if 'peername' not in self._extra:
623 try:
624 self._extra['peername'] = sock.getpeername()
625 except socket.error:
626 self._extra['peername'] = None
627 self._sock = sock
628 self._sock_fd = sock.fileno()
630 self._protocol_connected = False
631 self.set_protocol(protocol)
633 self._server = server
634 self._buffer = self._buffer_factory()
635 self._conn_lost = 0 # Set when call to connection_lost scheduled.
636 self._closing = False # Set when close() called.
637 if self._server is not None:
638 self._server._attach()
639 loop._transports[self._sock_fd] = self
641 def __repr__(self):
642 info = [self.__class__.__name__]
643 if self._sock is None:
644 info.append('closed')
645 elif self._closing:
646 info.append('closing')
647 info.append(f'fd={self._sock_fd}')
648 # test if the transport was closed
649 if self._loop is not None and not self._loop.is_closed():
650 polling = _test_selector_event(self._loop._selector,
651 self._sock_fd, selectors.EVENT_READ)
652 if polling:
653 info.append('read=polling')
654 else:
655 info.append('read=idle')
657 polling = _test_selector_event(self._loop._selector,
658 self._sock_fd,
659 selectors.EVENT_WRITE)
660 if polling:
661 state = 'polling'
662 else:
663 state = 'idle'
665 bufsize = self.get_write_buffer_size()
666 info.append(f'write=<{state}, bufsize={bufsize}>')
667 return '<{}>'.format(' '.join(info))
669 def abort(self):
670 self._force_close(None)
672 def set_protocol(self, protocol):
673 self._protocol = protocol
674 self._protocol_connected = True
676 def get_protocol(self):
677 return self._protocol
679 def is_closing(self):
680 return self._closing
682 def close(self):
683 if self._closing:
684 return
685 self._closing = True
686 self._loop._remove_reader(self._sock_fd)
687 if not self._buffer:
688 self._conn_lost += 1
689 self._loop._remove_writer(self._sock_fd)
690 self._loop.call_soon(self._call_connection_lost, None)
692 def __del__(self, _warn=warnings.warn):
693 if self._sock is not None:
694 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
695 self._sock.close()
697 def _fatal_error(self, exc, message='Fatal error on transport'):
698 # Should be called from exception handler only.
699 if isinstance(exc, OSError):
700 if self._loop.get_debug():
701 logger.debug("%r: %s", self, message, exc_info=True)
702 else:
703 self._loop.call_exception_handler({
704 'message': message,
705 'exception': exc,
706 'transport': self,
707 'protocol': self._protocol,
708 })
709 self._force_close(exc)
711 def _force_close(self, exc):
712 if self._conn_lost:
713 return
714 if self._buffer:
715 self._buffer.clear()
716 self._loop._remove_writer(self._sock_fd)
717 if not self._closing:
718 self._closing = True
719 self._loop._remove_reader(self._sock_fd)
720 self._conn_lost += 1
721 self._loop.call_soon(self._call_connection_lost, exc)
723 def _call_connection_lost(self, exc):
724 try:
725 if self._protocol_connected:
726 self._protocol.connection_lost(exc)
727 finally:
728 self._sock.close()
729 self._sock = None
730 self._protocol = None
731 self._loop = None
732 server = self._server
733 if server is not None:
734 server._detach()
735 self._server = None
737 def get_write_buffer_size(self):
738 return len(self._buffer)
740 def _add_reader(self, fd, callback, *args):
741 if self._closing:
742 return
744 self._loop._add_reader(fd, callback, *args)
747class _SelectorSocketTransport(_SelectorTransport):
749 _start_tls_compatible = True
750 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
752 def __init__(self, loop, sock, protocol, waiter=None,
753 extra=None, server=None):
755 self._read_ready_cb = None
756 super().__init__(loop, sock, protocol, extra, server)
757 self._eof = False
758 self._paused = False
759 self._empty_waiter = None
761 # Disable the Nagle algorithm -- small writes will be
762 # sent without waiting for the TCP ACK. This generally
763 # decreases the latency (in some cases significantly.)
764 base_events._set_nodelay(self._sock)
766 self._loop.call_soon(self._protocol.connection_made, self)
767 # only start reading when connection_made() has been called
768 self._loop.call_soon(self._add_reader,
769 self._sock_fd, self._read_ready)
770 if waiter is not None:
771 # only wake up the waiter when connection_made() has been called
772 self._loop.call_soon(futures._set_result_unless_cancelled,
773 waiter, None)
775 def set_protocol(self, protocol):
776 if isinstance(protocol, protocols.BufferedProtocol):
777 self._read_ready_cb = self._read_ready__get_buffer
778 else:
779 self._read_ready_cb = self._read_ready__data_received
781 super().set_protocol(protocol)
783 def is_reading(self):
784 return not self._paused and not self._closing
786 def pause_reading(self):
787 if self._closing or self._paused:
788 return
789 self._paused = True
790 self._loop._remove_reader(self._sock_fd)
791 if self._loop.get_debug():
792 logger.debug("%r pauses reading", self)
794 def resume_reading(self):
795 if self._closing or not self._paused:
796 return
797 self._paused = False
798 self._add_reader(self._sock_fd, self._read_ready)
799 if self._loop.get_debug():
800 logger.debug("%r resumes reading", self)
802 def _read_ready(self):
803 self._read_ready_cb()
805 def _read_ready__get_buffer(self):
806 if self._conn_lost:
807 return
809 try:
810 buf = self._protocol.get_buffer(-1)
811 if not len(buf):
812 raise RuntimeError('get_buffer() returned an empty buffer')
813 except (SystemExit, KeyboardInterrupt):
814 raise
815 except BaseException as exc:
816 self._fatal_error(
817 exc, 'Fatal error: protocol.get_buffer() call failed.')
818 return
820 try:
821 nbytes = self._sock.recv_into(buf)
822 except (BlockingIOError, InterruptedError):
823 return
824 except (SystemExit, KeyboardInterrupt):
825 raise
826 except BaseException as exc:
827 self._fatal_error(exc, 'Fatal read error on socket transport')
828 return
830 if not nbytes:
831 self._read_ready__on_eof()
832 return
834 try:
835 self._protocol.buffer_updated(nbytes)
836 except (SystemExit, KeyboardInterrupt):
837 raise
838 except BaseException as exc:
839 self._fatal_error(
840 exc, 'Fatal error: protocol.buffer_updated() call failed.')
842 def _read_ready__data_received(self):
843 if self._conn_lost:
844 return
845 try:
846 data = self._sock.recv(self.max_size)
847 except (BlockingIOError, InterruptedError):
848 return
849 except (SystemExit, KeyboardInterrupt):
850 raise
851 except BaseException as exc:
852 self._fatal_error(exc, 'Fatal read error on socket transport')
853 return
855 if not data:
856 self._read_ready__on_eof()
857 return
859 try:
860 self._protocol.data_received(data)
861 except (SystemExit, KeyboardInterrupt):
862 raise
863 except BaseException as exc:
864 self._fatal_error(
865 exc, 'Fatal error: protocol.data_received() call failed.')
867 def _read_ready__on_eof(self):
868 if self._loop.get_debug():
869 logger.debug("%r received EOF", self)
871 try:
872 keep_open = self._protocol.eof_received()
873 except (SystemExit, KeyboardInterrupt):
874 raise
875 except BaseException as exc:
876 self._fatal_error(
877 exc, 'Fatal error: protocol.eof_received() call failed.')
878 return
880 if keep_open:
881 # We're keeping the connection open so the
882 # protocol can write more, but we still can't
883 # receive more, so remove the reader callback.
884 self._loop._remove_reader(self._sock_fd)
885 else:
886 self.close()
888 def write(self, data):
889 if not isinstance(data, (bytes, bytearray, memoryview)):
890 raise TypeError(f'data argument must be a bytes-like object, '
891 f'not {type(data).__name__!r}')
892 if self._eof:
893 raise RuntimeError('Cannot call write() after write_eof()')
894 if self._empty_waiter is not None:
895 raise RuntimeError('unable to write; sendfile is in progress')
896 if not data:
897 return
899 if self._conn_lost:
900 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
901 logger.warning('socket.send() raised exception.')
902 self._conn_lost += 1
903 return
905 if not self._buffer:
906 # Optimization: try to send now.
907 try:
908 n = self._sock.send(data)
909 except (BlockingIOError, InterruptedError):
910 pass
911 except (SystemExit, KeyboardInterrupt):
912 raise
913 except BaseException as exc:
914 self._fatal_error(exc, 'Fatal write error on socket transport')
915 return
916 else:
917 data = data[n:]
918 if not data:
919 return
920 # Not all was written; register write handler.
921 self._loop._add_writer(self._sock_fd, self._write_ready)
923 # Add it to the buffer.
924 self._buffer.extend(data)
925 self._maybe_pause_protocol()
927 def _write_ready(self):
928 assert self._buffer, 'Data should not be empty'
930 if self._conn_lost:
931 return
932 try:
933 n = self._sock.send(self._buffer)
934 except (BlockingIOError, InterruptedError):
935 pass
936 except (SystemExit, KeyboardInterrupt):
937 raise
938 except BaseException as exc:
939 self._loop._remove_writer(self._sock_fd)
940 self._buffer.clear()
941 self._fatal_error(exc, 'Fatal write error on socket transport')
942 if self._empty_waiter is not None:
943 self._empty_waiter.set_exception(exc)
944 else:
945 if n:
946 del self._buffer[:n]
947 self._maybe_resume_protocol() # May append to buffer.
948 if not self._buffer:
949 self._loop._remove_writer(self._sock_fd)
950 if self._empty_waiter is not None:
951 self._empty_waiter.set_result(None)
952 if self._closing:
953 self._call_connection_lost(None)
954 elif self._eof:
955 self._sock.shutdown(socket.SHUT_WR)
957 def write_eof(self):
958 if self._closing or self._eof:
959 return
960 self._eof = True
961 if not self._buffer:
962 self._sock.shutdown(socket.SHUT_WR)
964 def can_write_eof(self):
965 return True
967 def _call_connection_lost(self, exc):
968 super()._call_connection_lost(exc)
969 if self._empty_waiter is not None:
970 self._empty_waiter.set_exception(
971 ConnectionError("Connection is closed by peer"))
973 def _make_empty_waiter(self):
974 if self._empty_waiter is not None:
975 raise RuntimeError("Empty waiter is already set")
976 self._empty_waiter = self._loop.create_future()
977 if not self._buffer:
978 self._empty_waiter.set_result(None)
979 return self._empty_waiter
981 def _reset_empty_waiter(self):
982 self._empty_waiter = None
985class _SelectorDatagramTransport(_SelectorTransport):
987 _buffer_factory = collections.deque
989 def __init__(self, loop, sock, protocol, address=None,
990 waiter=None, extra=None):
991 super().__init__(loop, sock, protocol, extra)
992 self._address = address
993 self._loop.call_soon(self._protocol.connection_made, self)
994 # only start reading when connection_made() has been called
995 self._loop.call_soon(self._add_reader,
996 self._sock_fd, self._read_ready)
997 if waiter is not None:
998 # only wake up the waiter when connection_made() has been called
999 self._loop.call_soon(futures._set_result_unless_cancelled,
1000 waiter, None)
1002 def get_write_buffer_size(self):
1003 return sum(len(data) for data, _ in self._buffer)
1005 def _read_ready(self):
1006 if self._conn_lost:
1007 return
1008 try:
1009 data, addr = self._sock.recvfrom(self.max_size)
1010 except (BlockingIOError, InterruptedError):
1011 pass
1012 except OSError as exc:
1013 self._protocol.error_received(exc)
1014 except (SystemExit, KeyboardInterrupt):
1015 raise
1016 except BaseException as exc:
1017 self._fatal_error(exc, 'Fatal read error on datagram transport')
1018 else:
1019 self._protocol.datagram_received(data, addr)
1021 def sendto(self, data, addr=None):
1022 if not isinstance(data, (bytes, bytearray, memoryview)):
1023 raise TypeError(f'data argument must be a bytes-like object, '
1024 f'not {type(data).__name__!r}')
1025 if not data:
1026 return
1028 if self._address:
1029 if addr not in (None, self._address):
1030 raise ValueError(
1031 f'Invalid address: must be None or {self._address}')
1032 addr = self._address
1034 if self._conn_lost and self._address:
1035 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1036 logger.warning('socket.send() raised exception.')
1037 self._conn_lost += 1
1038 return
1040 if not self._buffer:
1041 # Attempt to send it right away first.
1042 try:
1043 if self._extra['peername']:
1044 self._sock.send(data)
1045 else:
1046 self._sock.sendto(data, addr)
1047 return
1048 except (BlockingIOError, InterruptedError):
1049 self._loop._add_writer(self._sock_fd, self._sendto_ready)
1050 except OSError as exc:
1051 self._protocol.error_received(exc)
1052 return
1053 except (SystemExit, KeyboardInterrupt):
1054 raise
1055 except BaseException as exc:
1056 self._fatal_error(
1057 exc, 'Fatal write error on datagram transport')
1058 return
1060 # Ensure that what we buffer is immutable.
1061 self._buffer.append((bytes(data), addr))
1062 self._maybe_pause_protocol()
1064 def _sendto_ready(self):
1065 while self._buffer:
1066 data, addr = self._buffer.popleft()
1067 try:
1068 if self._extra['peername']:
1069 self._sock.send(data)
1070 else:
1071 self._sock.sendto(data, addr)
1072 except (BlockingIOError, InterruptedError):
1073 self._buffer.appendleft((data, addr)) # Try again later.
1074 break
1075 except OSError as exc:
1076 self._protocol.error_received(exc)
1077 return
1078 except (SystemExit, KeyboardInterrupt):
1079 raise
1080 except BaseException as exc:
1081 self._fatal_error(
1082 exc, 'Fatal write error on datagram transport')
1083 return
1085 self._maybe_resume_protocol() # May append to buffer.
1086 if not self._buffer:
1087 self._loop._remove_writer(self._sock_fd)
1088 if self._closing:
1089 self._call_connection_lost(None)