Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/selector_events.py: 14%

743 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1"""Event loop using a selector and related classes. 

2 

3A selector is a "notify-when-ready" multiplexer. For a subclass which 

4also includes support for signal handling, see the unix_events sub-module. 

5""" 

6 

7__all__ = 'BaseSelectorEventLoop', 

8 

9import collections 

10import errno 

11import functools 

12import selectors 

13import socket 

14import warnings 

15import weakref 

16try: 

17 import ssl 

18except ImportError: # pragma: no cover 

19 ssl = None 

20 

21from . import base_events 

22from . import constants 

23from . import events 

24from . import futures 

25from . import protocols 

26from . import sslproto 

27from . import transports 

28from . import trsock 

29from .log import logger 

30 

31 

32def _test_selector_event(selector, fd, event): 

33 # Test if the selector is monitoring 'event' events 

34 # for the file descriptor 'fd'. 

35 try: 

36 key = selector.get_key(fd) 

37 except KeyError: 

38 return False 

39 else: 

40 return bool(key.events & event) 

41 

42 

43def _check_ssl_socket(sock): 

44 if ssl is not None and isinstance(sock, ssl.SSLSocket): 

45 raise TypeError("Socket cannot be of type SSLSocket") 

46 

47 

48class BaseSelectorEventLoop(base_events.BaseEventLoop): 

49 """Selector event loop. 

50 

51 See events.EventLoop for API specification. 

52 """ 

53 

54 def __init__(self, selector=None): 

55 super().__init__() 

56 

57 if selector is None: 

58 selector = selectors.DefaultSelector() 

59 logger.debug('Using selector: %s', selector.__class__.__name__) 

60 self._selector = selector 

61 self._make_self_pipe() 

62 self._transports = weakref.WeakValueDictionary() 

63 

64 def _make_socket_transport(self, sock, protocol, waiter=None, *, 

65 extra=None, server=None): 

66 return _SelectorSocketTransport(self, sock, protocol, waiter, 

67 extra, server) 

68 

