Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_sockets.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

141 statements  

1from __future__ import annotations 

2 

3import errno 

4import socket 

5from abc import abstractmethod 

6from collections.abc import Callable, Collection, Mapping 

7from contextlib import AsyncExitStack 

8from io import IOBase 

9from ipaddress import IPv4Address, IPv6Address 

10from socket import AddressFamily 

11from typing import Any, TypeAlias, TypeVar 

12 

13from .._core._eventloop import get_async_backend 

14from .._core._typedattr import ( 

15 TypedAttributeProvider, 

16 TypedAttributeSet, 

17 typed_attribute, 

18) 

19from ._streams import ByteStream, Listener, UnreliableObjectStream 

20from ._tasks import TaskGroup 

21 

22IPAddressType: TypeAlias = str | IPv4Address | IPv6Address 

23IPSockAddrType: TypeAlias = tuple[str, int] 

24SockAddrType: TypeAlias = IPSockAddrType | str 

25UDPPacketType: TypeAlias = tuple[bytes, IPSockAddrType] 

26UNIXDatagramPacketType: TypeAlias = tuple[bytes, str] 

27T_Retval = TypeVar("T_Retval") 

28 

29 

def _validate_socket(
    sock_or_fd: socket.socket | int,
    sock_type: socket.SocketKind,
    addr_family: socket.AddressFamily = socket.AF_UNSPEC,
    *,
    require_connected: bool = False,
    require_bound: bool = False,
) -> socket.socket:
    """
    Check that a socket (or socket file descriptor) is suitable for wrapping.

    On success the socket is switched to non-blocking mode and returned.

    :param sock_or_fd: a socket object, or the file descriptor of one
    :param sock_type: the socket type the socket must have
    :param addr_family: the address family the socket must have (``AF_UNSPEC``
        accepts any family)
    :param require_connected: if ``True``, reject sockets that have no peer address
    :param require_bound: if ``True``, reject sockets that have no local address
        (for IPv4/IPv6 sockets: no nonzero local port)
    :return: the validated socket object
    :raises TypeError: if ``sock_or_fd`` is neither an ``int`` nor a socket object
    :raises ValueError: if the file descriptor does not refer to a socket, or the
        socket fails any of the requested checks
    :raises OSError: if no socket object can be created from the file descriptor for
        any other reason and neither ``require_*`` flag was set

    """
    wraps_fd = isinstance(sock_or_fd, int)
    if wraps_fd:
        try:
            candidate = socket.socket(fileno=sock_or_fd)
        except OSError as exc:
            if exc.errno == errno.ENOTSOCK:
                raise ValueError(
                    "the file descriptor does not refer to a socket"
                ) from exc

            # Any other failure is reported in terms of the requirement the caller
            # asked for, if there was one
            if require_connected:
                raise ValueError("the socket must be connected") from exc

            if require_bound:
                raise ValueError("the socket must be bound to a local address") from exc

            raise
    elif isinstance(sock_or_fd, socket.socket):
        candidate = sock_or_fd
    else:
        raise TypeError(
            f"expected an int or socket, got {type(sock_or_fd).__qualname__} instead"
        )

    try:
        if require_connected:
            try:
                candidate.getpeername()
            except OSError as exc:
                raise ValueError("the socket must be connected") from exc

        if require_bound:
            try:
                if candidate.family in (socket.AF_INET, socket.AF_INET6):
                    # For IP sockets only the port decides: 0 counts as unbound
                    local_marker = candidate.getsockname()[1]
                else:
                    local_marker = candidate.getsockname()
            except OSError:
                local_marker = None

            if not local_marker:
                raise ValueError("the socket must be bound to a local address")

        if addr_family not in (socket.AF_UNSPEC, candidate.family):
            raise ValueError(
                f"address family mismatch: expected {addr_family.name}, got "
                f"{candidate.family.name}"
            )

        if candidate.type != sock_type:
            raise ValueError(
                f"socket type mismatch: expected {sock_type.name}, got "
                f"{candidate.type.name}"
            )
    except BaseException:
        # The socket object was constructed here from a bare descriptor: detach it
        # so that discarding the object does not emit a ResourceWarning
        if wraps_fd:
            candidate.detach()

        raise

    candidate.setblocking(False)
    return candidate

97 

98 

class SocketAttribute(TypedAttributeSet):
    """
    Typed attributes describing the socket behind a stream or listener.

    .. attribute:: family
        :type: socket.AddressFamily

        address family of the underlying socket

    .. attribute:: local_address
        :type: tuple[str, int] | str

        the underlying socket's own (local) address

    .. attribute:: local_port
        :type: int

        local port of the underlying socket (IP based sockets only)

    .. attribute:: raw_socket
        :type: socket.socket

        the stdlib socket object being wrapped

    .. attribute:: remote_address
        :type: tuple[str, int] | str

        address of the peer the underlying socket is connected to

    .. attribute:: remote_port
        :type: int

        port of the peer the underlying socket is connected to (IP based sockets
        only)
    """

    # Each attribute is a distinct key created by typed_attribute()
    family: AddressFamily = typed_attribute()
    local_address: SockAddrType = typed_attribute()
    local_port: int = typed_attribute()
    raw_socket: socket.socket = typed_attribute()
    remote_address: SockAddrType = typed_attribute()
    remote_port: int = typed_attribute()

