Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/unix_events.py: 18%

802 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1"""Selector event loop for Unix with signal handling.""" 

2 

3import errno 

4import io 

5import itertools 

6import os 

7import selectors 

8import signal 

9import socket 

10import stat 

11import subprocess 

12import sys 

13import threading 

14import warnings 

15 

16from . import base_events 

17from . import base_subprocess 

18from . import constants 

19from . import coroutines 

20from . import events 

21from . import exceptions 

22from . import futures 

23from . import selector_events 

24from . import tasks 

25from . import transports 

26from .log import logger 

27 

28 

29__all__ = ( 

30 'SelectorEventLoop', 

31 'AbstractChildWatcher', 'SafeChildWatcher', 

32 'FastChildWatcher', 

33 'MultiLoopChildWatcher', 'ThreadedChildWatcher', 

34 'DefaultEventLoopPolicy', 

35) 

36 

37 

38if sys.platform == 'win32': # pragma: no cover 

39 raise ImportError('Signals are not really supported on Windows') 

40 

41 

42def _sighandler_noop(signum, frame): 

43 """Dummy signal handler.""" 

44 pass 

45 

46 

47class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): 

48 """Unix event loop. 

49 

50 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop. 

51 """ 

52 

53 def __init__(self, selector=None): 

54 super().__init__(selector) 

55 self._signal_handlers = {} 

56 

57 def close(self): 

58 super().close() 

59 if not sys.is_finalizing(): 

60 for sig in list(self._signal_handlers): 

61 self.remove_signal_handler(sig) 

62 else: 

63 if self._signal_handlers: 

64 warnings.warn(f"Closing the loop {self!r} " 

65 f"on interpreter shutdown " 

66 f"stage, skipping signal handlers removal", 

67 ResourceWarning, 

68 source=self) 

69 self._signal_handlers.clear() 

70 

71 def _process_self_data(self, data): 

72 for signum in data: 

73 if not signum: 

74 # ignore null bytes written by _write_to_self() 

75 continue 

76 self._handle_signal(signum) 

77 

78 def add_signal_handler(self, sig, callback, *args): 

79 """Add a handler for a signal. UNIX only. 

80 

81 Raise ValueError if the signal number is invalid or uncatchable. 

82 Raise RuntimeError if there is a problem setting up the handler. 

83 """ 

84 if (coroutines.iscoroutine(callback) or 

85 coroutines.iscoroutinefunction(callback)): 

86 raise TypeError("coroutines cannot be used " 

87 "with add_signal_handler()") 

88 self._check_signal(sig) 

89 self._check_closed() 

90 try: 

91 # set_wakeup_fd() raises ValueError if this is not the 

92 # main thread. By calling it early we ensure that an 

93 # event loop running in another thread cannot add a signal 

94 # handler. 

95 signal.set_wakeup_fd(self._csock.fileno()) 

96 except (ValueError, OSError) as exc: 

97 raise RuntimeError(str(exc)) 

98 

99 handle = events.Handle(callback, args, self, None) 

100 self._signal_handlers[sig] = handle 

101 

102 try: 

103 # Register a dummy signal handler to ask Python to write the signal 

104 # number in the wakup file descriptor. _process_self_data() will 

105 # read signal numbers from this file descriptor to handle signals. 

106 signal.signal(sig, _sighandler_noop) 

107 

108 # Set SA_RESTART to limit EINTR occurrences. 

109 signal.siginterrupt(sig, False) 

110 except OSError as exc: 

111 del self._signal_handlers[sig] 

112 if not self._signal_handlers: 

113 try: 

114 signal.set_wakeup_fd(-1) 

115 except (ValueError, OSError) as nexc: 

116 logger.info('set_wakeup_fd(-1) failed: %s', nexc) 

117 

118 if exc.errno == errno.EINVAL: 

119 raise RuntimeError(f'sig {sig} cannot be caught') 

120 else: 

121 raise 

122 

123 def _handle_signal(self, sig): 

124 """Internal helper that is the actual signal handler.""" 

125 handle = self._signal_handlers.get(sig) 

126 if handle is None: 

127 return # Assume it's some race condition. 

128 if handle._cancelled: 

129 self.remove_signal_handler(sig) # Remove it properly. 

130 else: 

131 self._add_callback_signalsafe(handle) 

132 

133 def remove_signal_handler(self, sig): 

134 """Remove a handler for a signal. UNIX only. 

135 

136 Return True if a signal handler was removed, False if not. 

137 """ 

138 self._check_signal(sig) 

139 try: 

