Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/asyncio/selector_events.py: 14%

750 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1"""Event loop using a selector and related classes. 

2 

3A selector is a "notify-when-ready" multiplexer. For a subclass which 

4also includes support for signal handling, see the unix_events sub-module. 

5""" 

6 

7__all__ = 'BaseSelectorEventLoop', 

8 

9import collections 

10import errno 

11import functools 

12import selectors 

13import socket 

14import warnings 

15import weakref 

16try: 

17 import ssl 

18except ImportError: # pragma: no cover 

19 ssl = None 

20 

21from . import base_events 

22from . import constants 

23from . import events 

24from . import futures 

25from . import protocols 

26from . import sslproto 

27from . import transports 

28from . import trsock 

29from .log import logger 

30 

31 

def _test_selector_event(selector, fd, event):
    """Tell whether *selector* watches *fd* for any of the *event* bits.

    Returns False when *fd* is not registered with the selector at all.
    """
    try:
        watched = selector.get_key(fd).events
    except KeyError:
        # Unregistered descriptor: nothing is being monitored.
        return False
    return bool(watched & event)

41 

42 

def _check_ssl_socket(sock):
    """Raise TypeError if *sock* is an ssl.SSLSocket.

    Does nothing when the ssl module could not be imported.
    """
    if ssl is None:
        return
    if isinstance(sock, ssl.SSLSocket):
        raise TypeError("Socket cannot be of type SSLSocket")

46 

47 

