Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/sugar/context.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

168 statements  

1"""Python bindings for 0MQ.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6from __future__ import annotations 

7 

8import atexit 

9import os 

10from threading import Lock 

11from typing import Any, Callable, Generic, TypeVar, overload 

12from warnings import warn 

13from weakref import WeakSet 

14 

15import zmq 

16from zmq._typing import TypeAlias 

17from zmq.backend import Context as ContextBase 

18from zmq.constants import ContextOption, Errno, SocketOption 

19from zmq.error import ZMQError 

20from zmq.utils.interop import cast_int_addr 

21 

22from .attrsettr import AttributeSetter, OptValT 

23from .socket import Socket, SyncSocket 

24 

25# notice when exiting, to avoid triggering term on exit 

26_exiting = False 

27 

28 

29def _notice_atexit() -> None: 

30 global _exiting 

31 _exiting = True 

32 

33 

34atexit.register(_notice_atexit) 

35 

36_ContextType = TypeVar('_ContextType', bound='Context') 

37_SocketType = TypeVar('_SocketType', bound='Socket', covariant=True) 

38 

39 

40class Context(ContextBase, AttributeSetter, Generic[_SocketType]): 

41 """Create a zmq Context 

42 

43 A zmq Context creates sockets via its ``ctx.socket`` method. 

44 

45 .. versionchanged:: 24 

46 

47 When using a Context as a context manager (``with zmq.Context()``), 

48 or deleting a context without closing it first, 

49 ``ctx.destroy()`` is called, 

50 closing any leftover sockets, 

51 instead of `ctx.term()` which requires sockets to be closed first. 

52 

53 This prevents hangs caused by `ctx.term()` if sockets are left open, 

54 but means that unclean destruction of contexts 

55 (with sockets left open) is not safe 

56 if sockets are managed in other threads. 

57 

58 .. versionadded:: 25 

59 

60 Contexts can now be shadowed by passing another Context. 

61 This helps in creating an async copy of a sync context or vice versa:: 

62 

63 ctx = zmq.Context(async_ctx) 

64 

65 Which previously had to be:: 

66 

67 ctx = zmq.Context.shadow(async_ctx.underlying) 

68 """ 

69 

70 sockopts: dict[int, Any] 

71 _instance: Any = None 

72 _instance_lock = Lock() 

73 _instance_pid: int | None = None 

74 _shadow = False 

75 _shadow_obj = None 

76 _warn_destroy_close = False 

77 _sockets: WeakSet 

78 # mypy doesn't like a default value here 

79 _socket_class: type[_SocketType] = Socket # type: ignore 

80 

81 @overload 

82 def __init__(self: SyncContext, io_threads: int = 1): ... 

83 

84 @overload 

85 def __init__(self: SyncContext, io_threads: Context, /): ... 

86 

87 @overload 

88 def __init__(self: SyncContext, *, shadow: Context | int): ... 

89 

90 def __init__( 

91 self: SyncContext, 

92 io_threads: int | Context = 1, 

93 shadow: Context | int = 0, 

94 ) -> None: 

95 if isinstance(io_threads, Context): 

96 # allow positional shadow `zmq.Context(zmq.asyncio.Context())` 

97 # this s 

98 shadow = io_threads 

99 io_threads = 1 

100 

101 shadow_address: int = 0 

102 if shadow: 

103 self._shadow = True 

104 # hold a reference to the shadow object 

105 self._shadow_obj = shadow 

106 if not isinstance(shadow, int): 

107 try: 

108 shadow = shadow.underlying 

109 except AttributeError: 

110 pass 

111 shadow_address = cast_int_addr(shadow) 

112 else: 

113 self._shadow = False 

114 super().__init__(io_threads=io_threads, shadow=shadow_address) 

115 self.sockopts = {} 

116 self._sockets = WeakSet() 

117 

118 def __del__(self) -> None: 

119 """Deleting a Context without closing it destroys it and all sockets. 

120 

121 .. versionchanged:: 24 

122 Switch from threadsafe `term()` which hangs in the event of open sockets 

123 to less safe `destroy()` which 

124 warns about any leftover sockets and closes them. 