140 del self._signal_handlers[sig] 

141 except KeyError: 

142 return False 

143 

144 if sig == signal.SIGINT: 

145 handler = signal.default_int_handler 

146 else: 

147 handler = signal.SIG_DFL 

148 

149 try: 

150 signal.signal(sig, handler) 

151 except OSError as exc: 

152 if exc.errno == errno.EINVAL: 

153 raise RuntimeError(f'sig {sig} cannot be caught') 

154 else: 

155 raise 

156 

157 if not self._signal_handlers: 

158 try: 

159 signal.set_wakeup_fd(-1) 

160 except (ValueError, OSError) as exc: 

161 logger.info('set_wakeup_fd(-1) failed: %s', exc) 

162 

163 return True 

164 

165 def _check_signal(self, sig): 

166 """Internal helper to validate a signal. 

167 

168 Raise ValueError if the signal number is invalid or uncatchable. 

169 Raise RuntimeError if there is a problem setting up the handler. 

170 """ 

171 if not isinstance(sig, int): 

172 raise TypeError(f'sig must be an int, not {sig!r}') 

173 

174 if sig not in signal.valid_signals(): 

175 raise ValueError(f'invalid signal number {sig}') 

176 

177 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 

178 extra=None): 

179 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) 

180 

181 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 

182 extra=None): 

183 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) 

184 

185 async def _make_subprocess_transport(self, protocol, args, shell, 

186 stdin, stdout, stderr, bufsize, 

187 extra=None, **kwargs): 

188 with events.get_child_watcher() as watcher: 

189 if not watcher.is_active(): 

190 # Check early. 

191 # Raising exception before process creation 

192 # prevents subprocess execution if the watcher 

193 # is not ready to handle it. 

194 raise RuntimeError("asyncio.get_child_watcher() is not activated, " 

195 "subprocess support is not installed.") 

196 waiter = self.create_future() 

197 transp = _UnixSubprocessTransport(self, protocol, args, shell, 

198 stdin, stdout, stderr, bufsize, 

199 waiter=waiter, extra=extra, 

200 **kwargs) 

201 

202 watcher.add_child_handler(transp.get_pid(), 

203 self._child_watcher_callback, transp) 

204 try: 

205 await waiter 

206 except (SystemExit, KeyboardInterrupt): 

207 raise 

208 except BaseException: 

209 transp.close() 

210 await transp._wait() 

211 raise 

212 

213 return transp 

214 

215 def _child_watcher_callback(self, pid, returncode, transp): 

216 self.call_soon_threadsafe(transp._process_exited, returncode) 

217 

218 async def create_unix_connection( 

219 self, protocol_factory, path=None, *, 

220 ssl=None, sock=None, 

221 server_hostname=None, 

222 ssl_handshake_timeout=None): 

223 assert server_hostname is None or isinstance(server_hostname, str) 

224 if ssl: 

225 if server_hostname is None: 

226 raise ValueError( 

227 'you have to pass server_hostname when using ssl') 

228 else: 

229 if server_hostname is not None: 

230 raise ValueError('server_hostname is only meaningful with ssl') 

231 if ssl_handshake_timeout is not None: 

232 raise ValueError( 

233 'ssl_handshake_timeout is only meaningful with ssl') 

234 

235 if path is not None: 

236 if sock is not None: 

237 raise ValueError( 

238 'path and sock can not be specified at the same time') 

239 

240 path = os.fspath(path) 

241 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 

242 try: 

243 sock.setblocking(False) 

244 await self.sock_connect(sock, path) 

245 except: 

246 sock.close() 

247 raise 

248 

249 else: 

250 if sock is None: 

251 raise ValueError('no path and sock were specified') 

252 if (sock.family != socket.AF_UNIX or 

253 sock.type != socket.SOCK_STREAM): 

