Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/base_events.py: 13%
1053 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1"""Base implementation of event loop.
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
16import collections
17import collections.abc
18import concurrent.futures
19import functools
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
33try:
34 import ssl
35except ImportError: # pragma: no cover
36 ssl = None
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import transports
48from . import trsock
49from .log import logger
52__all__ = 'BaseEventLoop',
55# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
64_HAS_IPv6 = hasattr(socket, 'AF_INET6')
66# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
69# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
70# *reuse_address* parameter
71_unset = object()
def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is bound to a Task (its ``__self__`` is a
    ``tasks.Task``), the repr of that task is returned; otherwise
    ``str(handle)`` is returned.
    """
    owner = getattr(handle._callback, '__self__', None)
    if not isinstance(owner, tasks.Task):
        return str(handle)
    # The callback belongs to a task: format the task itself.
    return repr(owner)
83def _format_pipe(fd):
84 if fd == subprocess.PIPE:
85 return '<pipe>'
86 elif fd == subprocess.STDOUT:
87 return '<stdout>'
88 else:
89 return repr(fd)
92def _set_reuseport(sock):
93 if not hasattr(socket, 'SO_REUSEPORT'):
94 raise ValueError('reuse_port not supported by socket module')
95 else:
96 try:
97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98 except OSError:
99 raise ValueError('reuse_port not supported by socket module, '
100 'SO_REUSEPORT defined but not implemented.')
103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
104 # Try to skip getaddrinfo if "host" is already an IP. Users might have
105 # handled name resolution in their own code and pass in resolved IPs.
106 if not hasattr(socket, 'inet_pton'):
107 return
109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110 host is None:
111 return None
113 if type == socket.SOCK_STREAM:
114 proto = socket.IPPROTO_TCP
115 elif type == socket.SOCK_DGRAM:
116 proto = socket.IPPROTO_UDP
117 else:
118 return None
120 if port is None:
121 port = 0
122 elif isinstance(port, bytes) and port == b'':
123 port = 0
124 elif isinstance(port, str) and port == '':
125 port = 0
126 else:
127 # If port's a service name like "http", don't skip getaddrinfo.
128 try:
129 port = int(port)
130 except (TypeError, ValueError):
131 return None
133 if family == socket.AF_UNSPEC:
134 afs = [socket.AF_INET]
135 if _HAS_IPv6:
136 afs.append(socket.AF_INET6)
137 else:
138 afs = [family]
140 if isinstance(host, bytes):
141 host = host.decode('idna')
142 if '%' in host:
143 # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144 # like '::1%lo0'.
145 return None
147 for af in afs:
148 try:
149 socket.inet_pton(af, host)
150 # The host has already been resolved.
151 if _HAS_IPv6 and af == socket.AF_INET6:
152 return af, type, proto, '', (host, port, flowinfo, scopeid)
153 else:
154 return af, type, proto, '', (host, port)
155 except OSError:
156 pass
158 # "host" is not an IP address.
159 return None
162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163 """Interleave list of addrinfo tuples by family."""
164 # Group addresses by family
165 addrinfos_by_family = collections.OrderedDict()
166 for addr in addrinfos:
167 family = addr[0]
168 if family not in addrinfos_by_family:
169 addrinfos_by_family[family] = []
170 addrinfos_by_family[family].append(addr)
171 addrinfos_lists = list(addrinfos_by_family.values())
173 reordered = []
174 if first_address_family_count > 1:
175 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176 del addrinfos_lists[0][:first_address_family_count - 1]
177 reordered.extend(
178 a for a in itertools.chain.from_iterable(
179 itertools.zip_longest(*addrinfos_lists)
180 ) if a is not None)
181 return reordered
184def _run_until_complete_cb(fut):
185 if not fut.cancelled():
186 exc = fut.exception()
187 if isinstance(exc, (SystemExit, KeyboardInterrupt)):
188 # Issue #22429: run_forever() already finished, no need to
189 # stop it.
190 return
191 futures._get_loop(fut).stop()
194if hasattr(socket, 'TCP_NODELAY'):
195 def _set_nodelay(sock):
196 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197 sock.type == socket.SOCK_STREAM and
198 sock.proto == socket.IPPROTO_TCP):
199 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201 def _set_nodelay(sock):
202 pass
class _SendfileFallbackProtocol(protocols.Protocol):
    """Temporary protocol installed on a transport during a sendfile fallback.

    On construction it remembers the transport's current protocol and its
    reading/writing state, pauses reading and installs itself as the
    transport's protocol.  drain() waits while writing is paused, and
    restore() puts the original protocol and state back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        # Snapshot what must be put back by restore().
        self._proto = transp.get_protocol()
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # A pending future means "writing is currently paused".
        self._write_ready_fut = (
            self._transport._loop.create_future()
            if self._should_resume_writing else None
        )

    async def drain(self):
        """Wait until writing is resumed; raise if the transport is closing."""
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        pending = self._write_ready_fut
        if pending is not None:
            await pending

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        pending = self._write_ready_fut
        if pending is not None:
            # Never happens if peer disconnects after sending the whole
            # content.  Thus disconnection is always an exception from the
            # user's perspective.
            if exc is None:
                failure = ConnectionError("Connection is closed by peer")
            else:
                failure = exc
            pending.set_exception(failure)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        pending = self._write_ready_fut
        if pending is not None:
            pending.set_result(False)
            self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its reading/writing state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
class Server(events.AbstractServer):
    """Server bound to a set of listening sockets on an event loop.

    It counts attached connections (_attach/_detach) and keeps a list of
    futures awaited by wait_closed(), which are resolved by _wakeup().
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        self._sockets = sockets
        # Connections currently attached via _attach().
        self._active_count = 0
        # Futures created by wait_closed(); set to None by _wakeup().
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        # Future awaited by serve_forever(), if one is in progress.
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        assert self._active_count > 0
        self._active_count -= 1
        # Last connection gone after close(): release the waiters.
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        pending, self._waiters = self._waiters, None
        for fut in pending:
            if not fut.done():
                fut.set_result(fut)

    def _start_serving(self):
        if self._serving:
            return
        self._serving = True
        for listener in self._sockets:
            listener.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, listener, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        """Tuple of TransportSocket wrappers; empty once closed."""
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        listeners = self._sockets
        if listeners is None:
            return
        self._sockets = None

        for listener in listeners:
            self._loop._stop_serving(listener)

        self._serving = False

        forever_fut = self._serving_forever_fut
        if forever_fut is not None and not forever_fut.done():
            forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self._loop)

    async def serve_forever(self):
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            # On cancellation close the server, wait for it, then re-raise.
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        if self._sockets is None or self._waiters is None:
            return
        fut = self._loop.create_future()
        self._waiters.append(fut)
        await fut