125 """ 

126 

127 # Calling locals() here conceals issue #1167 on Windows CPython 3.5.4. 

128 locals() 

129 

130 if not self._shadow and not _exiting and not self.closed: 

131 self._warn_destroy_close = True 

132 if warn is not None and getattr(self, "_sockets", None) is not None: 

133 # warn can be None during process teardown 

134 warn( 

135 f"Unclosed context {self}", 

136 ResourceWarning, 

137 stacklevel=2, 

138 source=self, 

139 ) 

140 self.destroy() 

141 

142 _repr_cls = "zmq.Context" 

143 

144 def __repr__(self) -> str: 

145 cls = self.__class__ 

146 # look up _repr_cls on exact class, not inherited 

147 _repr_cls = cls.__dict__.get("_repr_cls", None) 

148 if _repr_cls is None: 

149 _repr_cls = f"{cls.__module__}.{cls.__name__}" 

150 

151 closed = ' closed' if self.closed else '' 

152 if getattr(self, "_sockets", None): 

153 n_sockets = len(self._sockets) 

154 s = 's' if n_sockets > 1 else '' 

155 sockets = f"{n_sockets} socket{s}" 

156 else: 

157 sockets = "" 

158 return f"<{_repr_cls}({sockets}) at {hex(id(self))}{closed}>" 

159 

160 def __enter__(self: _ContextType) -> _ContextType: 

161 return self 

162 

163 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: 

164 # warn about any leftover sockets before closing them 

165 self._warn_destroy_close = True 

166 self.destroy() 

167 

168 def __copy__(self: _ContextType, memo: Any = None) -> _ContextType: 

169 """Copying a Context creates a shadow copy""" 

170 return self.__class__.shadow(self.underlying) 

171 

172 __deepcopy__ = __copy__ 

173 

174 @classmethod 

175 def shadow(cls: type[_ContextType], address: int | zmq.Context) -> _ContextType: 

176 """Shadow an existing libzmq context 

177 

178 address is a zmq.Context or an integer (or FFI pointer) 

179 representing the address of the libzmq context. 

180 

181 .. versionadded:: 14.1 

182 

183 .. versionadded:: 25 

184 Support for shadowing `zmq.Context` objects, 

185 instead of just integer addresses. 

186 """ 

187 return cls(shadow=address) 

188 

189 @classmethod 

190 def shadow_pyczmq(cls: type[_ContextType], ctx: Any) -> _ContextType: 

191 """Shadow an existing pyczmq context 

192 

193 ctx is the FFI `zctx_t *` pointer 

194 

195 .. versionadded:: 14.1 

196 """ 

197 from pyczmq import zctx # type: ignore 

198 

199 from zmq.utils.interop import cast_int_addr 

200 

201 underlying = zctx.underlying(ctx) 

202 address = cast_int_addr(underlying) 

203 return cls(shadow=address) 

204 

205 # static method copied from tornado IOLoop.instance 

206 @classmethod 

207 def instance(cls: type[_ContextType], io_threads: int = 1) -> _ContextType: 

208 """Returns a global Context instance. 

209 

210 Most single-process applications have a single, global Context. 

211 Use this method instead of passing around Context instances 

212 throughout your code. 

213 

214 A common pattern for classes that depend on Contexts is to use 

215 a default argument to enable programs with multiple Contexts 

216 but not require the argument for simpler applications:: 

217 

218 class MyClass(object): 

219 def __init__(self, context=None): 

220 self.context = context or Context.instance() 

221 

222 .. versionchanged:: 18.1 

223 

224 When called in a subprocess after forking, 

225 a new global instance is created instead of inheriting 

226 a Context that won't work from the parent process. 

227 """ 

228 if ( 

229 cls._instance is None 

230 or cls._instance_pid != os.getpid() 

231 or cls._instance.closed 

232 ): 

233 with cls._instance_lock: 

234 if ( 

235 cls._instance is None 

236 or cls._instance_pid != os.getpid() 

237 or cls._instance.closed 

238 ): 

239 cls._instance = cls(io_threads=io_threads) 

240 cls._instance_pid = os.getpid() 

241 return cls._instance 

242 

243 def term(self) -> None: 

244 """Close or terminate the context. 

245 

246 Context termination is performed in the following steps: 

247 

248 - Any blocking operations currently in progress on sockets open within context shall 

249 raise :class:`zmq.ContextTerminated`. 

250 With the exception of socket.close(), any further operations on sockets open within this context 

251 shall raise :class:`zmq.ContextTerminated`. 

252 - After interrupting all blocking calls, term shall block until the following conditions are satisfied: 

253 - All sockets open within context have been closed. 

254 - For each socket within context, all messages sent on the socket have either been 

255 physically transferred to a network peer, 

256 or the socket's linger period set with the zmq.LINGER socket option has expired. 

257 

258 For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER. 

259 

260 This can be called to close the context by hand. If this is not called, 

261 the context will automatically be closed when it is garbage collected, 

262 in which case you may see a ResourceWarning about the unclosed context. 