254 raise ValueError( 

255 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 

256 sock.setblocking(False) 

257 

258 transport, protocol = await self._create_connection_transport( 

259 sock, protocol_factory, ssl, server_hostname, 

260 ssl_handshake_timeout=ssl_handshake_timeout) 

261 return transport, protocol 

262 

263 async def create_unix_server( 

264 self, protocol_factory, path=None, *, 

265 sock=None, backlog=100, ssl=None, 

266 ssl_handshake_timeout=None, 

267 start_serving=True): 

268 if isinstance(ssl, bool): 

269 raise TypeError('ssl argument must be an SSLContext or None') 

270 

271 if ssl_handshake_timeout is not None and not ssl: 

272 raise ValueError( 

273 'ssl_handshake_timeout is only meaningful with ssl') 

274 

275 if path is not None: 

276 if sock is not None: 

277 raise ValueError( 

278 'path and sock can not be specified at the same time') 

279 

280 path = os.fspath(path) 

281 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

282 

283 # Check for abstract socket. `str` and `bytes` paths are supported. 

284 if path[0] not in (0, '\x00'): 

285 try: 

286 if stat.S_ISSOCK(os.stat(path).st_mode): 

287 os.remove(path) 

288 except FileNotFoundError: 

289 pass 

290 except OSError as err: 

291 # Directory may have permissions only to create socket. 

292 logger.error('Unable to check or remove stale UNIX socket ' 

293 '%r: %r', path, err) 

294 

295 try: 

296 sock.bind(path) 

297 except OSError as exc: 

298 sock.close() 

299 if exc.errno == errno.EADDRINUSE: 

300 # Let's improve the error message by adding 

301 # with what exact address it occurs. 

302 msg = f'Address {path!r} is already in use' 

303 raise OSError(errno.EADDRINUSE, msg) from None 

304 else: 

305 raise 

306 except: 

307 sock.close() 

308 raise 

309 else: 

310 if sock is None: 

311 raise ValueError( 

312 'path was not specified, and no sock specified') 

313 

314 if (sock.family != socket.AF_UNIX or 

315 sock.type != socket.SOCK_STREAM): 

316 raise ValueError( 

317 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 

318 

319 sock.setblocking(False) 

320 server = base_events.Server(self, [sock], protocol_factory, 

321 ssl, backlog, ssl_handshake_timeout) 

322 if start_serving: 

323 server._start_serving() 

324 # Skip one loop iteration so that all 'loop.add_reader' 

325 # go through. 

326 await tasks.sleep(0, loop=self) 

327 

328 return server 

329 

330 async def _sock_sendfile_native(self, sock, file, offset, count): 

331 try: 

332 os.sendfile 

333 except AttributeError as exc: 

334 raise exceptions.SendfileNotAvailableError( 

335 "os.sendfile() is not available") 

336 try: 

337 fileno = file.fileno() 

338 except (AttributeError, io.UnsupportedOperation) as err: 

339 raise exceptions.SendfileNotAvailableError("not a regular file") 

340 try: 

341 fsize = os.fstat(fileno).st_size 

342 except OSError as err: 

343 raise exceptions.SendfileNotAvailableError("not a regular file") 

344 blocksize = count if count else fsize 

345 if not blocksize: 

346 return 0 # empty file 

347 

348 fut = self.create_future() 

349 self._sock_sendfile_native_impl(fut, None, sock, fileno, 

350 offset, count, blocksize, 0) 

351 return await fut 

352 

353 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno, 

354 offset, count, blocksize, total_sent): 

355 fd = sock.fileno() 

356 if registered_fd is not None: 

357 # Remove the callback early. It should be rare that the 

358 # selector says the fd is ready but the call still returns 

359 # EAGAIN, and I am willing to take a hit in that case in 

360 # order to simplify the common case. 

361 self.remove_writer(registered_fd) 

362 if fut.cancelled(): 

363 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

364 return 

365 if count: 

366 blocksize = count - total_sent 

367 if blocksize <= 0: 

368 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

369 fut.set_result(total_sent) 

370 return 

371 

372 try: 

373 sent = os.sendfile(fd, fileno, offset, blocksize) 

374 except (BlockingIOError, InterruptedError): 

375 if registered_fd is None: 

376 self._sock_add_cancellation_callback(fut, sock) 

377 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 

378 fd, sock, fileno, 

379 offset, count, blocksize, total_sent) 

380 except OSError as exc: 

381 if (registered_fd is not None and 

382 exc.errno == errno.ENOTCONN and 

383 type(exc) is not ConnectionError): 

384 # If we have an ENOTCONN and this isn't a first call to 

385 # sendfile(), i.e. the connection was closed in the middle 

386 # of the operation, normalize the error to ConnectionError 

387 # to make it consistent across all Posix systems. 

388 new_exc = ConnectionError( 

389 "socket is not connected", errno.ENOTCONN) 

390 new_exc.__cause__ = exc 

391 exc = new_exc 

392 if total_sent == 0: 

393 # We can get here for different reasons, the main 

394 # one being 'file' is not a regular mmap(2)-like 

395 # file, in which case we'll fall back on using 

396 # plain send(). 

397 err = exceptions.SendfileNotAvailableError( 

398 "os.sendfile call failed") 

399 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

400 fut.set_exception(err) 