384class BaseEventLoop(events.AbstractEventLoop):
386 def __init__(self):
387 self._timer_cancelled_count = 0
388 self._closed = False
389 self._stopping = False
390 self._ready = collections.deque()
391 self._scheduled = []
392 self._default_executor = None
393 self._internal_fds = 0
394 # Identifier of the thread running the event loop, or None if the
395 # event loop is not running
396 self._thread_id = None
397 self._clock_resolution = time.get_clock_info('monotonic').resolution
398 self._exception_handler = None
399 self.set_debug(coroutines._is_debug_mode())
400 # In debug mode, if the execution of a callback or a step of a task
401 # exceed this duration in seconds, the slow callback/task is logged.
402 self.slow_callback_duration = 0.1
403 self._current_handle = None
404 self._task_factory = None
405 self._coroutine_origin_tracking_enabled = False
406 self._coroutine_origin_tracking_saved_depth = None
408 # A weak set of all asynchronous generators that are
409 # being iterated by the loop.
410 self._asyncgens = weakref.WeakSet()
411 # Set to True when `loop.shutdown_asyncgens` is called.
412 self._asyncgens_shutdown_called = False
414 def __repr__(self):
415 return (
416 f'<{self.__class__.__name__} running={self.is_running()} '
417 f'closed={self.is_closed()} debug={self.get_debug()}>'
418 )
420 def create_future(self):
421 """Create a Future object attached to the loop."""
422 return futures.Future(loop=self)
424 def create_task(self, coro, *, name=None):
425 """Schedule a coroutine object.
427 Return a task object.
428 """
429 self._check_closed()
430 if self._task_factory is None:
431 task = tasks.Task(coro, loop=self, name=name)
432 if task._source_traceback:
433 del task._source_traceback[-1]
434 else:
435 task = self._task_factory(self, coro)
436 tasks._set_task_name(task, name)
438 return task
440 def set_task_factory(self, factory):
441 """Set a task factory that will be used by loop.create_task().
443 If factory is None the default task factory will be set.
445 If factory is a callable, it should have a signature matching
446 '(loop, coro)', where 'loop' will be a reference to the active
447 event loop, 'coro' will be a coroutine object. The callable
448 must return a Future.
449 """
450 if factory is not None and not callable(factory):
451 raise TypeError('task factory must be a callable or None')
452 self._task_factory = factory
454 def get_task_factory(self):
455 """Return a task factory, or None if the default one is in use."""
456 return self._task_factory
458 def _make_socket_transport(self, sock, protocol, waiter=None, *,
459 extra=None, server=None):
460 """Create socket transport."""
461 raise NotImplementedError
463 def _make_ssl_transport(
464 self, rawsock, protocol, sslcontext, waiter=None,
465 *, server_side=False, server_hostname=None,
466 extra=None, server=None,
467 ssl_handshake_timeout=None,
468 call_connection_made=True):
469 """Create SSL transport."""
470 raise NotImplementedError
472 def _make_datagram_transport(self, sock, protocol,
473 address=None, waiter=None, extra=None):
474 """Create datagram transport."""
475 raise NotImplementedError
477 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
478 extra=None):
479 """Create read pipe transport."""
480 raise NotImplementedError
482 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
483 extra=None):
484 """Create write pipe transport."""
485 raise NotImplementedError
487 async def _make_subprocess_transport(self, protocol, args, shell,
488 stdin, stdout, stderr, bufsize,
489 extra=None, **kwargs):
490 """Create subprocess transport."""
491 raise NotImplementedError
493 def _write_to_self(self):
494 """Write a byte to self-pipe, to wake up the event loop.
496 This may be called from a different thread.
498 The subclass is responsible for implementing the self-pipe.
499 """
500 raise NotImplementedError
502 def _process_events(self, event_list):
503 """Process selector events."""
504 raise NotImplementedError
506 def _check_closed(self):
507 if self._closed:
508 raise RuntimeError('Event loop is closed')
510 def _asyncgen_finalizer_hook(self, agen):
511 self._asyncgens.discard(agen)
512 if not self.is_closed():
513 self.call_soon_threadsafe(self.create_task, agen.aclose())
515 def _asyncgen_firstiter_hook(self, agen):
516 if self._asyncgens_shutdown_called:
517 warnings.warn(
518 f"asynchronous generator {agen!r} was scheduled after "
519 f"loop.shutdown_asyncgens() call",
520 ResourceWarning, source=self)
522 self._asyncgens.add(agen)
524 async def shutdown_asyncgens(self):
525 """Shutdown all active asynchronous generators."""
526 self._asyncgens_shutdown_called = True
528 if not len(self._asyncgens):
529 # If Python version is <3.6 or we don't have any asynchronous
530 # generators alive.
531 return
533 closing_agens = list(self._asyncgens)
534 self._asyncgens.clear()
536 results = await tasks.gather(
537 *[ag.aclose() for ag in closing_agens],
538 return_exceptions=True,
539 loop=self)
541 for result, agen in zip(results, closing_agens):
542 if isinstance(result, Exception):
543 self.call_exception_handler({
544 'message': f'an error occurred during closing of '
545 f'asynchronous generator {agen!r}',
546 'exception': result,
547 'asyncgen': agen
548 })
550 def _check_running(self):
551 if self.is_running():
552 raise RuntimeError('This event loop is already running')
553 if events._get_running_loop() is not None:
554 raise RuntimeError(
555 'Cannot run the event loop while another loop is running')
557 def run_forever(self):
558 """Run until stop() is called."""
559 self._check_closed()
560 self._check_running()
561 self._set_coroutine_origin_tracking(self._debug)
562 self._thread_id = threading.get_ident()
564 old_agen_hooks = sys.get_asyncgen_hooks()
565 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
566 finalizer=self._asyncgen_finalizer_hook)
567 try:
568 events._set_running_loop(self)
569 while True:
570 self._run_once()
571 if self._stopping:
572 break
573 finally:
574 self._stopping = False
575 self._thread_id = None
576 events._set_running_loop(None)
577 self._set_coroutine_origin_tracking(False)
578 sys.set_asyncgen_hooks(*old_agen_hooks)
580 def run_until_complete(self, future):
581 """Run until the Future is done.
583 If the argument is a coroutine, it is wrapped in a Task.
585 WARNING: It would be disastrous to call run_until_complete()
586 with the same coroutine twice -- it would wrap it in two
587 different Tasks and that can't be good.
589 Return the Future's result, or raise its exception.
590 """
591 self._check_closed()
592 self._check_running()
594 new_task = not futures.isfuture(future)
595 future = tasks.ensure_future(future, loop=self)
596 if new_task:
597 # An exception is raised if the future didn't complete, so there
598 # is no need to log the "destroy pending task" message
599 future._log_destroy_pending = False
601 future.add_done_callback(_run_until_complete_cb)
602 try:
603 self.run_forever()
604 except:
605 if new_task and future.done() and not future.cancelled():
606 # The coroutine raised a BaseException. Consume the exception
607 # to not log a warning, the caller doesn't have access to the
608 # local task.
609 future.exception()
610 raise
611 finally:
612 future.remove_done_callback(_run_until_complete_cb)
613 if not future.done():
614 raise RuntimeError('Event loop stopped before Future completed.')
616 return future.result()
618 def stop(self):
619 """Stop running the event loop.
621 Every callback already scheduled will still run. This simply informs
622 run_forever to stop looping after a complete iteration.
623 """
624 self._stopping = True
626 def close(self):
627 """Close the event loop.
629 This clears the queues and shuts down the executor,
630 but does not wait for the executor to finish.
632 The event loop must not be running.
633 """
634 if self.is_running():
635 raise RuntimeError("Cannot close a running event loop")
636 if self._closed:
637 return
638 if self._debug:
639 logger.debug("Close %r", self)
640 self._closed = True
641 self._ready.clear()
642 self._scheduled.clear()
643 executor = self._default_executor
644 if executor is not None:
645 self._default_executor = None
646 executor.shutdown(wait=False)
648 def is_closed(self):
649 """Returns True if the event loop was closed."""
650 return self._closed
652 def __del__(self, _warn=warnings.warn):
653 if not self.is_closed():
654 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
655 if not self.is_running():
656 self.close()
658 def is_running(self):
659 """Returns True if the event loop is running."""
660 return (self._thread_id is not None)
662 def time(self):
663 """Return the time according to the event loop's clock.
665 This is a float expressed in seconds since an epoch, but the
666 epoch, precision, accuracy and drift are unspecified and may
667 differ per event loop.
668 """
669 return time.monotonic()
671 def call_later(self, delay, callback, *args, context=None):
672 """Arrange for a callback to be called at a given time.
674 Return a Handle: an opaque object with a cancel() method that
675 can be used to cancel the call.
677 The delay can be an int or float, expressed in seconds. It is
678 always relative to the current time.
680 Each callback will be called exactly once. If two callbacks
681 are scheduled for exactly the same time, it undefined which
682 will be called first.
684 Any positional arguments after the callback will be passed to
685 the callback when it is called.
686 """
687 timer = self.call_at(self.time() + delay, callback, *args,
688 context=context)
689 if timer._source_traceback:
690 del timer._source_traceback[-1]
691 return timer
693 def call_at(self, when, callback, *args, context=None):
694 """Like call_later(), but uses an absolute time.
696 Absolute time corresponds to the event loop's time() method.
697 """
698 self._check_closed()
699 if self._debug:
700 self._check_thread()
701 self._check_callback(callback, 'call_at')
702 timer = events.TimerHandle(when, callback, args, self, context)
703 if timer._source_traceback:
704 del timer._source_traceback[-1]
705 heapq.heappush(self._scheduled, timer)
706 timer._scheduled = True
707 return timer
709 def call_soon(self, callback, *args, context=None):
710 """Arrange for a callback to be called as soon as possible.
712 This operates as a FIFO queue: callbacks are called in the
713 order in which they are registered. Each callback will be
714 called exactly once.
716 Any positional arguments after the callback will be passed to
717 the callback when it is called.
718 """
719 self._check_closed()
720 if self._debug:
721 self._check_thread()
722 self._check_callback(callback, 'call_soon')
723 handle = self._call_soon(callback, args, context)
724 if handle._source_traceback:
725 del handle._source_traceback[-1]
726 return handle
728 def _check_callback(self, callback, method):
729 if (coroutines.iscoroutine(callback) or
730 coroutines.iscoroutinefunction(callback)):
731 raise TypeError(
732 f"coroutines cannot be used with {method}()")
733 if not callable(callback):
734 raise TypeError(
735 f'a callable object was expected by {method}(), '
736 f'got {callback!r}')
738 def _call_soon(self, callback, args, context):
739 handle = events.Handle(callback, args, self, context)
740 if handle._source_traceback:
741 del handle._source_traceback[-1]
742 self._ready.append(handle)
743 return handle
745 def _check_thread(self):
746 """Check that the current thread is the thread running the event loop.
748 Non-thread-safe methods of this class make this assumption and will
749 likely behave incorrectly when the assumption is violated.
751 Should only be called when (self._debug == True). The caller is
752 responsible for checking this condition for performance reasons.
753 """
754 if self._thread_id is None:
755 return
756 thread_id = threading.get_ident()
757 if thread_id != self._thread_id:
758 raise RuntimeError(
759 "Non-thread-safe operation invoked on an event loop other "
760 "than the current one")
762 def call_soon_threadsafe(self, callback, *args, context=None):
763 """Like call_soon(), but thread-safe."""
764 self._check_closed()
765 if self._debug:
766 self._check_callback(callback, 'call_soon_threadsafe')
767 handle = self._call_soon(callback, args, context)
768 if handle._source_traceback:
769 del handle._source_traceback[-1]
770 self._write_to_self()
771 return handle
773 def run_in_executor(self, executor, func, *args):
774 self._check_closed()
775 if self._debug:
776 self._check_callback(func, 'run_in_executor')
777 if executor is None:
778 executor = self._default_executor
779 if executor is None:
780 executor = concurrent.futures.ThreadPoolExecutor()
781 self._default_executor = executor
782 return futures.wrap_future(
783 executor.submit(func, *args), loop=self)
785 def set_default_executor(self, executor):
786 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
787 warnings.warn(
788 'Using the default executor that is not an instance of '
789 'ThreadPoolExecutor is deprecated and will be prohibited '
790 'in Python 3.9',
791 DeprecationWarning, 2)
792 self._default_executor = executor
794 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
795 msg = [f"{host}:{port!r}"]
796 if family:
797 msg.append(f'family={family!r}')
798 if type:
799 msg.append(f'type={type!r}')
800 if proto:
801 msg.append(f'proto={proto!r}')
802 if flags:
803 msg.append(f'flags={flags!r}')
804 msg = ', '.join(msg)
805 logger.debug('Get address info %s', msg)
807 t0 = self.time()
808 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
809 dt = self.time() - t0
811 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
812 if dt >= self.slow_callback_duration:
813 logger.info(msg)
814 else:
815 logger.debug(msg)
816 return addrinfo
818 async def getaddrinfo(self, host, port, *,
819 family=0, type=0, proto=0, flags=0):
820 if self._debug:
821 getaddr_func = self._getaddrinfo_debug
822 else:
823 getaddr_func = socket.getaddrinfo
825 return await self.run_in_executor(
826 None, getaddr_func, host, port, family, type, proto, flags)
828 async def getnameinfo(self, sockaddr, flags=0):
829 return await self.run_in_executor(
830 None, socket.getnameinfo, sockaddr, flags)
832 async def sock_sendfile(self, sock, file, offset=0, count=None,
833 *, fallback=True):
834 if self._debug and sock.gettimeout() != 0:
835 raise ValueError("the socket must be non-blocking")
836 self._check_sendfile_params(sock, file, offset, count)
837 try:
838 return await self._sock_sendfile_native(sock, file,
839 offset, count)
840 except exceptions.SendfileNotAvailableError as exc:
841 if not fallback:
842 raise
843 return await self._sock_sendfile_fallback(sock, file,
844 offset, count)
846 async def _sock_sendfile_native(self, sock, file, offset, count):
847 # NB: sendfile syscall is not supported for SSL sockets and
848 # non-mmap files even if sendfile is supported by OS
849 raise exceptions.SendfileNotAvailableError(
850 f"syscall sendfile is not available for socket {sock!r} "
851 "and file {file!r} combination")
853 async def _sock_sendfile_fallback(self, sock, file, offset, count):
854 if offset:
855 file.seek(offset)
856 blocksize = (
857 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
858 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
859 )
860 buf = bytearray(blocksize)
861 total_sent = 0
862 try:
863 while True:
864 if count:
865 blocksize = min(count - total_sent, blocksize)
866 if blocksize <= 0:
867 break
868 view = memoryview(buf)[:blocksize]
869 read = await self.run_in_executor(None, file.readinto, view)
870 if not read:
871 break # EOF
872 await self.sock_sendall(sock, view[:read])
873 total_sent += read
874 return total_sent
875 finally:
876 if total_sent > 0 and hasattr(file, 'seek'):
877 file.seek(offset + total_sent)
879 def _check_sendfile_params(self, sock, file, offset, count):
880 if 'b' not in getattr(file, 'mode', 'b'):
881 raise ValueError("file should be opened in binary mode")
882 if not sock.type == socket.SOCK_STREAM:
883 raise ValueError("only SOCK_STREAM type sockets are supported")
884 if count is not None:
885 if not isinstance(count, int):
886 raise TypeError(
887 "count must be a positive integer (got {!r})".format(count))
888 if count <= 0:
889 raise ValueError(
890 "count must be a positive integer (got {!r})".format(count))
891 if not isinstance(offset, int):
892 raise TypeError(
893 "offset must be a non-negative integer (got {!r})".format(
894 offset))
895 if offset < 0:
896 raise ValueError(
897 "offset must be a non-negative integer (got {!r})".format(
898 offset))
900 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
901 """Create, bind and connect one socket."""
902 my_exceptions = []
903 exceptions.append(my_exceptions)
904 family, type_, proto, _, address = addr_info
905 sock = None
906 try:
907 sock = socket.socket(family=family, type=type_, proto=proto)
908 sock.setblocking(False)
909 if local_addr_infos is not None:
910 for _, _, _, _, laddr in local_addr_infos:
911 try:
912 sock.bind(laddr)
913 break
914 except OSError as exc:
915 msg = (
916 f'error while attempting to bind on '
917 f'address {laddr!r}: '
918 f'{exc.strerror.lower()}'
919 )
920 exc = OSError(exc.errno, msg)
921 my_exceptions.append(exc)
922 else: # all bind attempts failed
923 raise my_exceptions.pop()
924 await self.sock_connect(sock, address)
925 return sock
926 except OSError as exc:
927 my_exceptions.append(exc)
928 if sock is not None:
929 sock.close()
930 raise
931 except:
932 if sock is not None:
933 sock.close()
934 raise
936 async def create_connection(
937 self, protocol_factory, host=None, port=None,
938 *, ssl=None, family=0,
939 proto=0, flags=0, sock=None,
940 local_addr=None, server_hostname=None,
941 ssl_handshake_timeout=None,
942 happy_eyeballs_delay=None, interleave=None):
943 """Connect to a TCP server.
945 Create a streaming transport connection to a given Internet host and
946 port: socket family AF_INET or socket.AF_INET6 depending on host (or
947 family if specified), socket type SOCK_STREAM. protocol_factory must be
948 a callable returning a protocol instance.
950 This method is a coroutine which will try to establish the connection
951 in the background. When successful, the coroutine returns a
952 (transport, protocol) pair.
953 """
954 if server_hostname is not None and not ssl:
955 raise ValueError('server_hostname is only meaningful with ssl')
957 if server_hostname is None and ssl:
958 # Use host as default for server_hostname. It is an error
959 # if host is empty or not set, e.g. when an
960 # already-connected socket was passed or when only a port
961 # is given. To avoid this error, you can pass
962 # server_hostname='' -- this will bypass the hostname
963 # check. (This also means that if host is a numeric
964 # IP/IPv6 address, we will attempt to verify that exact
965 # address; this will probably fail, but it is possible to
966 # create a certificate for a specific IP address, so we
967 # don't judge it here.)
968 if not host:
969 raise ValueError('You must set server_hostname '
970 'when using ssl without a host')
971 server_hostname = host
973 if ssl_handshake_timeout is not None and not ssl:
974 raise ValueError(
975 'ssl_handshake_timeout is only meaningful with ssl')
977 if happy_eyeballs_delay is not None and interleave is None:
978 # If using happy eyeballs, default to interleave addresses by family
979 interleave = 1
981 if host is not None or port is not None:
982 if sock is not None:
983 raise ValueError(
984 'host/port and sock can not be specified at the same time')
986 infos = await self._ensure_resolved(
987 (host, port), family=family,
988 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
989 if not infos:
990 raise OSError('getaddrinfo() returned empty list')
992 if local_addr is not None:
993 laddr_infos = await self._ensure_resolved(
994 local_addr, family=family,
995 type=socket.SOCK_STREAM, proto=proto,
996 flags=flags, loop=self)
997 if not laddr_infos:
998 raise OSError('getaddrinfo() returned empty list')
999 else:
1000 laddr_infos = None
1002 if interleave:
1003 infos = _interleave_addrinfos(infos, interleave)
1005 exceptions = []
1006 if happy_eyeballs_delay is None:
1007 # not using happy eyeballs
1008 for addrinfo in infos:
1009 try:
1010 sock = await self._connect_sock(
1011 exceptions, addrinfo, laddr_infos)
1012 break
1013 except OSError:
1014 continue
1015 else: # using happy eyeballs
1016 sock, _, _ = await staggered.staggered_race(
1017 (functools.partial(self._connect_sock,
1018 exceptions, addrinfo, laddr_infos)
1019 for addrinfo in infos),
1020 happy_eyeballs_delay, loop=self)
1022 if sock is None:
1023 exceptions = [exc for sub in exceptions for exc in sub]
1024 if len(exceptions) == 1:
1025 raise exceptions[0]
1026 else:
1027 # If they all have the same str(), raise one.
1028 model = str(exceptions[0])
1029 if all(str(exc) == model for exc in exceptions):
1030 raise exceptions[0]
1031 # Raise a combined exception so the user can see all
1032 # the various error messages.
1033 raise OSError('Multiple exceptions: {}'.format(
1034 ', '.join(str(exc) for exc in exceptions)))
1036 else:
1037 if sock is None:
1038 raise ValueError(
1039 'host and port was not specified and no sock specified')
1040 if sock.type != socket.SOCK_STREAM:
1041 # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1042 # are SOCK_STREAM.
1043 # We support passing AF_UNIX sockets even though we have
1044 # a dedicated API for that: create_unix_connection.
1045 # Disallowing AF_UNIX in this method, breaks backwards
1046 # compatibility.
1047 raise ValueError(
1048 f'A Stream Socket was expected, got {sock!r}')
1050 transport, protocol = await self._create_connection_transport(
1051 sock, protocol_factory, ssl, server_hostname,
1052 ssl_handshake_timeout=ssl_handshake_timeout)
1053 if self._debug:
1054 # Get the socket from the transport because SSL transport closes
1055 # the old socket and creates a new SSL socket
1056 sock = transport.get_extra_info('socket')
1057 logger.debug("%r connected to %s:%r: (%r, %r)",
1058 sock, host, port, transport, protocol)
1059 return transport, protocol
1061 async def _create_connection_transport(
1062 self, sock, protocol_factory, ssl,
1063 server_hostname, server_side=False,
1064 ssl_handshake_timeout=None):
1066 sock.setblocking(False)
1068 protocol = protocol_factory()
1069 waiter = self.create_future()
1070 if ssl:
1071 sslcontext = None if isinstance(ssl, bool) else ssl
1072 transport = self._make_ssl_transport(
1073 sock, protocol, sslcontext, waiter,
1074 server_side=server_side, server_hostname=server_hostname,
1075 ssl_handshake_timeout=ssl_handshake_timeout)
1076 else:
1077 transport = self._make_socket_transport(sock, protocol, waiter)
1079 try:
1080 await waiter
1081 except:
1082 transport.close()
1083 raise
1085 return transport, protocol
async def sendfile(self, transport, file, offset=0, count=None,
                   *, fallback=True):
    """Send a file over *transport*.

    Return the total number of bytes which were sent.

    The high-performance os.sendfile path is tried first when the
    transport advertises support for it.

    *file* must be a regular file object opened in binary mode.

    *offset* is the position in the file to start reading from.  When
    *count* is given it is the total number of bytes to transmit;
    otherwise the file is sent until EOF.  The file position is updated
    on return, and also on error, in which case file.tell() can be used
    to find out how many bytes were sent.

    With *fallback* set to True asyncio reads and sends the file manually
    when the platform does not support the sendfile syscall
    (e.g. Windows or SSL socket on Unix).

    Raise SendfileNotAvailableError if the system does not support the
    sendfile syscall and *fallback* is False.  Raise RuntimeError if the
    transport is closing, does not support sendfile at all, or only
    supports the fallback path while *fallback* is False.
    """
    if transport.is_closing():
        raise RuntimeError("Transport is closing")

    modes = constants._SendfileMode
    # Transports that do not declare a mode are treated as unsupported.
    mode = getattr(transport, '_sendfile_compatible', modes.UNSUPPORTED)
    if mode is modes.UNSUPPORTED:
        raise RuntimeError(
            f"sendfile is not supported for transport {transport!r}")

    if mode is modes.TRY_NATIVE:
        try:
            return await self._sendfile_native(transport, file,
                                               offset, count)
        except exceptions.SendfileNotAvailableError:
            # Swallow only when we are allowed to fall back below.
            if not fallback:
                raise

    if fallback:
        return await self._sendfile_fallback(transport, file,
                                             offset, count)

    raise RuntimeError(
        f"fallback is disabled and native sendfile is not "
        f"supported for transport {transport!r}")
1134 async def _sendfile_native(self, transp, file, offset, count):
1135 raise exceptions.SendfileNotAvailableError(
1136 "sendfile syscall is not supported")
async def _sendfile_fallback(self, transp, file, offset, count):
    """Send *file* over *transp* by reading it chunk by chunk.

    Reads happen in the default executor via ``file.readinto`` into a
    reusable buffer of at most 16384 bytes; each chunk is written to the
    transport after ``drain()`` on the helper protocol has completed.

    Return the number of bytes written.  Sending stops at EOF or, when
    *count* is truthy, once *count* bytes have been written.  On exit
    (normal or not) the file position is moved to ``offset + sent`` if
    anything was sent, and the helper protocol's ``restore()`` is awaited.
    """
    if offset:
        file.seek(offset)

    if count:
        chunk_size = min(count, 16384)
    else:
        chunk_size = 16384
    scratch = bytearray(chunk_size)
    sent = 0
    helper = _SendfileFallbackProtocol(transp)
    try:
        while True:
            if count:
                # Never read past the requested byte count.
                chunk_size = min(count - sent, chunk_size)
                if chunk_size <= 0:
                    break
            window = memoryview(scratch)[:chunk_size]
            got = await self.run_in_executor(None, file.readinto, window)
            if not got:
                break  # EOF
            await helper.drain()
            transp.write(window[:got])
            sent += got
        return sent
    finally:
        if sent > 0 and hasattr(file, 'seek'):
            file.seek(offset + sent)
        await helper.restore()
async def start_tls(self, transport, protocol, sslcontext, *,
                    server_side=False,
                    server_hostname=None,
                    ssl_handshake_timeout=None):
    """Upgrade transport to TLS.

    Return a new transport that *protocol* should start using
    immediately.

    Raise RuntimeError when the ssl module is unavailable, and TypeError
    when *sslcontext* is not an ssl.SSLContext or *transport* does not
    declare itself compatible with start_tls().  If waiting for the
    handshake waiter raises, the transport is closed, the two callbacks
    scheduled here are cancelled and the exception is re-raised.
    """
    if ssl is None:
        raise RuntimeError('Python ssl module is not available')

    if not isinstance(sslcontext, ssl.SSLContext):
        raise TypeError(
            f'sslcontext is expected to be an instance of ssl.SSLContext, '
            f'got {sslcontext!r}')

    compatible = getattr(transport, '_start_tls_compatible', False)
    if not compatible:
        raise TypeError(
            f'transport {transport!r} is not supported by start_tls()')

    handshake_done = self.create_future()
    tls_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, handshake_done,
        server_side, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        call_connection_made=False)

    # Pause early so that "tls_protocol.data_received()" doesn't
    # have a chance to get called before "tls_protocol.connection_made()".
    transport.pause_reading()

    transport.set_protocol(tls_protocol)
    pending = (
        self.call_soon(tls_protocol.connection_made, transport),
        self.call_soon(transport.resume_reading),
    )

    try:
        await handshake_done
    except BaseException:
        transport.close()
        for scheduled in pending:
            scheduled.cancel()
        raise

    return tls_protocol._app_transport