class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Build the loop around *selector* (DefaultSelector() when None)."""
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # fd -> transport; weak values so the map does not keep transports
        # alive.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Return a stream transport wrapping *sock*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Wrap *rawsock* in an SSLProtocol and return its app transport."""
        ssl_protocol = sslproto.SSLProtocol(
                self, protocol, sslcontext, waiter,
                server_side, server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout)
        # The raw socket transport feeds the SSL protocol; callers only see
        # the application-level transport exposed by the SSL protocol.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Return a datagram transport wrapping *sock*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running; closing an already
        closed loop is a no-op.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking socket pair used to wake up the loop."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe (no-op)."""
        pass

    def _read_from_self(self):
        """Drain the self-pipe until it is empty or would block."""
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write one null byte into the self-pipe (best effort)."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is None:
            return

        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Register *sock* so readiness triggers _accept_connection()."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        Each accepted connection is handed to _accept_connection2() in a
        new task.  Returns as soon as accept() would block.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout)
                    # A retry is scheduled; looping on would only repeat the
                    # failing accept(), the report and the scheduling.
                    return None
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Create the protocol and transport for an accepted connection.

        Failures (other than SystemExit/KeyboardInterrupt) are reported to
        the loop exception handler only when the loop is in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                transport.close()
                raise
                # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Raise RuntimeError if *fd* belongs to a transport still open.

        *fd* may be an int or an object with fileno(); anything else raises
        ValueError.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Install *callback* as the read handler for *fd*.

        Any previously installed read handle is cancelled.  Returns the new
        events.Handle.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _remove_reader(self, fd):
        """Remove the read handler for *fd*.

        Returns True if a handler was installed and has been cancelled,
        False otherwise (including when the loop is closed).
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Install *callback* as the write handler for *fd*.

        Any previously installed write handle is cancelled.  Returns the new
        events.Handle.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()
        return handle

    def _remove_writer(self, fd):
        """Remove a writer callback."""
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Remove both writer and connector.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv(n)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_read_done(self, fd, fut, handle=None):
        """Future done-callback: drop the reader unless already replaced."""
        if handle is None or not handle.cancelled():
            self.remove_reader(fd)

    def _sock_recv(self, fut, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if fut.done():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_recv_into(self, fut, sock, buf):
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if fut.done():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0

        if n == len(data):
            # all data sent
            return

        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        # use a trick with a list in closure to store a mutable state
        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
                                  memoryview(data), [n])
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
        return await fut

    def _sock_sendall(self, fut, sock, view, pos):
        """Write callback for sock_sendall(); *pos* is a one-item offset."""
        if fut.done():
            # Future cancellation can be scheduled on previous loop iteration
            return
        start = pos[0]
        try:
            n = sock.send(view[start:])
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
            return

        start += n

        if start == len(view):
            fut.set_result(None)
        else:
            pos[0] = start

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = await self._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock*; resolve *fut* now or via a writer."""
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            self._ensure_fd_no_transport(fd)
            handle = self._add_writer(
                fd, self._sock_connect_cb, fut, sock, address)
            fut.add_done_callback(
                functools.partial(self._sock_write_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_write_done(self, fd, fut, handle=None):
        """Future done-callback: drop the writer unless already replaced."""
        if handle is None or not handle.cancelled():
            self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Write callback for _sock_connect(): report the connect result."""
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, sock)
        return await fut

    def _sock_accept(self, fut, sock):
        """Try accept(); on would-block re-arm itself as a reader."""
        fd = sock.fileno()
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self._ensure_fd_no_transport(fd)
            handle = self._add_reader(fd, self._sock_accept, fut, sock)
            fut.add_done_callback(
                functools.partial(self._sock_read_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the transport's socket via sock_sendfile().

        The transport is temporarily taken out of self._transports and its
        reading is paused; both are restored on every exit path.
        """
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        try:
            # _make_empty_waiter() raises RuntimeError when a waiter is
            # already installed; that waiter is not ours, so it must not be
            # reset by the inner finally.
            waiter = transp._make_empty_waiter()
            try:
                # Wait for the transport's write buffer to drain first.
                await waiter
                return await self.sock_sendfile(transp._sock, file, offset,
                                                count, fallback=False)
            finally:
                transp._reset_empty_waiter()
        finally:
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Queue the handles of ready events; prune cancelled ones."""
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()

611 

612 

class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Shared state and close/abort handling for selector transports."""

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Record socket details in the extra info and register with *loop*."""
        super().__init__(extra, loop)
        info = self._extra
        info['socket'] = trsock.TransportSocket(sock)
        try:
            info['sockname'] = sock.getsockname()
        except OSError:
            info['sockname'] = None
        # A caller-supplied peername wins over asking the socket.
        if 'peername' not in info:
            try:
                info['peername'] = sock.getpeername()
            except OSError:
                info['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()

        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self._sock is None:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            reading = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_READ)
            parts.append('read=polling' if reading else 'read=idle')

            writing = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            state = 'polling' if writing else 'idle'

            bufsize = self.get_write_buffer_size()
            parts.append(f'write=<{state}, bufsize={bufsize}>')
        return '<{}>'.format(' '.join(parts))

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Install *protocol* and mark it as connected."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has started."""
        return self._closing

    def close(self):
        """Stop reading; schedule connection_lost once nothing is buffered."""
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if self._buffer:
            # Pending data: the write path finishes the shutdown later.
            return
        self._conn_lost += 1
        self._loop._remove_writer(self._sock_fd)
        self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self, _warn=warnings.warn):
        if self._sock is None:
            return
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport."""
        # Should be called from exception handler only.
        if not isinstance(exc, OSError):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            # OSError is only logged, and only in debug mode.
            logger.debug("%r: %s", self, message, exc_info=True)
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data and schedule connection_lost(exc) once."""
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release socket, loop and server."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            owner = self._server
            if owner is not None:
                owner._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the length of the write buffer."""
        return len(self._buffer)

    def _add_reader(self, fd, callback, *args):
        """Register a reader on the loop unless the transport is closing."""
        if not self._closing:
            self._loop._add_reader(fd, callback, *args)

755 

756 

class _SelectorSocketTransport(_SelectorTransport):
    """Stream transport reading from and writing to a non-blocking socket."""

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the transport and schedule connection_made().

        *waiter*, if given, is resolved after connection_made() has been
        scheduled and reading has been armed.
        """

        # Must exist before super().__init__(), which calls set_protocol().
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False
        self._empty_waiter = None

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def set_protocol(self, protocol):
        """Install *protocol* and pick the matching read strategy."""
        if isinstance(protocol, protocols.BufferedProtocol):
            self._read_ready_cb = self._read_ready__get_buffer
        else:
            self._read_ready_cb = self._read_ready__data_received

        super().set_protocol(protocol)

    def is_reading(self):
        """Return True if the transport is neither paused nor closing."""
        return not self._paused and not self._closing

    def pause_reading(self):
        """Stop delivering data to the protocol until resume_reading()."""
        if self._closing or self._paused:
            return
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume reading after pause_reading()."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: dispatch to the strategy set by set_protocol()."""
        self._read_ready_cb()

    def _read_ready__get_buffer(self):
        """Read into a buffer supplied by a BufferedProtocol."""
        if self._conn_lost:
            return

        try:
            buf = self._protocol.get_buffer(-1)
            if not len(buf):
                raise RuntimeError('get_buffer() returned an empty buffer')
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.get_buffer() call failed.')
            return

        try:
            nbytes = self._sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not nbytes:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.buffer_updated(nbytes)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.buffer_updated() call failed.')

    def _read_ready__data_received(self):
        """Read up to max_size bytes and pass them to data_received()."""
        if self._conn_lost:
            return
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not data:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.data_received(data)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.data_received() call failed.')

    def _read_ready__on_eof(self):
        """Handle an empty read: ask the protocol whether to stay open."""
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)

        try:
            keep_open = self._protocol.eof_received()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.eof_received() call failed.')
            return

        if keep_open:
            # We're keeping the connection open so the
            # protocol can write more, but we still can't
            # receive more, so remove the reader callback.
            self._loop._remove_reader(self._sock_fd)
        else:
            self.close()

    def write(self, data):
        """Send *data*, buffering whatever cannot be sent immediately.

        Raises TypeError for non bytes-like data and RuntimeError after
        write_eof() or while a sendfile is in progress.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as possible."""
        assert self._buffer, 'Data should not be empty'

        if self._conn_lost:
            return
        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
            waiter = self._empty_waiter
            # Skip a waiter that is already resolved or cancelled.
            if waiter is not None and not waiter.done():
                waiter.set_exception(exc)
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop._remove_writer(self._sock_fd)
                waiter = self._empty_waiter
                if waiter is not None and not waiter.done():
                    waiter.set_result(None)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been sent."""
        if self._closing or self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def _call_connection_lost(self, exc):
        """Run the base teardown, then fail a still-pending empty waiter."""
        try:
            super()._call_connection_lost(exc)
        finally:
            # In ``finally`` so the waiter is failed even if the base
            # teardown raises.  The done() check avoids InvalidStateError
            # when the waiter was already resolved, failed or cancelled.
            waiter = self._empty_waiter
            if waiter is not None and not waiter.done():
                waiter.set_exception(
                    ConnectionError("Connection is closed by peer"))

    def _make_empty_waiter(self):
        """Create the future that resolves when the write buffer is empty.

        Raises RuntimeError if a waiter is already installed.
        """
        if self._empty_waiter is not None:
            raise RuntimeError("Empty waiter is already set")
        self._empty_waiter = self._loop.create_future()
        if not self._buffer:
            self._empty_waiter.set_result(None)
        return self._empty_waiter

    def _reset_empty_waiter(self):
        """Forget the current empty waiter."""
        self._empty_waiter = None

993 

994 

class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport on top of a non-blocking socket."""

    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Set up the transport and schedule connection_made().

        *address*, when set, is the only destination sendto() accepts.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        schedule(self._add_reader, self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def get_write_buffer_size(self):
        """Return the total payload size of the queued datagrams."""
        total = 0
        for payload, _ in self._buffer:
            total += len(payload)
        return total

    def _read_ready(self):
        """Reader callback: receive one datagram and hand it to the protocol."""
        if self._conn_lost:
            return
        try:
            payload, sender = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_received(payload, sender)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, queueing it if the socket would block.

        Raises TypeError for non bytes-like data and ValueError when *addr*
        conflicts with the address the transport was created with.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        fixed = self._address
        if fixed:
            if addr not in (None, fixed):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = fixed

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                # Fall through to queue the datagram for _sendto_ready().
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: send queued datagrams until one would block."""
        queue = self._buffer
        while queue:
            payload, dest = queue.popleft()
            try:
                if self._extra['peername']:
                    self._sock.send(payload)
                else:
                    self._sock.sendto(payload, dest)
            except (BlockingIOError, InterruptedError):
                queue.appendleft((payload, dest))  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop._remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)