401 else: 

402 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

403 fut.set_exception(exc) 

404 except (SystemExit, KeyboardInterrupt): 

405 raise 

406 except BaseException as exc: 

407 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

408 fut.set_exception(exc) 

409 else: 

410 if sent == 0: 

411 # EOF 

412 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 

413 fut.set_result(total_sent) 

414 else: 

415 offset += sent 

416 total_sent += sent 

417 if registered_fd is None: 

418 self._sock_add_cancellation_callback(fut, sock) 

419 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 

420 fd, sock, fileno, 

421 offset, count, blocksize, total_sent) 

422 

423 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent): 

424 if total_sent > 0: 

425 os.lseek(fileno, offset, os.SEEK_SET) 

426 

427 def _sock_add_cancellation_callback(self, fut, sock): 

428 def cb(fut): 

429 if fut.cancelled(): 

430 fd = sock.fileno() 

431 if fd != -1: 

432 self.remove_writer(fd) 

433 fut.add_done_callback(cb) 

434 

435 

436class _UnixReadPipeTransport(transports.ReadTransport): 

437 

438 max_size = 256 * 1024 # max bytes we read in one event loop iteration 

439 

440 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 

441 super().__init__(extra) 

442 self._extra['pipe'] = pipe 

443 self._loop = loop 

444 self._pipe = pipe 

445 self._fileno = pipe.fileno() 

446 self._protocol = protocol 

447 self._closing = False 

448 self._paused = False 

449 

450 mode = os.fstat(self._fileno).st_mode 

451 if not (stat.S_ISFIFO(mode) or 

452 stat.S_ISSOCK(mode) or 

453 stat.S_ISCHR(mode)): 

454 self._pipe = None 

455 self._fileno = None 

456 self._protocol = None 

457 raise ValueError("Pipe transport is for pipes/sockets only.") 

458 

459 os.set_blocking(self._fileno, False) 

460 

461 self._loop.call_soon(self._protocol.connection_made, self) 

462 # only start reading when connection_made() has been called 

463 self._loop.call_soon(self._loop._add_reader, 

464 self._fileno, self._read_ready) 

465 if waiter is not None: 

466 # only wake up the waiter when connection_made() has been called 

467 self._loop.call_soon(futures._set_result_unless_cancelled, 

468 waiter, None) 

469 

470 def __repr__(self): 

471 info = [self.__class__.__name__] 

472 if self._pipe is None: 

473 info.append('closed') 

474 elif self._closing: 

475 info.append('closing') 

476 info.append(f'fd={self._fileno}') 

477 selector = getattr(self._loop, '_selector', None) 

478 if self._pipe is not None and selector is not None: 

479 polling = selector_events._test_selector_event( 

480 selector, self._fileno, selectors.EVENT_READ) 

481 if polling: 

482 info.append('polling') 

483 else: 

484 info.append('idle') 

485 elif self._pipe is not None: 

486 info.append('open') 

487 else: 

488 info.append('closed') 

489 return '<{}>'.format(' '.join(info)) 

490 

491 def _read_ready(self): 

492 try: 

493 data = os.read(self._fileno, self.max_size) 

494 except (BlockingIOError, InterruptedError): 

495 pass 

496 except OSError as exc: 

497 self._fatal_error(exc, 'Fatal read error on pipe transport') 

498 else: 

499 if data: 

500 self._protocol.data_received(data) 

501 else: 

502 if self._loop.get_debug(): 

503 logger.info("%r was closed by peer", self) 

504 self._closing = True 

505 self._loop._remove_reader(self._fileno) 

506 self._loop.call_soon(self._protocol.eof_received) 

507 self._loop.call_soon(self._call_connection_lost, None) 

508 

509 def pause_reading(self): 

510 if self._closing or self._paused: 

511 return 

512 self._paused = True 

513 self._loop._remove_reader(self._fileno) 

514 if self._loop.get_debug(): 

515 logger.debug("%r pauses reading", self) 

516 

517 def resume_reading(self): 

518 if self._closing or not self._paused: 

519 return 

520 self._paused = False 

521 self._loop._add_reader(self._fileno, self._read_ready) 

522 if self._loop.get_debug(): 

523 logger.debug("%r resumes reading", self) 

524 

525 def set_protocol(self, protocol): 

526 self._protocol = protocol 

527 

528 def get_protocol(self): 

529 return self._protocol 

530 

531 def is_closing(self): 

532 return self._closing 

533 

534 def close(self): 

535 if not self._closing: 

536 self._close(None) 

537 