69 def _make_ssl_transport( 

70 self, rawsock, protocol, sslcontext, waiter=None, 

71 *, server_side=False, server_hostname=None, 

72 extra=None, server=None, 

73 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 

74 ssl_protocol = sslproto.SSLProtocol( 

75 self, protocol, sslcontext, waiter, 

76 server_side, server_hostname, 

77 ssl_handshake_timeout=ssl_handshake_timeout) 

78 _SelectorSocketTransport(self, rawsock, ssl_protocol, 

79 extra=extra, server=server) 

80 return ssl_protocol._app_transport 

81 

82 def _make_datagram_transport(self, sock, protocol, 

83 address=None, waiter=None, extra=None): 

84 return _SelectorDatagramTransport(self, sock, protocol, 

85 address, waiter, extra) 

86 

87 def close(self): 

88 if self.is_running(): 

89 raise RuntimeError("Cannot close a running event loop") 

90 if self.is_closed(): 

91 return 

92 self._close_self_pipe() 

93 super().close() 

94 if self._selector is not None: 

95 self._selector.close() 

96 self._selector = None 

97 

98 def _close_self_pipe(self): 

99 self._remove_reader(self._ssock.fileno()) 

100 self._ssock.close() 

101 self._ssock = None 

102 self._csock.close() 

103 self._csock = None 

104 self._internal_fds -= 1 

105 

106 def _make_self_pipe(self): 

107 # A self-socket, really. :-) 

108 self._ssock, self._csock = socket.socketpair() 

109 self._ssock.setblocking(False) 

110 self._csock.setblocking(False) 

111 self._internal_fds += 1 

112 self._add_reader(self._ssock.fileno(), self._read_from_self) 

113 

114 def _process_self_data(self, data): 

115 pass 

116 

117 def _read_from_self(self): 

118 while True: 

119 try: 

120 data = self._ssock.recv(4096) 

121 if not data: 

122 break 

123 self._process_self_data(data) 

124 except InterruptedError: 

125 continue 

126 except BlockingIOError: 

127 break 

128 

129 def _write_to_self(self): 

130 # This may be called from a different thread, possibly after 

131 # _close_self_pipe() has been called or even while it is 

132 # running. Guard for self._csock being None or closed. When 

133 # a socket is closed, send() raises OSError (with errno set to 

134 # EBADF, but let's not rely on the exact error code). 

135 csock = self._csock 

136 if csock is not None: 

137 try: 

138 csock.send(b'\0') 

139 except OSError: 

140 if self._debug: 

141 logger.debug("Fail to write a null byte into the " 

142 "self-pipe socket", 

143 exc_info=True) 

144 

145 def _start_serving(self, protocol_factory, sock, 

146 sslcontext=None, server=None, backlog=100, 

147 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 

148 self._add_reader(sock.fileno(), self._accept_connection, 

149 protocol_factory, sock, sslcontext, server, backlog, 

150 ssl_handshake_timeout) 

151 

152 def _accept_connection( 

153 self, protocol_factory, sock, 

154 sslcontext=None, server=None, backlog=100, 

155 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 

156 # This method is only called once for each event loop tick where the 

157 # listening socket has triggered an EVENT_READ. There may be multiple 

158 # connections waiting for an .accept() so it is called in a loop. 

159 # See https://bugs.python.org/issue27906 for more details. 

160 for _ in range(backlog): 

161 try: 

162 conn, addr = sock.accept() 

163 if self._debug: 

164 logger.debug("%r got a new connection from %r: %r", 

165 server, addr, conn) 

166 conn.setblocking(False) 

167 except (BlockingIOError, InterruptedError, ConnectionAbortedError): 

168 # Early exit because the socket accept buffer is empty. 

169 return None 

170 except OSError as exc: 

171 # There's nowhere to send the error, so just log it. 

172 if exc.errno in (errno.EMFILE, errno.ENFILE, 

173 errno.ENOBUFS, errno.ENOMEM): 

174 # Some platforms (e.g. Linux keep reporting the FD as 

175 # ready, so we remove the read handler temporarily. 

176 # We'll try again in a while. 

177 self.call_exception_handler({ 

178 'message': 'socket.accept() out of system resource', 

179 'exception': exc, 

180 'socket': trsock.TransportSocket(sock), 

181 }) 

182 self._remove_reader(sock.fileno()) 

183 self.call_later(constants.ACCEPT_RETRY_DELAY, 

184 self._start_serving, 

185 protocol_factory, sock, sslcontext, server, 

186 backlog, ssl_handshake_timeout) 

187 else: 

188 raise # The event loop will catch, log and ignore it. 

189 else: 

190 extra = {'peername': addr} 

191 accept = self._accept_connection2( 

192 protocol_factory, conn, extra, sslcontext, server, 

193 ssl_handshake_timeout) 

194 self.create_task(accept) 

195 

196 async def _accept_connection2( 

197 self, protocol_factory, conn, extra, 

198 sslcontext=None, server=None, 

199 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 

200 protocol = None 

201 transport = None 

202 try: 

203 protocol = protocol_factory() 

204 waiter = self.create_future() 

205 if sslcontext: 

206 transport = self._make_ssl_transport( 

207 conn, protocol, sslcontext, waiter=waiter, 

208 server_side=True, extra=extra, server=server, 

209 ssl_handshake_timeout=ssl_handshake_timeout) 

210 else: 

211 transport = self._make_socket_transport( 

212 conn, protocol, waiter=waiter, extra=extra, 

213 server=server) 

214 

215 try: 

216 await waiter 

217 except BaseException: 

218 transport.close() 

219 raise 

220 # It's now up to the protocol to handle the connection. 

221 

222 except (SystemExit, KeyboardInterrupt): 

223 raise 

224 except BaseException as exc: 

225 if self._debug: 

226 context = { 

227 'message': 

228 'Error on transport creation for incoming connection', 

229 'exception': exc, 

230 } 

231 if protocol is not None: 

232 context['protocol'] = protocol 

233 if transport is not None: 

234 context['transport'] = transport 

235 self.call_exception_handler(context) 

236 

237 def _ensure_fd_no_transport(self, fd): 

238 fileno = fd 

239 if not isinstance(fileno, int): 

240 try: 

241 fileno = int(fileno.fileno()) 

242 except (AttributeError, TypeError, ValueError): 

243 # This code matches selectors._fileobj_to_fd function. 

244 raise ValueError(f"Invalid file object: {fd!r}") from None 

245 try: 

246 transport = self._transports[fileno] 

247 except KeyError: 

248 pass 

249 else: 

250 if not transport.is_closing(): 

251 raise RuntimeError( 

252 f'File descriptor {fd!r} is used by transport ' 

253 f'{transport!r}') 

254 

255 def _add_reader(self, fd, callback, *args): 

256 self._check_closed() 

257 handle = events.Handle(callback, args, self, None) 

258 try: 

259 key = self._selector.get_key(fd) 

260 except KeyError: 

261 self._selector.register(fd, selectors.EVENT_READ, 

262 (handle, None)) 

263 else: 

264 mask, (reader, writer) = key.events, key.data 

265 self._selector.modify(fd, mask | selectors.EVENT_READ, 

266 (handle, writer)) 

267 if reader is not None: 

268 reader.cancel() 

269 

270 def _remove_reader(self, fd): 

271 if self.is_closed(): 

272 return False 

273 try: 

274 key = self._selector.get_key(fd) 

275 except KeyError: 

276 return False 

277 else: 

278 mask, (reader, writer) = key.events, key.data 

279 mask &= ~selectors.EVENT_READ 

280 if not mask: 

281 self._selector.unregister(fd) 

282 else: 

283 self._selector.modify(fd, mask, (None, writer)) 

284 

285 if reader is not None: 

286 reader.cancel() 

287 return True 

288 else: 

289 return False 

290 

291 def _add_writer(self, fd, callback, *args): 

292 self._check_closed() 

293 handle = events.Handle(callback, args, self, None) 

294 try: 

295 key = self._selector.get_key(fd) 

296 except KeyError: 

297 self._selector.register(fd, selectors.EVENT_WRITE, 

298 (None, handle)) 

299 else: 

300 mask, (reader, writer) = key.events, key.data 

301 self._selector.modify(fd, mask | selectors.EVENT_WRITE, 

302 (reader, handle)) 

303 if writer is not None: 

304 writer.cancel() 

305 

306 def _remove_writer(self, fd): 

307 """Remove a writer callback.""" 

308 if self.is_closed(): 

309 return False 

310 try: 

311 key = self._selector.get_key(fd) 

312 except KeyError: 

313 return False 

314 else: 

315 mask, (reader, writer) = key.events, key.data 

316 # Remove both writer and connector. 

317 mask &= ~selectors.EVENT_WRITE 

318 if not mask: 

319 self._selector.unregister(fd) 

320 else: 

321 self._selector.modify(fd, mask, (reader, None)) 

322 

323 if writer is not None: 

324 writer.cancel() 

325 return True 

326 else: 

327 return False 

328 

329 def add_reader(self, fd, callback, *args): 

330 """Add a reader callback.""" 

331 self._ensure_fd_no_transport(fd) 

332 return self._add_reader(fd, callback, *args) 

333 

334 def remove_reader(self, fd): 

335 """Remove a reader callback.""" 

336 self._ensure_fd_no_transport(fd) 

337 return self._remove_reader(fd) 

338 

339 def add_writer(self, fd, callback, *args): 

340 """Add a writer callback..""" 

341 self._ensure_fd_no_transport(fd) 

342 return self._add_writer(fd, callback, *args) 

343 

344 def remove_writer(self, fd): 

345 """Remove a writer callback.""" 

346 self._ensure_fd_no_transport(fd) 

347 return self._remove_writer(fd) 

348 

349 async def sock_recv(self, sock, n): 

350 """Receive data from the socket. 

351 

352 The return value is a bytes object representing the data received. 

353 The maximum amount of data to be received at once is specified by 

354 nbytes. 

355 """ 

356 _check_ssl_socket(sock) 

357 if self._debug and sock.gettimeout() != 0: 

358 raise ValueError("the socket must be non-blocking") 

359 try: 

360 return sock.recv(n) 

361 except (BlockingIOError, InterruptedError): 

362 pass 

363 fut = self.create_future() 

364 fd = sock.fileno() 

365 self.add_reader(fd, self._sock_recv, fut, sock, n) 

366 fut.add_done_callback( 

367 functools.partial(self._sock_read_done, fd)) 

368 return await fut 

369 

370 def _sock_read_done(self, fd, fut): 

371 self.remove_reader(fd) 

372 

373 def _sock_recv(self, fut, sock, n): 

374 # _sock_recv() can add itself as an I/O callback if the operation can't 

375 # be done immediately. Don't use it directly, call sock_recv(). 

376 if fut.done(): 

377 return 

378 try: 

379 data = sock.recv(n) 

380 except (BlockingIOError, InterruptedError): 

381 return # try again next time 

382 except (SystemExit, KeyboardInterrupt): 

383 raise 

384 except BaseException as exc: 

385 fut.set_exception(exc) 

386 else: 

387 fut.set_result(data) 

388 

389 async def sock_recv_into(self, sock, buf): 

390 """Receive data from the socket. 

391 

392 The received data is written into *buf* (a writable buffer). 

393 The return value is the number of bytes written. 

394 """ 

395 _check_ssl_socket(sock) 

396 if self._debug and sock.gettimeout() != 0: 

397 raise ValueError("the socket must be non-blocking") 

398 try: 

399 return sock.recv_into(buf) 

400 except (BlockingIOError, InterruptedError): 

401 pass 

402 fut = self.create_future() 

403 fd = sock.fileno() 

404 self.add_reader(fd, self._sock_recv_into, fut, sock, buf) 

405 fut.add_done_callback( 

406 functools.partial(self._sock_read_done, fd)) 

407 return await fut 

408 

409 def _sock_recv_into(self, fut, sock, buf): 

410 # _sock_recv_into() can add itself as an I/O callback if the operation 

411 # can't be done immediately. Don't use it directly, call 

412 # sock_recv_into(). 

413 if fut.done(): 

414 return 

415 try: 

416 nbytes = sock.recv_into(buf) 

417 except (BlockingIOError, InterruptedError): 

418 return # try again next time 

419 except (SystemExit, KeyboardInterrupt): 

420 raise 

421 except BaseException as exc: 

422 fut.set_exception(exc) 

423 else: 

424 fut.set_result(nbytes) 

425 

426 async def sock_sendall(self, sock, data): 

427 """Send data to the socket. 

428 

429 The socket must be connected to a remote socket. This method continues 

430 to send data from data until either all data has been sent or an 

431 error occurs. None is returned on success. On error, an exception is 

432 raised, and there is no way to determine how much data, if any, was 

433 successfully processed by the receiving end of the connection. 

434 """ 

435 _check_ssl_socket(sock) 

436 if self._debug and sock.gettimeout() != 0: 

437 raise ValueError("the socket must be non-blocking") 

438 try: 

439 n = sock.send(data) 

440 except (BlockingIOError, InterruptedError): 

441 n = 0 

442 

443 if n == len(data): 

444 # all data sent 

445 return 

446 

447 fut = self.create_future() 

448 fd = sock.fileno() 

449 fut.add_done_callback( 

450 functools.partial(self._sock_write_done, fd)) 

451 # use a trick with a list in closure to store a mutable state 

452 self.add_writer(fd, self._sock_sendall, fut, sock, 

453 memoryview(data), [n]) 

454 return await fut 

455 

456 def _sock_sendall(self, fut, sock, view, pos): 

457 if fut.done(): 

458 # Future cancellation can be scheduled on previous loop iteration 

459 return 

460 start = pos[0] 

461 try: 

462 n = sock.send(view[start:]) 

463 except (BlockingIOError, InterruptedError): 

464 return 

465 except (SystemExit, KeyboardInterrupt): 

466 raise 

467 except BaseException as exc: 

468 fut.set_exception(exc) 

469 return 

470 

471 start += n 

472 

473 if start == len(view): 

474 fut.set_result(None) 

475 else: 

476 pos[0] = start 

477 

478 async def sock_connect(self, sock, address): 

479 """Connect to a remote socket at address. 

480 

481 This method is a coroutine. 

482 """ 

483 _check_ssl_socket(sock) 

484 if self._debug and sock.gettimeout() != 0: 

485 raise ValueError("the socket must be non-blocking") 

486 

487 if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX: 

488 resolved = await self._ensure_resolved( 

489 address, family=sock.family, proto=sock.proto, loop=self) 

490 _, _, _, _, address = resolved[0] 

491 

492 fut = self.create_future() 

493 self._sock_connect(fut, sock, address) 

494 return await fut 

495 

496 def _sock_connect(self, fut, sock, address): 

497 fd = sock.fileno() 

498 try: 

499 sock.connect(address) 

500 except (BlockingIOError, InterruptedError): 

501 # Issue #23618: When the C function connect() fails with EINTR, the 

502 # connection runs in background. We have to wait until the socket 

503 # becomes writable to be notified when the connection succeed or 

504 # fails. 

505 fut.add_done_callback( 

506 functools.partial(self._sock_write_done, fd)) 

507 self.add_writer(fd, self._sock_connect_cb, fut, sock, address) 

508 except (SystemExit, KeyboardInterrupt): 

509 raise 

510 except BaseException as exc: 

511 fut.set_exception(exc) 

512 else: 

513 fut.set_result(None) 

514 

515 def _sock_write_done(self, fd, fut): 

516 self.remove_writer(fd) 

517 

518 def _sock_connect_cb(self, fut, sock, address): 

519 if fut.done(): 

520 return 

521 

522 try: 

523 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 

524 if err != 0: 

525 # Jump to any except clause below. 

526 raise OSError(err, f'Connect call failed {address}') 

527 except (BlockingIOError, InterruptedError): 

528 # socket is still registered, the callback will be retried later 

529 pass 

530 except (SystemExit, KeyboardInterrupt): 

531 raise 

532 except BaseException as exc: 

533 fut.set_exception(exc) 

534 else: 

535 fut.set_result(None) 

536 

537 async def sock_accept(self, sock): 

538 """Accept a connection. 

539 

540 The socket must be bound to an address and listening for connections. 

541 The return value is a pair (conn, address) where conn is a new socket 

542 object usable to send and receive data on the connection, and address 

543 is the address bound to the socket on the other end of the connection. 

544 """ 

545 _check_ssl_socket(sock) 

546 if self._debug and sock.gettimeout() != 0: 

547 raise ValueError("the socket must be non-blocking") 

548 fut = self.create_future() 

549 self._sock_accept(fut, False, sock) 

550 return await fut 

551 

552 def _sock_accept(self, fut, registered, sock): 

553 fd = sock.fileno() 

554 if registered: 

555 self.remove_reader(fd) 

556 if fut.done(): 

557 return 

558 try: 

559 conn, address = sock.accept() 

560 conn.setblocking(False) 

561 except (BlockingIOError, InterruptedError): 

562 self.add_reader(fd, self._sock_accept, fut, True, sock) 

563 except (SystemExit, KeyboardInterrupt): 

564 raise 

565 except BaseException as exc: 

566 fut.set_exception(exc) 

567 else: 

568 fut.set_result((conn, address)) 

569 

570 async def _sendfile_native(self, transp, file, offset, count): 

571 del self._transports[transp._sock_fd] 

572 resume_reading = transp.is_reading() 

573 transp.pause_reading() 

574 await transp._make_empty_waiter() 

575 try: 

576 return await self.sock_sendfile(transp._sock, file, offset, count, 

577 fallback=False) 

578 finally: 

579 transp._reset_empty_waiter() 

580 if resume_reading: 

581 transp.resume_reading() 

582 self._transports[transp._sock_fd] = transp 

583 

584 def _process_events(self, event_list): 

585 for key, mask in event_list: 

586 fileobj, (reader, writer) = key.fileobj, key.data 

587 if mask & selectors.EVENT_READ and reader is not None: 

588 if reader._cancelled: 

589 self._remove_reader(fileobj) 

590 else: 

591 self._add_callback(reader) 

592 if mask & selectors.EVENT_WRITE and writer is not None: 

593 if writer._cancelled: 

594 self._remove_writer(fileobj) 

595 else: 

596 self._add_callback(writer) 

597 

598 def _stop_serving(self, sock): 

599 self._remove_reader(sock.fileno()) 

600 sock.close() 

601 

602 

603class _SelectorTransport(transports._FlowControlMixin, 

604 transports.Transport): 

605 

606 max_size = 256 * 1024 # Buffer size passed to recv(). 

607 

608 _buffer_factory = bytearray # Constructs initial value for self._buffer. 

609 

610 # Attribute used in the destructor: it must be set even if the constructor 

611 # is not called (see _SelectorSslTransport which may start by raising an 

612 # exception) 

613 _sock = None 

614 

615 def __init__(self, loop, sock, protocol, extra=None, server=None): 

616 super().__init__(extra, loop) 

617 self._extra['socket'] = trsock.TransportSocket(sock) 

618 try: 

619 self._extra['sockname'] = sock.getsockname() 

620 except OSError: 

621 self._extra['sockname'] = None 

622 if 'peername' not in self._extra: 

623 try: 

624 self._extra['peername'] = sock.getpeername() 

625 except socket.error: 

626 self._extra['peername'] = None 

627 self._sock = sock 

628 self._sock_fd = sock.fileno() 

629 

630 self._protocol_connected = False 

631 self.set_protocol(protocol) 

632 

633 self._server = server 

634 self._buffer = self._buffer_factory() 

635 self._conn_lost = 0 # Set when call to connection_lost scheduled. 

636 self._closing = False # Set when close() called. 

637 if self._server is not None: 

638 self._server._attach() 

639 loop._transports[self._sock_fd] = self 

640 

641 def __repr__(self): 

642 info = [self.__class__.__name__] 

643 if self._sock is None: 

644 info.append('closed') 

645 elif self._closing: 

646 info.append('closing') 

647 info.append(f'fd={self._sock_fd}') 

648 # test if the transport was closed 

649 if self._loop is not None and not self._loop.is_closed(): 

650 polling = _test_selector_event(self._loop._selector, 

651 self._sock_fd, selectors.EVENT_READ) 

652 if polling: 

653 info.append('read=polling') 

654 else: 

655 info.append('read=idle') 

656 

657 polling = _test_selector_event(self._loop._selector, 

658 self._sock_fd, 

659 selectors.EVENT_WRITE) 

660 if polling: 

661 state = 'polling' 

662 else: 

663 state = 'idle' 

664 

665 bufsize = self.get_write_buffer_size() 

666 info.append(f'write=<{state}, bufsize={bufsize}>') 

667 return '<{}>'.format(' '.join(info)) 

668 

669 def abort(self): 

670 self._force_close(None) 

671 

672 def set_protocol(self, protocol): 

673 self._protocol = protocol 

674 self._protocol_connected = True 

675 

676 def get_protocol(self): 

677 return self._protocol 

678 

679 def is_closing(self): 

680 return self._closing 

681 

682 def close(self): 

683 if self._closing: 

684 return 

685 self._closing = True 

686 self._loop._remove_reader(self._sock_fd) 

687 if not self._buffer: 

688 self._conn_lost += 1 

689 self._loop._remove_writer(self._sock_fd) 

690 self._loop.call_soon(self._call_connection_lost, None) 

691 

692 def __del__(self, _warn=warnings.warn): 

693 if self._sock is not None: 

694 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 

695 self._sock.close() 

696 

697 def _fatal_error(self, exc, message='Fatal error on transport'): 

698 # Should be called from exception handler only. 

699 if isinstance(exc, OSError): 

700 if self._loop.get_debug(): 

701 logger.debug("%r: %s", self, message, exc_info=True) 

702 else: 

703 self._loop.call_exception_handler({ 

704 'message': message, 

705 'exception': exc, 

706 'transport': self, 

707 'protocol': self._protocol, 

708 }) 

709 self._force_close(exc) 

710 

711 def _force_close(self, exc): 

712 if self._conn_lost: 

713 return 

714 if self._buffer: 

715 self._buffer.clear() 

716 self._loop._remove_writer(self._sock_fd) 

717 if not self._closing: 

718 self._closing = True 

719 self._loop._remove_reader(self._sock_fd) 

720 self._conn_lost += 1 

721 self._loop.call_soon(self._call_connection_lost, exc) 

722 

723 def _call_connection_lost(self, exc): 

724 try: 

725 if self._protocol_connected: 

726 self._protocol.connection_lost(exc) 

727 finally: 

728 self._sock.close() 

729 self._sock = None 

730 self._protocol = None 

731 self._loop = None 

732 server = self._server 

733 if server is not None: 

734 server._detach() 

735 self._server = None 

736 

737 def get_write_buffer_size(self): 

738 return len(self._buffer) 

739 

740 def _add_reader(self, fd, callback, *args): 

741 if self._closing: 

742 return 

743 

744 self._loop._add_reader(fd, callback, *args) 

745 

746 

747class _SelectorSocketTransport(_SelectorTransport): 

748 

749 _start_tls_compatible = True 

750 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE 

751 

752 def __init__(self, loop, sock, protocol, waiter=None, 

753 extra=None, server=None): 

754 

755 self._read_ready_cb = None 

756 super().__init__(loop, sock, protocol, extra, server) 

757 self._eof = False 

758 self._paused = False 

759 self._empty_waiter = None 

760 

761 # Disable the Nagle algorithm -- small writes will be 

762 # sent without waiting for the TCP ACK. This generally 

763 # decreases the latency (in some cases significantly.) 

764 base_events._set_nodelay(self._sock) 

765 

766 self._loop.call_soon(self._protocol.connection_made, self) 

767 # only start reading when connection_made() has been called 

768 self._loop.call_soon(self._add_reader, 

769 self._sock_fd, self._read_ready) 

770 if waiter is not None: 

771 # only wake up the waiter when connection_made() has been called 

772 self._loop.call_soon(futures._set_result_unless_cancelled, 

773 waiter, None) 

774 

775 def set_protocol(self, protocol): 

776 if isinstance(protocol, protocols.BufferedProtocol): 

777 self._read_ready_cb = self._read_ready__get_buffer 

778 else: 

779 self._read_ready_cb = self._read_ready__data_received 

780 

781 super().set_protocol(protocol) 

782 

783 def is_reading(self): 

784 return not self._paused and not self._closing 

785 

786 def pause_reading(self): 

787 if self._closing or self._paused: 

788 return 

789 self._paused = True 

790 self._loop._remove_reader(self._sock_fd) 

791 if self._loop.get_debug(): 

792 logger.debug("%r pauses reading", self) 

793 

794 def resume_reading(self): 

795 if self._closing or not self._paused: 

796 return 

797 self._paused = False 

798 self._add_reader(self._sock_fd, self._read_ready) 

799 if self._loop.get_debug(): 

800 logger.debug("%r resumes reading", self) 

801 

802 def _read_ready(self): 

803 self._read_ready_cb() 

804 

805 def _read_ready__get_buffer(self): 

806 if self._conn_lost: 

807 return 

808 

809 try: 

810 buf = self._protocol.get_buffer(-1) 

811 if not len(buf): 

812 raise RuntimeError('get_buffer() returned an empty buffer') 

813 except (SystemExit, KeyboardInterrupt): 

814 raise 

815 except BaseException as exc: 

816 self._fatal_error( 

817 exc, 'Fatal error: protocol.get_buffer() call failed.') 

818 return 

819 

820 try: 

821 nbytes = self._sock.recv_into(buf) 

822 except (BlockingIOError, InterruptedError): 

823 return 

824 except (SystemExit, KeyboardInterrupt): 

825 raise 

826 except BaseException as exc: 

827 self._fatal_error(exc, 'Fatal read error on socket transport') 

828 return 

829 

830 if not nbytes: 

831 self._read_ready__on_eof() 

832 return 

833 

834 try: 

835 self._protocol.buffer_updated(nbytes) 

836 except (SystemExit, KeyboardInterrupt): 

837 raise 

838 except BaseException as exc: 

839 self._fatal_error( 

840 exc, 'Fatal error: protocol.buffer_updated() call failed.') 

841 

842 def _read_ready__data_received(self): 

843 if self._conn_lost: 

844 return 

845 try: 

846 data = self._sock.recv(self.max_size) 

847 except (BlockingIOError, InterruptedError): 

848 return 

849 except (SystemExit, KeyboardInterrupt): 

850 raise 

851 except BaseException as exc: 

852 self._fatal_error(exc, 'Fatal read error on socket transport') 

853 return 

854 

855 if not data: 

856 self._read_ready__on_eof() 

857 return 

858 

859 try: 

860 self._protocol.data_received(data) 

861 except (SystemExit, KeyboardInterrupt): 

862 raise 

863 except BaseException as exc: 

864 self._fatal_error( 

865 exc, 'Fatal error: protocol.data_received() call failed.') 

866 

867 def _read_ready__on_eof(self): 

868 if self._loop.get_debug(): 

869 logger.debug("%r received EOF", self) 

870 

871 try: 

872 keep_open = self._protocol.eof_received() 

873 except (SystemExit, KeyboardInterrupt): 

874 raise 

875 except BaseException as exc: 

876 self._fatal_error( 

877 exc, 'Fatal error: protocol.eof_received() call failed.') 

878 return 

879 

880 if keep_open: 

881 # We're keeping the connection open so the 

882 # protocol can write more, but we still can't 

883 # receive more, so remove the reader callback. 

884 self._loop._remove_reader(self._sock_fd) 

885 else: 

886 self.close() 

887 

888 def write(self, data): 

889 if not isinstance(data, (bytes, bytearray, memoryview)): 

890 raise TypeError(f'data argument must be a bytes-like object, ' 

891 f'not {type(data).__name__!r}') 

892 if self._eof: 

893 raise RuntimeError('Cannot call write() after write_eof()') 

894 if self._empty_waiter is not None: 

895 raise RuntimeError('unable to write; sendfile is in progress') 

896 if not data: 

897 return 

898 

899 if self._conn_lost: 

900 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

901 logger.warning('socket.send() raised exception.') 

902 self._conn_lost += 1 

903 return 

904 

905 if not self._buffer: 

906 # Optimization: try to send now. 

907 try: 

908 n = self._sock.send(data) 

909 except (BlockingIOError, InterruptedError): 

910 pass 

911 except (SystemExit, KeyboardInterrupt): 

912 raise 

913 except BaseException as exc: 

914 self._fatal_error(exc, 'Fatal write error on socket transport') 

915 return 

916 else: 

917 data = data[n:] 

918 if not data: 

919 return 

920 # Not all was written; register write handler. 

921 self._loop._add_writer(self._sock_fd, self._write_ready) 

922 

923 # Add it to the buffer. 

924 self._buffer.extend(data) 

925 self._maybe_pause_protocol() 

926 

927 def _write_ready(self): 

928 assert self._buffer, 'Data should not be empty' 

929 

930 if self._conn_lost: 

931 return 

932 try: 

933 n = self._sock.send(self._buffer) 

934 except (BlockingIOError, InterruptedError): 

935 pass 

936 except (SystemExit, KeyboardInterrupt): 

937 raise 

938 except BaseException as exc: 

939 self._loop._remove_writer(self._sock_fd) 

940 self._buffer.clear() 

941 self._fatal_error(exc, 'Fatal write error on socket transport') 

942 if self._empty_waiter is not None: 

943 self._empty_waiter.set_exception(exc) 

944 else: 

945 if n: 

946 del self._buffer[:n] 

947 self._maybe_resume_protocol() # May append to buffer. 

948 if not self._buffer: 

949 self._loop._remove_writer(self._sock_fd) 

950 if self._empty_waiter is not None: 

951 self._empty_waiter.set_result(None) 

952 if self._closing: 

953 self._call_connection_lost(None) 

954 elif self._eof: 

955 self._sock.shutdown(socket.SHUT_WR) 

956 

957 def write_eof(self): 

958 if self._closing or self._eof: 

959 return 

960 self._eof = True 

961 if not self._buffer: 

962 self._sock.shutdown(socket.SHUT_WR) 

963 

964 def can_write_eof(self): 

965 return True 

966 

967 def _call_connection_lost(self, exc): 

968 super()._call_connection_lost(exc) 

969 if self._empty_waiter is not None: 

970 self._empty_waiter.set_exception( 

971 ConnectionError("Connection is closed by peer")) 

972 

973 def _make_empty_waiter(self): 

974 if self._empty_waiter is not None: 

975 raise RuntimeError("Empty waiter is already set") 

976 self._empty_waiter = self._loop.create_future() 

977 if not self._buffer: 

978 self._empty_waiter.set_result(None) 

979 return self._empty_waiter 

980 

981 def _reset_empty_waiter(self): 

982 self._empty_waiter = None 

983 

984 

985class _SelectorDatagramTransport(_SelectorTransport): 

986 

987 _buffer_factory = collections.deque 

988 

989 def __init__(self, loop, sock, protocol, address=None, 

990 waiter=None, extra=None): 

991 super().__init__(loop, sock, protocol, extra) 

992 self._address = address 

993 self._loop.call_soon(self._protocol.connection_made, self) 

994 # only start reading when connection_made() has been called 

995 self._loop.call_soon(self._add_reader, 

996 self._sock_fd, self._read_ready) 

997 if waiter is not None: 

998 # only wake up the waiter when connection_made() has been called 

999 self._loop.call_soon(futures._set_result_unless_cancelled, 

1000 waiter, None) 

1001 

1002 def get_write_buffer_size(self): 

1003 return sum(len(data) for data, _ in self._buffer) 

1004 

1005 def _read_ready(self): 

1006 if self._conn_lost: 

1007 return 

1008 try: 

1009 data, addr = self._sock.recvfrom(self.max_size) 

1010 except (BlockingIOError, InterruptedError): 

1011 pass 

1012 except OSError as exc: 

1013 self._protocol.error_received(exc) 

1014 except (SystemExit, KeyboardInterrupt): 

1015 raise 

1016 except BaseException as exc: 

1017 self._fatal_error(exc, 'Fatal read error on datagram transport') 

1018 else: 

1019 self._protocol.datagram_received(data, addr) 

1020 

1021 def sendto(self, data, addr=None): 

1022 if not isinstance(data, (bytes, bytearray, memoryview)): 

1023 raise TypeError(f'data argument must be a bytes-like object, ' 

1024 f'not {type(data).__name__!r}') 

1025 if not data: 

1026 return 

1027 

1028 if self._address: 

1029 if addr not in (None, self._address): 

1030 raise ValueError( 

1031 f'Invalid address: must be None or {self._address}') 

1032 addr = self._address 

1033 

1034 if self._conn_lost and self._address: 

1035 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 

1036 logger.warning('socket.send() raised exception.') 

1037 self._conn_lost += 1 

1038 return 

1039 

1040 if not self._buffer: 

1041 # Attempt to send it right away first. 

1042 try: 

1043 if self._extra['peername']: 

1044 self._sock.send(data) 

1045 else: 

1046 self._sock.sendto(data, addr) 

1047 return 

1048 except (BlockingIOError, InterruptedError): 

1049 self._loop._add_writer(self._sock_fd, self._sendto_ready) 

1050 except OSError as exc: 

1051 self._protocol.error_received(exc) 

1052 return 

1053 except (SystemExit, KeyboardInterrupt): 

1054 raise 

1055 except BaseException as exc: 

1056 self._fatal_error( 

1057 exc, 'Fatal write error on datagram transport') 

1058 return 

1059 

1060 # Ensure that what we buffer is immutable. 

1061 self._buffer.append((bytes(data), addr)) 

1062 self._maybe_pause_protocol() 

1063 

1064 def _sendto_ready(self): 

1065 while self._buffer: 

1066 data, addr = self._buffer.popleft() 

1067 try: 

1068 if self._extra['peername']: 

1069 self._sock.send(data) 

1070 else: 

1071 self._sock.sendto(data, addr) 

1072 except (BlockingIOError, InterruptedError): 

1073 self._buffer.appendleft((data, addr)) # Try again later. 

1074 break 

1075 except OSError as exc: 

1076 self._protocol.error_received(exc) 

1077 return 

1078 except (SystemExit, KeyboardInterrupt): 

1079 raise 

1080 except BaseException as exc: 

1081 self._fatal_error( 

1082 exc, 'Fatal write error on datagram transport') 

1083 return 

1084 

1085 self._maybe_resume_protocol() # May append to buffer. 

1086 if not self._buffer: 

1087 self._loop._remove_writer(self._sock_fd) 

1088 if self._closing: 

1089 self._call_connection_lost(None)