263 """ 

264 super().term() 

265 

266 # ------------------------------------------------------------------------- 

267 # Hooks for ctxopt completion 

268 # ------------------------------------------------------------------------- 

269 

270 def __dir__(self) -> list[str]: 

271 keys = dir(self.__class__) 

272 keys.extend(ContextOption.__members__) 

273 return keys 

274 

275 # ------------------------------------------------------------------------- 

276 # Creating Sockets 

277 # ------------------------------------------------------------------------- 

278 

279 def _add_socket(self, socket: Any) -> None: 

280 """Add a weakref to a socket for Context.destroy / reference counting""" 

281 self._sockets.add(socket) 

282 

283 def _rm_socket(self, socket: Any) -> None: 

284 """Remove a socket for Context.destroy / reference counting""" 

285 # allow _sockets to be None in case of process teardown 

286 if getattr(self, "_sockets", None) is not None: 

287 self._sockets.discard(socket) 

288 

289 def destroy(self, linger: int | None = None) -> None: 

290 """Close all sockets associated with this context and then terminate 

291 the context. 

292 

293 .. warning:: 

294 

295 destroy involves calling :meth:`Socket.close`, which is **NOT** threadsafe. 

296 If there are active sockets in other threads, this must not be called. 

297 

298 Parameters 

299 ---------- 

300 

301 linger : int, optional 

302 If specified, set LINGER on sockets prior to closing them. 

303 """ 

304 if self.closed: 

305 return 

306 

307 sockets: list[_SocketType] = list(getattr(self, "_sockets", None) or []) 

308 for s in sockets: 

309 if s and not s.closed: 

310 if self._warn_destroy_close and warn is not None: 

311 # warn can be None during process teardown 

312 warn( 

313 f"Destroying context with unclosed socket {s}", 

314 ResourceWarning, 

315 stacklevel=3, 

316 source=s, 

317 ) 

318 if linger is not None: 

319 s.setsockopt(SocketOption.LINGER, linger) 

320 s.close() 

321 

322 self.term() 

323 

324 def socket( 

325 self: _ContextType, 

326 socket_type: int, 

327 socket_class: Callable[[_ContextType, int], _SocketType] | None = None, 

328 **kwargs: Any, 

329 ) -> _SocketType: 

330 """Create a Socket associated with this Context. 

331 

332 Parameters 

333 ---------- 

334 socket_type : int 

335 The socket type, which can be any of the 0MQ socket types: 

336 REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc. 

337 

338 socket_class: zmq.Socket 

339 The socket class to instantiate, if different from the default for this Context. 

340 e.g. for creating an asyncio socket attached to a default Context or vice versa. 

341 

342 .. versionadded:: 25 

343 

344 kwargs: 

345 will be passed to the __init__ method of the socket class. 

346 """ 

347 if self.closed: 

348 raise ZMQError(Errno.ENOTSUP) 

349 if socket_class is None: 

350 socket_class = self._socket_class 

351 s: _SocketType = ( 

352 socket_class( # set PYTHONTRACEMALLOC=2 to get the calling frame 

353 self, socket_type, **kwargs 

354 ) 

355 ) 

356 for opt, value in self.sockopts.items(): 

357 try: 

358 s.setsockopt(opt, value) 

359 except ZMQError: 

360 # ignore ZMQErrors, which are likely for socket options 

361 # that do not apply to a particular socket type, e.g. 

362 # SUBSCRIBE for non-SUB sockets. 

363 pass 

364 self._add_socket(s) 

365 return s 

366 

367 def setsockopt(self, opt: int, value: Any) -> None: 

368 """set default socket options for new sockets created by this Context 

369 

370 .. versionadded:: 13.0 

371 """ 

372 self.sockopts[opt] = value 

373 

374 def getsockopt(self, opt: int) -> OptValT: 

375 """get default socket options for new sockets created by this Context 

376 

377 .. versionadded:: 13.0 

378 """ 

379 return self.sockopts[opt] 

380 

381 def _set_attr_opt(self, name: str, opt: int, value: OptValT) -> None: 

382 """set default sockopts as attributes""" 

383 if name in ContextOption.__members__: 

384 return self.set(opt, value) 

385 elif name in SocketOption.__members__: 

386 self.sockopts[opt] = value 

387 else: 

388 raise AttributeError(f"No such context or socket option: {name}") 

389 

390 def _get_attr_opt(self, name: str, opt: int) -> OptValT: 

391 """get default sockopts as attributes""" 

392 if name in ContextOption.__members__: 

393 return self.get(opt) 

394 else: 

395 if opt not in self.sockopts: 

396 raise AttributeError(name) 

397 else: 

398 return self.sockopts[opt] 

399 

400 def __delattr__(self, key: str) -> None: 

401 """delete default sockopts as attributes""" 

402 if key in self.__dict__: 

403 self.__dict__.pop(key) 

404 return 

405 key = key.upper() 

406 try: 

407 opt = getattr(SocketOption, key) 

408 except AttributeError: 

409 raise AttributeError(f"No such socket option: {key!r}") 

410 else: 

411 if opt not in self.sockopts: 

412 raise AttributeError(key) 

413 else: 

414 del self.sockopts[opt] 

415 

416 

417SyncContext: TypeAlias = Context[SyncSocket] 

418 

419 

420__all__ = ['Context', 'SyncContext']