538 def __del__(self, _warn=warnings.warn): 

539 if self._pipe is not None: 

540 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

541 self._pipe.close() 

542 

543 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 

544 # should be called by exception handler only 

545 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 

546 if self._loop.get_debug(): 

547 logger.debug("%r: %s", self, message, exc_info=True) 

548 else: 

549 self._loop.call_exception_handler({ 

550 'message': message, 

551 'exception': exc, 

552 'transport': self, 

553 'protocol': self._protocol, 

554 }) 

555 self._close(exc) 

556 

557 def _close(self, exc): 

558 self._closing = True 

559 self._loop._remove_reader(self._fileno) 

560 self._loop.call_soon(self._call_connection_lost, exc) 

561 

562 def _call_connection_lost(self, exc): 

563 try: 

564 self._protocol.connection_lost(exc) 

565 finally: 

566 self._pipe.close() 

567 self._pipe = None 

568 self._protocol = None 

569 self._loop = None 

570 

571 

572class _UnixWritePipeTransport(transports._FlowControlMixin, 

573 transports.WriteTransport): 

574 

575 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 

576 super().__init__(extra, loop) 

577 self._extra['pipe'] = pipe 

578 self._pipe = pipe 

579 self._fileno = pipe.fileno() 

580 self._protocol = protocol 

581 self._buffer = bytearray() 

582 self._conn_lost = 0 

583 self._closing = False # Set when close() or write_eof() called. 

584 

585 mode = os.fstat(self._fileno).st_mode 

586 is_char = stat.S_ISCHR(mode) 

587 is_fifo = stat.S_ISFIFO(mode) 

588 is_socket = stat.S_ISSOCK(mode) 

589 if not (is_char or is_fifo or is_socket): 

590 self._pipe = None 

591 self._fileno = None 

592 self._protocol = None 

593 raise ValueError("Pipe transport is only for " 

594 "pipes, sockets and character devices") 

595 

596 os.set_blocking(self._fileno, False) 

597 self._loop.call_soon(self._protocol.connection_made, self) 

598 

599 # On AIX, the reader trick (to be notified when the read end of the 

600 # socket is closed) only works for sockets. On other platforms it 

601 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) 

602 if is_socket or (is_fifo and not sys.platform.startswith("aix")): 

603 # only start reading when connection_made() has been called 

604 self._loop.call_soon(self._loop._add_reader, 

605 self._fileno, self._read_ready) 

606 

607 if waiter is not None: 

608 # only wake up the waiter when connection_made() has been called 

609 self._loop.call_soon(futures._set_result_unless_cancelled, 

610 waiter, None) 

611 

612 def __repr__(self): 

613 info = [self.__class__.__name__] 

614 if self._pipe is None: 

615 info.append('closed') 

616 elif self._closing: 

617 info.append('closing') 

618 info.append(f'fd={self._fileno}') 

619 selector = getattr(self._loop, '_selector', None) 

620 if self._pipe is not None and selector is not None: 

621 polling = selector_events._test_selector_event( 

622 selector, self._fileno, selectors.EVENT_WRITE) 

623 if polling: 

624 info.append('polling') 

625 else: 

626 info.append('idle') 

627 

628 bufsize = self.get_write_buffer_size() 

629 info.append(f'bufsize={bufsize}') 

630 elif self._pipe is not None: 

631 info.append('open') 

632 else: 

633 info.append('closed') 

634 return '<{}>'.format(' '.join(info)) 

635 

636 def get_write_buffer_size(self): 

637 return len(self._buffer) 

638 

639 def _read_ready(self): 

640 # Pipe was closed by peer. 

641 if self._loop.get_debug(): 

642 logger.info("%r was closed by peer", self) 

643 if self._buffer: 

644 self._close(BrokenPipeError()) 

645 else: 

646 self._close() 

647 

648 def write(self, data): 

649 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) 

650 if isinstance(data, bytearray): 

651 data = memoryview(data) 

652 if not data: 

653 return 

654 

655 if self._conn_lost or self._closing: 

656 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

657 logger.warning('pipe closed by peer or ' 

658 'os.write(pipe, data) raised exception.') 

659 self._conn_lost += 1 

660 return 

661 

662 if not self._buffer: 

663 # Attempt to send it right away first. 

664 try: 

665 n = os.write(self._fileno, data) 

666 except (BlockingIOError, InterruptedError): 

667 n = 0 

668 except (SystemExit, KeyboardInterrupt): 

669 raise 

670 except BaseException as exc: 

671 self._conn_lost += 1 