138 

139 

class _SocketProvider(TypedAttributeProvider):
    """
    Mixin that publishes :class:`SocketAttribute` values for a wrapped socket.

    Subclasses supply the socket through the ``_raw_socket`` property.
    """

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Map each available :class:`SocketAttribute` key to a getter callable.

        The family, local address, local port and raw socket getters query the socket
        when called. The remote address and port are captured when this property is
        read, and are only included if the socket reports a peer.
        """
        from .._core._sockets import convert_ipv6_sockaddr as convert

        def get_family() -> Any:
            return self._raw_socket.family

        def get_local_address() -> Any:
            return convert(self._raw_socket.getsockname())

        def get_raw_socket() -> Any:
            return self._raw_socket

        def get_local_port() -> Any:
            return self._raw_socket.getsockname()[1]

        providers: dict[Any, Callable[[], Any]] = {
            SocketAttribute.family: get_family,
            SocketAttribute.local_address: get_local_address,
            SocketAttribute.raw_socket: get_raw_socket,
        }

        # A socket without a peer makes getpeername() fail with OSError
        try:
            remote: tuple[str, int] | None = convert(self._raw_socket.getpeername())
        except OSError:
            remote = None

        # Only connected sockets get a remote address
        if remote is not None:
            providers[SocketAttribute.remote_address] = lambda: remote

        # Ports only make sense for IPv4/IPv6 sockets
        ip_families = (AddressFamily.AF_INET, AddressFamily.AF_INET6)
        if self._raw_socket.family in ip_families:
            providers[SocketAttribute.local_port] = get_local_port
            if remote is not None:
                peer_port = remote[1]
                providers[SocketAttribute.remote_port] = lambda: peer_port

        return providers

    @property
    @abstractmethod
    def _raw_socket(self) -> socket.socket:
        """The stdlib socket object the attributes are read from."""

176 

177 

class SocketStream(ByteStream, _SocketProvider):
    """
    A byte stream that runs over a socket.

    Exposes the applicable extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(cls, sock_or_fd: socket.socket | int) -> SocketStream:
        """
        Create a socket stream from an existing, already connected socket.

        Ownership of the given socket passes to the returned wrapper.

        :param sock_or_fd: a socket object or file descriptor
        :return: a socket stream
        :raises TypeError: if ``sock_or_fd`` is neither an ``int`` nor a socket
        :raises ValueError: if the socket is not a connected ``SOCK_STREAM`` socket

        """
        validated = _validate_socket(
            sock_or_fd, socket.SOCK_STREAM, require_connected=True
        )
        backend = get_async_backend()
        return await backend.wrap_stream_socket(validated)

199 

200 

class UNIXSocketStream(SocketStream):
    """
    A socket stream over an ``AF_UNIX`` socket that can also pass file descriptors.
    """

    @classmethod
    async def from_socket(cls, sock_or_fd: socket.socket | int) -> UNIXSocketStream:
        """
        Create a UNIX socket stream from an existing, already connected socket.

        Ownership of the given socket passes to the returned wrapper.

        :param sock_or_fd: a socket object or file descriptor
        :return: a UNIX socket stream
        :raises TypeError: if ``sock_or_fd`` is neither an ``int`` nor a socket
        :raises ValueError: if the socket is not a connected ``AF_UNIX``
            ``SOCK_STREAM`` socket

        """
        validated = _validate_socket(
            sock_or_fd,
            socket.SOCK_STREAM,
            socket.AF_UNIX,
            require_connected=True,
        )
        backend = get_async_backend()
        return await backend.wrap_unix_stream_socket(validated)

    @abstractmethod
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
        """
        Send a message to the peer, accompanied by file descriptors.

        :param message: a non-empty bytestring
        :param fds: the files to pass along, each given as a numeric file descriptor
            or as an open file or socket object
        """

    @abstractmethod
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
        """
        Receive a message from the peer, accompanied by file descriptors.

        :param msglen: length of the message to expect from the peer
        :param maxfds: maximum number of file descriptors to expect from the peer
        :return: a tuple of (message, file descriptors)
        """

238 

239 

