Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_utils.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import select
2import socket
3import sys
4import typing
7def is_socket_readable(sock: typing.Optional[socket.socket]) -> bool:
8 """
9 Return whether a socket, as identifed by its file descriptor, is readable.
10 "A socket is readable" means that the read buffer isn't empty, i.e. that calling
11 .recv() on it would immediately return some data.
12 """
13 # NOTE: we want check for readability without actually attempting to read, because
14 # we don't want to block forever if it's not readable.
16 # In the case that the socket no longer exists, or cannot return a file
17 # descriptor, we treat it as being readable, as if it the next read operation
18 # on it is ready to return the terminating `b""`.
19 sock_fd = None if sock is None else sock.fileno()
20 if sock_fd is None or sock_fd < 0: # pragma: nocover
21 return True
23 # The implementation below was stolen from:
24 # https://github.com/python-trio/trio/blob/20ee2b1b7376db637435d80e266212a35837ddcc/trio/_socket.py#L471-L478
25 # See also: https://github.com/encode/httpcore/pull/193#issuecomment-703129316
27 # Use select.select on Windows, and when poll is unavailable and select.poll
28 # everywhere else. (E.g. When eventlet is in use. See #327)
29 if (
30 sys.platform == "win32" or getattr(select, "poll", None) is None
31 ): # pragma: nocover
32 rready, _, _ = select.select([sock_fd], [], [], 0)
33 return bool(rready)
34 p = select.poll()
35 p.register(sock_fd, select.POLLIN)
36 return bool(p.poll(0))