672 self._fatal_error(exc, 'Fatal write error on pipe transport') 

673 return 

674 if n == len(data): 

675 return 

676 elif n > 0: 

677 data = memoryview(data)[n:] 

678 self._loop._add_writer(self._fileno, self._write_ready) 

679 

680 self._buffer += data 

681 self._maybe_pause_protocol() 

682 

683 def _write_ready(self): 

684 assert self._buffer, 'Data should not be empty' 

685 

686 try: 

687 n = os.write(self._fileno, self._buffer) 

688 except (BlockingIOError, InterruptedError): 

689 pass 

690 except (SystemExit, KeyboardInterrupt): 

691 raise 

692 except BaseException as exc: 

693 self._buffer.clear() 

694 self._conn_lost += 1 

695 # Remove writer here, _fatal_error() doesn't it 

696 # because _buffer is empty. 

697 self._loop._remove_writer(self._fileno) 

698 self._fatal_error(exc, 'Fatal write error on pipe transport') 

699 else: 

700 if n == len(self._buffer): 

701 self._buffer.clear() 

702 self._loop._remove_writer(self._fileno) 

703 self._maybe_resume_protocol() # May append to buffer. 

704 if self._closing: 

705 self._loop._remove_reader(self._fileno) 

706 self._call_connection_lost(None) 

707 return 

708 elif n > 0: 

709 del self._buffer[:n] 

710 

711 def can_write_eof(self): 

712 return True 

713 

714 def write_eof(self): 

715 if self._closing: 

716 return 

717 assert self._pipe 

718 self._closing = True 

719 if not self._buffer: 

720 self._loop._remove_reader(self._fileno) 

721 self._loop.call_soon(self._call_connection_lost, None) 

722 

723 def set_protocol(self, protocol): 

724 self._protocol = protocol 

725 

726 def get_protocol(self): 

727 return self._protocol 

728 

729 def is_closing(self): 

730 return self._closing 

731 

732 def close(self): 

733 if self._pipe is not None and not self._closing: 

734 # write_eof is all what we needed to close the write pipe 

735 self.write_eof() 

736 

737 def __del__(self, _warn=warnings.warn): 

738 if self._pipe is not None: 

739 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

740 self._pipe.close() 

741 

742 def abort(self): 

743 self._close(None) 

744 

745 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 

746 # should be called by exception handler only 

747 if isinstance(exc, OSError): 

748 if self._loop.get_debug(): 

749 logger.debug("%r: %s", self, message, exc_info=True) 

750 else: 

751 self._loop.call_exception_handler({ 

752 'message': message, 

753 'exception': exc, 

754 'transport': self, 

755 'protocol': self._protocol, 

756 }) 

757 self._close(exc) 

758 

759 def _close(self, exc=None): 

760 self._closing = True 

761 if self._buffer: 

762 self._loop._remove_writer(self._fileno) 

763 self._buffer.clear() 

764 self._loop._remove_reader(self._fileno) 

765 self._loop.call_soon(self._call_connection_lost, exc) 

766 

767 def _call_connection_lost(self, exc): 

768 try: 

769 self._protocol.connection_lost(exc) 

770 finally: 

771 self._pipe.close() 

772 self._pipe = None 

773 self._protocol = None 

774 self._loop = None 

775 

776 

777class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): 

778 

779 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 

780 stdin_w = None 

781 if stdin == subprocess.PIPE: 

782 # Use a socket pair for stdin, since not all platforms 

783 # support selecting read events on the write end of a 

784 # socket (which we use in order to detect closing of the 

785 # other end). Notably this is needed on AIX, and works 

786 # just fine on other platforms. 

787 stdin, stdin_w = socket.socketpair() 

788 try: 

789 self._proc = subprocess.Popen( 

790 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 

791 universal_newlines=False, bufsize=bufsize, **kwargs) 

792 if stdin_w is not None: 

793 stdin.close() 

794 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) 

795 stdin_w = None 

796 finally: 

797 if stdin_w is not None: 

798 stdin.close() 

799 stdin_w.close() 

800 

801 