async def create_datagram_endpoint(self, protocol_factory,
                                   local_addr=None, remote_addr=None, *,
                                   family=0, proto=0, flags=0,
                                   reuse_address=_unset, reuse_port=None,
                                   allow_broadcast=None, sock=None):
    """Create datagram connection.

    Either wrap the caller-supplied datagram socket *sock*, or create a
    new SOCK_DGRAM socket from *local_addr* / *remote_addr* / *family* /
    *proto* / *flags*.  The protocol is built with ``protocol_factory()``
    and a datagram transport is created around the socket.

    Return a ``(transport, protocol)`` pair once the transport's waiter
    future has completed.

    Raises:
        ValueError: *sock* is not a SOCK_DGRAM socket; socket modifier
            arguments are combined with *sock*; neither address nor
            *family* is given; no (family, proto) pair is usable for the
            given addresses; or *reuse_address* is true.
        TypeError: a non-string address is given with AF_UNIX.
        OSError: address resolution returns nothing, or every candidate
            socket fails (the first failure is re-raised).
    """
    if sock is not None:
        if sock.type != socket.SOCK_DGRAM:
            raise ValueError(
                f'A UDP Socket was expected, got {sock!r}')
        if (local_addr or remote_addr or
                family or proto or flags or
                reuse_port or allow_broadcast):
            # show the problematic kwargs in exception msg
            opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                        family=family, proto=proto, flags=flags,
                        reuse_address=reuse_address, reuse_port=reuse_port,
                        allow_broadcast=allow_broadcast)
            # The "_unset" default is a truthy sentinel object; skip it so
            # the message only lists arguments the caller really passed.
            problems = ', '.join(
                f'{k}={v}' for k, v in opts.items()
                if v and v is not _unset)
            raise ValueError(
                f'socket modifier keyword arguments can not be used '
                f'when sock is specified. ({problems})')
        sock.setblocking(False)
        r_addr = None
    else:
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            for addr in (local_addr, remote_addr):
                if addr is not None and not isinstance(addr, str):
                    raise TypeError('string is expected')

            # Addresses whose first character is 0 or NUL are not looked
            # up on the filesystem.
            if local_addr and local_addr[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                        os.remove(local_addr)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX '
                                 'socket %r: %r',
                                 local_addr, err)

            addr_pairs_info = (((family, proto),
                                (local_addr, remote_addr)), )
        else:
            # join address by (family, protocol)
            addr_infos = {}  # Using order preserving dict
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = await self._ensure_resolved(
                        addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags, loop=self)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        # bpo-37228
        if reuse_address is not _unset:
            if reuse_address:
                raise ValueError("Passing `reuse_address=True` is no "
                                 "longer supported, as the usage of "
                                 "SO_REUSEPORT in UDP poses a significant "
                                 "security concern.")
            else:
                warnings.warn("The *reuse_address* parameter has been "
                              "deprecated as of 3.5.10 and is scheduled "
                              "for removal in 3.11.", DeprecationWarning,
                              stacklevel=2)

        # Try each candidate in turn; the first socket that can be set up
        # wins.  OSErrors are collected, anything else aborts immediately.
        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                if reuse_port:
                    _set_reuseport(sock)
                if allow_broadcast:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    # With allow_broadcast the socket is left unconnected;
                    # the remote address is only passed to the transport.
                    if not allow_broadcast:
                        await self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed: surface the first recorded error.
            raise exceptions[0]

    protocol = protocol_factory()
    waiter = self.create_future()
    transport = self._make_datagram_transport(
        sock, protocol, r_addr, waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        await waiter
    except:
        transport.close()
        raise

    return transport, protocol
async def _ensure_resolved(self, address, *,
                           family=0, type=socket.SOCK_STREAM,
                           proto=0, flags=0, loop):
    """Return a list of addrinfo entries for *address*.

    The first two items of *address* are the host and port; any further
    items are forwarded to ``_ipaddr_info()``.  When that helper returns
    something other than None the result is wrapped in a one-element list
    and returned directly; otherwise ``loop.getaddrinfo()`` is awaited.
    """
    host, port = address[:2]
    extra = address[2:]
    literal = _ipaddr_info(host, port, family, type, proto, *extra)
    if literal is None:
        return await loop.getaddrinfo(host, port, family=family, type=type,
                                      proto=proto, flags=flags)
    # "host" is already a resolved IP.
    return [literal]
async def _create_server_getaddrinfo(self, host, port, family, flags):
    """Resolve ``(host, port)`` for a stream server.

    Return the non-empty list produced by ``_ensure_resolved()``; raise
    OSError if that list is empty.
    """
    resolved = await self._ensure_resolved(
        (host, port), family=family, type=socket.SOCK_STREAM,
        flags=flags, loop=self)
    if resolved:
        return resolved
    raise OSError(f'getaddrinfo({host!r}) returned empty list')
async def create_server(
        self, protocol_factory, host=None, port=None,
        *,
        family=socket.AF_UNSPEC,
        flags=socket.AI_PASSIVE,
        sock=None,
        backlog=100,
        ssl=None,
        reuse_address=None,
        reuse_port=None,
        ssl_handshake_timeout=None,
        start_serving=True):
    """Create a TCP server.

    The host parameter can be a string, in that case the TCP server is
    bound to host and port.

    The host parameter can also be a sequence of strings and in that case
    the TCP server is bound to all hosts of the sequence. If a host
    appears multiple times (possibly indirectly e.g. when hostnames
    resolve to the same IP address), the server is only bound once to that
    host.

    Instead of host/port, an already created stream socket can be passed
    as *sock*; the two forms are mutually exclusive.

    Return a Server object which can be used to stop the service.

    Raises TypeError if *ssl* is a bool, ValueError for inconsistent
    arguments (ssl_handshake_timeout without ssl, host/port together with
    sock, neither of them, or a non-stream *sock*), and OSError when
    binding one of the resolved addresses fails.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if ssl_handshake_timeout is not None and ssl is None:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Default for reuse_address: on for POSIX, except Cygwin.
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        # Normalise *host* into a list: '' becomes a single None entry,
        # a string or non-iterable becomes a one-element list.
        if host == '':
            hosts = [None]
        elif (isinstance(host, str) or
              not isinstance(host, collections.abc.Iterable)):
            hosts = [host]
        else:
            hosts = host

        # Resolve all hosts concurrently, then de-duplicate the results.
        fs = [self._create_server_getaddrinfo(host, port, family=family,
                                              flags=flags)
              for host in hosts]
        infos = await tasks.gather(*fs, loop=self)
        infos = set(itertools.chain.from_iterable(infos))

        # "completed" stays False if anything below raises, so the
        # finally clause closes every socket opened so far.
        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(
                        socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                if reuse_port:
                    _set_reuseport(sock)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if (_HAS_IPv6 and
                        af == socket.AF_INET6 and
                        hasattr(socket, 'IPPROTO_IPV6')):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    # Re-raise with the address in the message; the
                    # original exception is dropped from the chain.
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, err.strerror.lower())) from None
            completed = True
        finally:
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
        sockets = [sock]

    for sock in sockets:
        sock.setblocking(False)

    server = Server(self, sockets, protocol_factory,
                    ssl, backlog, ssl_handshake_timeout)
    if start_serving:
        server._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self)

    if self._debug:
        logger.info("%r is serving", server)
    return server
async def connect_accepted_socket(
        self, protocol_factory, sock,
        *, ssl=None,
        ssl_handshake_timeout=None):
    """Handle an accepted connection.

    Intended for servers that accept connections outside of asyncio but
    want asyncio to handle them afterwards.

    *sock* must be a SOCK_STREAM socket, and *ssl_handshake_timeout* may
    only be given together with a truthy *ssl*; ValueError is raised
    otherwise.

    This method is a coroutine.  When completed, the coroutine
    returns a (transport, protocol) pair.
    """
    if sock.type != socket.SOCK_STREAM:
        raise ValueError(f'A Stream Socket was expected, got {sock!r}')

    timeout_given = ssl_handshake_timeout is not None
    if timeout_given and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, '', server_side=True,
        ssl_handshake_timeout=ssl_handshake_timeout)

    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        current_sock = transport.get_extra_info('socket')
        logger.debug("%r handled: (%r, %r)",
                     current_sock, transport, protocol)
    return transport, protocol
async def connect_read_pipe(self, protocol_factory, pipe):
    """Attach the read end *pipe* to a new protocol instance.

    Builds the protocol with ``protocol_factory()``, creates a read-pipe
    transport and waits for its waiter future.  Return a
    ``(transport, protocol)`` pair; if waiting raises, the transport is
    closed and the exception propagates.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_read_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise
    else:
        if self._debug:
            logger.debug('Read pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, protocol)
    return transport, protocol
async def connect_write_pipe(self, protocol_factory, pipe):
    """Attach the write end *pipe* to a new protocol instance.

    Builds the protocol with ``protocol_factory()``, creates a write-pipe
    transport and waits for its waiter future.  Return a
    ``(transport, protocol)`` pair; if waiting raises, the transport is
    closed and the exception propagates.
    """
    protocol = protocol_factory()
    ready = self.create_future()
    transport = self._make_write_pipe_transport(pipe, protocol, ready)

    try:
        await ready
    except BaseException:
        transport.close()
        raise
    else:
        if self._debug:
            logger.debug('Write pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, protocol)
    return transport, protocol
1554 def _log_subprocess(self, msg, stdin, stdout, stderr):
1555 info = [msg]
1556 if stdin is not None:
1557 info.append(f'stdin={_format_pipe(stdin)}')
1558 if stdout is not None and stderr == subprocess.STDOUT:
1559 info.append(f'stdout=stderr={_format_pipe(stdout)}')
1560 else:
1561 if stdout is not None:
1562 info.append(f'stdout={_format_pipe(stdout)}')
1563 if stderr is not None:
1564 info.append(f'stderr={_format_pipe(stderr)}')
1565 logger.debug(' '.join(info))
async def subprocess_shell(self, protocol_factory, cmd, *,
                           stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           universal_newlines=False,
                           shell=True, bufsize=0,
                           encoding=None, errors=None, text=None,
                           **kwargs):
    """Run the shell command *cmd* in a subprocess.

    *cmd* must be a str or bytes object.  The text-mode related options
    are accepted only with their neutral values (universal_newlines
    false, shell true, bufsize 0, text false, encoding and errors None);
    anything else raises ValueError.  Remaining keyword arguments are
    forwarded to ``_make_subprocess_transport()``.

    Return a ``(transport, protocol)`` pair.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    protocol = protocol_factory()

    banner = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        banner = 'run shell command %r' % cmd
        self._log_subprocess(banner, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)

    # The debug flag is read again here; a banner only exists if it was
    # already on before the transport was created.
    if self._debug and banner is not None:
        logger.info('%s: %r', banner, transport)
    return transport, protocol
async def subprocess_exec(self, protocol_factory, program, *args,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=False,
                          shell=False, bufsize=0,
                          encoding=None, errors=None, text=None,
                          **kwargs):
    """Run *program* with *args* in a subprocess, without a shell.

    The text-mode related options are accepted only with their neutral
    values (universal_newlines false, shell false, bufsize 0, text false,
    encoding and errors None); anything else raises ValueError.
    Remaining keyword arguments are forwarded to
    ``_make_subprocess_transport()``.

    Return a ``(transport, protocol)`` pair.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    if text:
        raise ValueError("text must be False")
    if encoding is not None:
        raise ValueError("encoding must be None")
    if errors is not None:
        raise ValueError("errors must be None")

    command_line = (program,) + args
    protocol = protocol_factory()

    banner = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        banner = f'execute program {program!r}'
        self._log_subprocess(banner, stdin, stdout, stderr)

    transport = await self._make_subprocess_transport(
        protocol, command_line, False, stdin, stdout, stderr,
        bufsize, **kwargs)

    # The debug flag is read again here; a banner only exists if it was
    # already on before the transport was created.
    if self._debug and banner is not None:
        logger.info('%s: %r', banner, transport)
    return transport, protocol
def get_exception_handler(self):
    """Return the custom exception handler.

    None means no custom handler is installed and the default one
    is in use.
    """
    handler = self._exception_handler
    return handler
def set_exception_handler(self, handler):
    """Install *handler* as the event loop exception handler.

    Passing None selects the default exception handler.

    Otherwise *handler* must be callable with a signature matching
    '(loop, context)': 'loop' is a reference to the active event loop
    and 'context' is a dict object (see the `call_exception_handler()`
    documentation for details about context).

    Raise TypeError if *handler* is neither None nor callable.
    """
    acceptable = handler is None or callable(handler)
    if not acceptable:
        raise TypeError(f'A callable object or None is expected, '
                        f'got {handler!r}')
    self._exception_handler = handler
1659 def default_exception_handler(self, context):
1660 """Default exception handler.
1662 This is called when an exception occurs and no exception
1663 handler is set, and can be called by a custom exception
1664 handler that wants to defer to the default behavior.
1666 This default handler logs the error message and other
1667 context-dependent information. In debug mode, a truncated
1668 stack trace is also appended showing where the given object
1669 (e.g. a handle or future or task) was created, if any.
1671 The context parameter has the same meaning as in
1672 `call_exception_handler()`.
1673 """
1674 message = context.get('message')
1675 if not message:
1676 message = 'Unhandled exception in event loop'
1678 exception = context.get('exception')
1679 if exception is not None:
1680 exc_info = (type(exception), exception, exception.__traceback__)
1681 else:
1682 exc_info = False
1684 if ('source_traceback' not in context and
1685 self._current_handle is not None and
1686 self._current_handle._source_traceback):
1687 context['handle_traceback'] = \
1688 self._current_handle._source_traceback
1690 log_lines = [message]
1691 for key in sorted(context):
1692 if key in {'message', 'exception'}:
1693 continue
1694 value = context[key]
1695 if key == 'source_traceback':
1696 tb = ''.join(traceback.format_list(value))
1697 value = 'Object created at (most recent call last):\n'
1698 value += tb.rstrip()
1699 elif key == 'handle_traceback':
1700 tb = ''.join(traceback.format_list(value))
1701 value = 'Handle created at (most recent call last):\n'
1702 value += tb.rstrip()
1703 else:
1704 value = repr(value)
1705 log_lines.append(f'{key}: {value}')
1707 logger.error('\n'.join(log_lines), exc_info=exc_info)
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'task' (optional): Task instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance;
    - 'asyncgen' (optional): Asynchronous generator that caused
                             the exception.

    New keys may be introduced in the future.

    SystemExit and KeyboardInterrupt raised by a handler propagate; any
    other exception raised by a handler is logged instead of raised.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    custom_handler = self._exception_handler

    if custom_handler is None:
        try:
            self.default_exception_handler(context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        custom_handler(self, context)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        # The user-supplied handler itself failed: report that through
        # the default handler, keeping the original context attached.
        failure_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(failure_context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
1766 def _add_callback(self, handle):
1767 """Add a Handle to _scheduled (TimerHandle) or _ready."""
1768 assert isinstance(handle, events.Handle), 'A Handle is required here'
1769 if handle._cancelled:
1770 return
1771 assert not isinstance(handle, events.TimerHandle)
1772 self._ready.append(handle)
1774 def _add_callback_signalsafe(self, handle):
1775 """Like _add_callback() but called from a signal handler."""
1776 self._add_callback(handle)
1777 self._write_to_self()
1779 def _timer_handle_cancelled(self, handle):
1780 """Notification that a TimerHandle has been cancelled."""
1781 if handle._scheduled:
1782 self._timer_cancelled_count += 1
def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.

    The steps, in order: purge cancelled timers from self._scheduled,
    compute the select timeout, poll the selector and process its
    events, move due timers onto self._ready, then run the handles that
    were in self._ready at that point.
    """

    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        # The filtered list is rebuilt into a heap in one pass.
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # timeout=None is passed to select() unchanged when nothing is ready
    # or scheduled; 0 is used when there is work ready or the loop is
    # stopping.
    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    # Timers due within one clock resolution of "now" are treated as due.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            # In debug mode, expose the running handle and warn about
            # callbacks that take at least slow_callback_duration.
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
1862 def _set_coroutine_origin_tracking(self, enabled):
1863 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1864 return
1866 if enabled:
1867 self._coroutine_origin_tracking_saved_depth = (
1868 sys.get_coroutine_origin_tracking_depth())
1869 sys.set_coroutine_origin_tracking_depth(
1870 constants.DEBUG_STACK_DEPTH)
1871 else:
1872 sys.set_coroutine_origin_tracking_depth(
1873 self._coroutine_origin_tracking_saved_depth)
1875 self._coroutine_origin_tracking_enabled = enabled
def get_debug(self):
    """Return the loop's debug flag as it was last stored."""
    debug_flag = self._debug
    return debug_flag
def set_debug(self, enabled):
    """Store *enabled* as the loop's debug flag.

    If the loop is currently running, a call to
    _set_coroutine_origin_tracking(enabled) is also scheduled through
    call_soon_threadsafe().
    """
    self._debug = enabled

    if self.is_running():
        self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)