class SocketListener(Listener[SocketStream], _SocketProvider):
    """
    A listener that accepts incoming socket connections.

    Exposes the applicable extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(
        cls,
        sock_or_fd: socket.socket | int,
    ) -> SocketListener:
        """
        Create a socket listener from an existing socket.

        Ownership of the given socket passes to the returned listener.

        :param sock_or_fd: a socket object or file descriptor
        :return: a socket listener
        :raises TypeError: if ``sock_or_fd`` is neither an ``int`` nor a socket
        :raises ValueError: if the socket is not a ``SOCK_STREAM`` socket bound to a
            local address

        """
        validated = _validate_socket(
            sock_or_fd, socket.SOCK_STREAM, require_bound=True
        )
        backend = get_async_backend()
        return await backend.wrap_listener_socket(validated)

    @abstractmethod
    async def accept(self) -> SocketStream:
        """Wait for an incoming connection and return it as a stream."""

    async def serve(
        self,
        handler: Callable[[SocketStream], Any],
        task_group: TaskGroup | None = None,
    ) -> None:
        """
        Accept connections in an endless loop, starting a handler task for each.

        :param handler: callable started with each accepted stream as its argument
        :param task_group: the task group the handlers are started in; if omitted, a
            new task group is created and kept open for the duration of this call
        """
        from .. import create_task_group

        async with AsyncExitStack() as exit_stack:
            if task_group is None:
                own_group = create_task_group()
                task_group = await exit_stack.enter_async_context(own_group)

            while True:
                accepted = await self.accept()
                task_group.start_soon(handler, accepted)

282 

283 

class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
    """
    A UDP socket that is not connected to a fixed peer.

    Exposes the applicable extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(cls, sock_or_fd: socket.socket | int) -> UDPSocket:
        """
        Create a UDP socket wrapper from an existing socket.

        Ownership of the given socket passes to the returned wrapper. The socket has
        to be bound to a local address beforehand.

        :param sock_or_fd: a socket object or file descriptor
        :return: a UDP socket
        :raises TypeError: if ``sock_or_fd`` is neither an ``int`` nor a socket
        :raises ValueError: if the socket is not a ``SOCK_DGRAM`` socket bound to a
            local address

        """
        validated = _validate_socket(
            sock_or_fd, socket.SOCK_DGRAM, require_bound=True
        )
        backend = get_async_backend()
        return await backend.wrap_udp_socket(validated)

    async def sendto(self, data: bytes, host: str, port: int) -> None:
        """
        Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port))).

        :param data: the payload to send
        :param host: destination host
        :param port: destination port

        """
        packet = (data, (host, port))
        return await self.send(packet)

312 

313 

class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
    """
    A UDP socket that is connected to a fixed peer.

    Exposes the applicable extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(cls, sock_or_fd: socket.socket | int) -> ConnectedUDPSocket:
        """
        Create a connected UDP socket wrapper from an existing socket.

        Ownership of the given socket passes to the returned wrapper. The socket has
        to be connected beforehand.

        :param sock_or_fd: a socket object or file descriptor
        :return: a connected UDP socket
        :raises TypeError: if ``sock_or_fd`` is neither an ``int`` nor a socket
        :raises ValueError: if the socket is not a connected ``SOCK_DGRAM`` socket

        """
        validated = _validate_socket(
            sock_or_fd, socket.SOCK_DGRAM, require_connected=True
        )
        backend = get_async_backend()
        return await backend.wrap_connected_udp_socket(validated)

339 

340 

class UNIXDatagramSocket(
    UnreliableObjectStream[UNIXDatagramPacketType], _SocketProvider
):
    """
    A Unix datagram socket that is not connected to a fixed peer.

    Exposes the applicable extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(
        cls,
        sock_or_fd: socket.socket | int,
    ) -> UNIXDatagramSocket:
        """
        Create a UNIX datagram socket wrapper from an existing socket.

        Ownership of the given socket passes to the returned wrapper.

        :param sock_or_fd: a socket object or file descriptor
        :return: a UNIX datagram socket
        :raises TypeError: if ``sock_or_fd`` is neither an ``int`` nor a socket
        :raises ValueError: if the socket is not an ``AF_UNIX`` ``SOCK_DGRAM`` socket

        """
        validated = _validate_socket(sock_or_fd, socket.SOCK_DGRAM, socket.AF_UNIX)
        backend = get_async_backend()
        return await backend.wrap_unix_datagram_socket(validated)

    async def sendto(self, data: bytes, path: str) -> None:
        """Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, path))."""
        packet = (data, path)
        return await self.send(packet)

371 

372 

class ConnectedUNIXDatagramSocket(UnreliableObjectStream[bytes], _SocketProvider):
    """
    A Unix datagram socket that is connected to a fixed peer.

    Exposes the applicable extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(
        cls,
        sock_or_fd: socket.socket | int,
    ) -> ConnectedUNIXDatagramSocket:
        """
        Create a connected UNIX datagram socket wrapper from an existing socket.

        Ownership of the given socket passes to the returned wrapper. The socket has
        to be connected beforehand.

        :param sock_or_fd: a socket object or file descriptor
        :return: a connected UNIX datagram socket
        :raises TypeError: if ``sock_or_fd`` is neither an ``int`` nor a socket
        :raises ValueError: if the socket is not a connected ``AF_UNIX``
            ``SOCK_DGRAM`` socket

        """
        validated = _validate_socket(
            sock_or_fd,
            socket.SOCK_DGRAM,
            socket.AF_UNIX,
            require_connected=True,
        )
        backend = get_async_backend()
        return await backend.wrap_connected_unix_datagram_socket(validated)