class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError

    def is_active(self):
        """Return ``True`` if the watcher is active and is used by the event loop.

        Return True if the watcher is installed and ready to handle process
        exit notifications.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError

879 

880 

881def _compute_returncode(status): 

882 if os.WIFSIGNALED(status): 

883 # The child process died because of a signal. 

884 return -os.WTERMSIG(status) 

885 elif os.WIFEXITED(status): 

886 # The child process exited (e.g sys.exit()). 

887 return os.WEXITSTATUS(status) 

888 else: 

889 # The child exited, but we don't understand its status. 

890 # This shouldn't happen, but if it does, let's just 

891 # return that status; perhaps that helps debug it. 

892 return status 

893 

894 

class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for child watchers tied to one event loop.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        # Registry of pending handlers; its layout is left to subclasses.
        self._callbacks = {}

    def close(self):
        """Detach from the current loop, if any."""
        self.attach_loop(None)

    def is_active(self):
        """Return True when a loop is attached and currently running."""
        attached = self._loop
        return attached is not None and attached.is_running()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is
        emitted when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)

        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children and report unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

946 

947 

class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        """Forget every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return whether one was registered."""
        try:
            self._callbacks.pop(pid)
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() pops entries as it goes.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and fire its callback."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = _compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            handler(pid, returncode, *handler_args)

1020 

1021 

class FastChildWatcher(BaseChildWatcher):
    """Child watcher that reaps any terminated child in one sweep.

    Terminated processes are collected with os.waitpid(-1), which may
    steal exit statuses from other code that spawns processes and waits
    for them itself.

    In exchange there is no per-child polling: the work done is O(1) for
    each child that terminates, however many children are registered.
    """

    def __init__(self):
        super().__init__()
        # Guards _callbacks, _zombies and _forks in the methods below.
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Number of currently active 'with watcher:' blocks.
        self._forks = 0

    def close(self):
        """Forget handlers and unclaimed exit statuses, then close."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Mark the start of a section in which a child is being spawned."""
        with self._lock:
            self._forks += 1

        return self

    def __exit__(self, a, b, c):
        """Leave a spawning section; report zombies nobody claimed.

        Once the last section is left, every exit status still stored for
        an unregistered pid is discarded and logged as a warning.
        """
        collateral_victims = None

        with self._lock:
            self._forks -= 1

            if not self._forks and self._zombies:
                collateral_victims = str(self._zombies)
                self._zombies.clear()

        if collateral_victims is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for the child *pid*.

        Must be called inside the watcher's context manager. If the child
        has already been reaped, the callback is invoked immediately.
        """
        assert self._forks, "Must use the context manager"

        not_reaped = object()
        with self._lock:
            returncode = self._zombies.pop(pid, not_reaped)
            if returncode is not_reaped:
                # The child is still running: wait for its SIGCHLD.
                self._callbacks[pid] = callback, args
                return

        # The child exited before registration; fire the callback now,
        # outside the lock.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return True if one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Reap every child that has terminated and dispatch callbacks."""
        # Signals coalesce, so one SIGCHLD may stand for several exits:
        # keep calling waitpid() until nothing more can be reaped.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # There are no child processes left at all.
                return

            if pid == 0:
                # Children exist, but none of them has exited.
                return

            returncode = _compute_returncode(status)

            with self._lock:
                entry = self._callbacks.pop(pid, None)
                if entry is None:
                    # Nobody registered this pid.
                    if self._forks:
                        # A spawn is in progress, so the pid may simply
                        # not be registered yet: keep its status around.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # Warn or call back only after the lock has been released.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)

1125 

1126 

class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that works without a running loop in the main thread.

    A process-wide SIGCHLD handler is installed through the signal module
    the first time attach_loop() is called, which may conflict with other
    code that installs its own handler for this signal.

    The approach is safe, but every SIGCHLD triggers a poll of all
    registered processes, i.e. *O(n)* work per signal.
    """

    # Implementation note:
    # To stay compatible with the AbstractChildWatcher ABC, attach_loop()
    # does not store the loop it is given, and
    # add_child_handler()/remove_child_handler() take no explicit loop
    # argument; the loop is obtained with get_running_loop() instead.

    def __init__(self):
        # pid -> (loop, callback, args)
        self._callbacks = {}
        # SIGCHLD handler to restore on close(); None while not installed.
        self._saved_sighandler = None

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Forget all handlers and restore the previous SIGCHLD handler.

        The saved handler is restored only if the currently installed one
        is still this watcher's; otherwise a warning is logged.
        """
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        handler = signal.getsignal(signal.SIGCHLD)
        if handler != self._sig_chld:
            logger.warning("SIGCHLD handler was changed by outside code")
        else:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        self._saved_sighandler = None

    def __enter__(self):
        """Return the watcher itself; no setup is required."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on exit; exceptions are not suppressed."""
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for the child *pid*.

        The callback is bound to the currently running event loop.
        """
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # The child may have exited before it was registered, so poll it
        # right away instead of waiting for the next SIGCHLD.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*; return True if one was registered."""
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored.

        Initialization happens here because the unix policy calls
        attach_loop() only from the main thread, and the main thread is
        required for subscribing to SIGCHLD.
        """
        if self._saved_sighandler is not None:
            return

        self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
        if self._saved_sighandler is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            self._saved_sighandler = signal.SIG_DFL

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in tuple(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and schedule its callback.

        The callback is scheduled on the loop recorded at registration
        time via call_soon_threadsafe(), unless that loop is closed.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # Somebody else already reaped the child (waitpid() was
            # called elsewhere); the real exit status is lost.
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            status_is_real = False
        else:
            if not pid:
                # waitpid() returned 0: the child has not exited yet.
                return

            returncode = _compute_returncode(status)
            status_is_real = True

        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # The handler can vanish if .remove_child_handler() runs
            # after os.waitpid() has returned.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
            return

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            return

        if status_is_real and loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Signal-module handler for SIGCHLD: poll all registered pids.

        SystemExit and KeyboardInterrupt propagate; anything else is
        logged as a warning with its traceback.
        """
        try:
            self._do_waitpid_all()
        except BaseException as error:
            if isinstance(error, (SystemExit, KeyboardInterrupt)):
                raise
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)

1246 

1247 

class ThreadedChildWatcher(AbstractChildWatcher):
    """Child watcher that dedicates one thread to each process.

    Every registered process gets its own thread that blocks in
    os.waitpid() until the process finishes.

    No POSIX signal subscription is needed, but creating a thread is not
    free.

    The watcher has O(1) complexity: its performance does not depend on
    the number of spawned processes.
    """

    def __init__(self):
        # Source of sequence numbers for the "waitpid-N" thread names.
        self._pid_counter = itertools.count(0)
        # pid -> waiter thread; each thread removes its entry when done.
        self._threads = {}

    def is_active(self):
        """Always True: this watcher needs no setup to be usable."""
        return True

    def close(self):
        """Wait for the joinable waiter threads to finish."""
        self._join_threads()

    def _join_threads(self):
        """Internal: Join all non-daemon threads"""
        # Snapshot first: waiter threads remove themselves from the dict.
        snapshot = list(self._threads.values())
        joinable = [waiter for waiter in snapshot
                    if waiter.is_alive() and not waiter.daemon]
        for waiter in joinable:
            waiter.join()

    def __enter__(self):
        """Return the watcher itself; no setup is required."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on exit; exceptions are not suppressed."""
        return None

    def __del__(self, _warn=warnings.warn):
        """Emit a ResourceWarning if waiter threads are still running."""
        snapshot = list(self._threads.values())
        still_running = [waiter for waiter in snapshot if waiter.is_alive()]
        if still_running:
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid*.

        When the process ends, callback(pid, returncode, *args) is
        scheduled on the event loop that was running at registration.
        """
        loop = events.get_running_loop()
        waiter_name = f"waitpid-{next(self._pid_counter)}"
        waiter = threading.Thread(target=self._do_waitpid,
                                  name=waiter_name,
                                  args=(loop, pid, callback, args),
                                  daemon=True)
        self._threads[pid] = waiter
        waiter.start()

    def remove_child_handler(self, pid):
        """Do nothing and report success."""
        # asyncio never calls remove_child_handler() !!!
        # The method is a no-op; it exists only because the abstract
        # base class requires it.
        return True

    def attach_loop(self, loop):
        """Do nothing; the loop is captured per call in add_child_handler()."""
        return None

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread body: block until *expected_pid* exits, then notify *loop*."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # Somebody else already reaped the child (waitpid() was
            # called elsewhere); the real exit status is lost.
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = _compute_returncode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if not loop.is_closed():
            loop.call_soon_threadsafe(callback, pid, returncode, *args)
        else:
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)

        # This waiter is finished: drop its bookkeeping entry.
        self._threads.pop(expected_pid)

1335 

1336 

class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher; created lazily by get_child_watcher() or
        # installed explicitly through set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher if none is set yet.

        When called from the main thread, the thread's current loop is
        attached to the new watcher.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = ThreadedChildWatcher()
                # Use the public main_thread() API rather than testing
                # for the private threading._MainThread class.
                if threading.current_thread() is threading.main_thread():
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                threading.current_thread() is threading.main_thread()):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a ThreadedChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        *watcher* must be None or an AbstractChildWatcher instance. A
        previously set watcher, if any, is closed before being replaced.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher

1386 

1387 

# Public aliases: expose the Unix implementations under the generic names
# listed in __all__.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop