Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/context.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

168 statements  

1"""Python bindings for 0MQ.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6from __future__ import annotations 

7 

8import atexit 

9import os 

10from threading import Lock 

11from typing import Any, Callable, Generic, TypeVar, overload 

12from warnings import warn 

13from weakref import WeakSet 

14 

15import zmq 

16from zmq._typing import TypeAlias 

17from zmq.backend import Context as ContextBase 

18from zmq.constants import ContextOption, Errno, SocketOption 

19from zmq.error import ZMQError 

20from zmq.utils.interop import cast_int_addr 

21 

22from .attrsettr import AttributeSetter, OptValT 

23from .socket import Socket, SyncSocket 

24 

# Flipped to True by the atexit hook below once interpreter shutdown begins,
# so that Context.__del__ can skip destroying contexts (and thus avoid
# triggering term) while the process is exiting.
_exiting = False


@atexit.register
def _notice_atexit() -> None:
    """Record that the interpreter has started exiting."""
    global _exiting
    _exiting = True

35 

# Type variable for ``self`` / ``cls`` annotations on Context methods, so that
# they are typed as returning the (sub)class they were invoked on.
_ContextType = TypeVar('_ContextType', bound='Context')
# Type variable for the socket class a Context is parameterized with
# (the ``Generic`` parameter of ``Context``).
_SocketType = TypeVar('_SocketType', bound='Socket', covariant=True)

38 

39 

class Context(ContextBase, AttributeSetter, Generic[_SocketType]):
    """Create a zmq Context

    A zmq Context creates sockets via its ``ctx.socket`` method.

    .. versionchanged:: 24

        When using a Context as a context manager (``with zmq.Context()``),
        or deleting a context without closing it first,
        ``ctx.destroy()`` is called,
        closing any leftover sockets,
        instead of `ctx.term()` which requires sockets to be closed first.

        This prevents hangs caused by `ctx.term()` if sockets are left open,
        but means that unclean destruction of contexts
        (with sockets left open) is not safe
        if sockets are managed in other threads.

    .. versionadded:: 25

        Contexts can now be shadowed by passing another Context.
        This helps in creating an async copy of a sync context or vice versa::

            ctx = zmq.Context(async_ctx)

        Which previously had to be::

            ctx = zmq.Context.shadow(async_ctx.underlying)
    """

    # default socket options, applied to every socket created by socket()
    sockopts: dict[int, Any]
    # global instance bookkeeping used by Context.instance():
    # the cached instance, the lock guarding its creation,
    # and the pid of the process that created it
    _instance: Any = None
    _instance_lock = Lock()
    _instance_pid: int | None = None
    # True when this Context shadows an existing context
    # (shadow contexts are not destroyed in __del__)
    _shadow = False
    # the object passed as `shadow`, kept so it stays referenced
    _shadow_obj = None
    # when True, destroy() emits a ResourceWarning for each unclosed socket
    _warn_destroy_close = False
    # weak references to the sockets created by socket()
    _sockets: WeakSet
    # mypy doesn't like a default value here
    _socket_class: type[_SocketType] = Socket  # type: ignore

    @overload
    def __init__(self: SyncContext, io_threads: int = 1): ...

    @overload
    def __init__(self: SyncContext, io_threads: Context):
        # this should be positional-only, but that requires 3.8
        ...

    @overload
    def __init__(self: SyncContext, *, shadow: Context | int): ...

    def __init__(
        self: SyncContext,
        io_threads: int | Context = 1,
        shadow: Context | int = 0,
    ) -> None:
        """Create a new context, or shadow an existing one.

        Parameters
        ----------
        io_threads : int or Context
            Forwarded as ``io_threads`` to the backend Context.
            If a Context is given instead of an int,
            it is treated as ``shadow`` and ``io_threads`` is reset to 1.
        shadow : Context or int
            A Context (or an integer address) to shadow.
            A falsy value (the default ``0``) means no shadowing.
        """
        if isinstance(io_threads, Context):
            # allow positional shadow `zmq.Context(zmq.asyncio.Context())`:
            # treat the positional Context exactly like `shadow=<context>`
            # and fall back to the default number of io_threads
            shadow = io_threads
            io_threads = 1

        shadow_address: int = 0
        if shadow:
            self._shadow = True
            # hold a reference to the shadow object
            self._shadow_obj = shadow
            if not isinstance(shadow, int):
                # prefer the object's `.underlying` value when it has one;
                # otherwise hand the object itself to cast_int_addr
                try:
                    shadow = shadow.underlying
                except AttributeError:
                    pass
            shadow_address = cast_int_addr(shadow)
        else:
            self._shadow = False
        super().__init__(io_threads=io_threads, shadow=shadow_address)
        self.sockopts = {}
        self._sockets = WeakSet()

    def __del__(self) -> None:
        """Deleting a Context without closing it destroys it and all sockets.

        .. versionchanged:: 24
            Switch from threadsafe `term()` which hangs in the event of open sockets
            to less safe `destroy()` which
            warns about any leftover sockets and closes them.
        """

        # Calling locals() here conceals issue #1167 on Windows CPython 3.5.4.
        locals()

        # never destroy shadow contexts, already-closed contexts,
        # or anything at all once the interpreter is exiting
        if not self._shadow and not _exiting and not self.closed:
            self._warn_destroy_close = True
            if warn is not None and getattr(self, "_sockets", None) is not None:
                # warn can be None during process teardown
                warn(
                    f"Unclosed context {self}",
                    ResourceWarning,
                    stacklevel=2,
                    source=self,
                )
            self.destroy()

    # name shown by __repr__ for this exact class;
    # subclasses that do not define their own fall back to module.ClassName
    _repr_cls = "zmq.Context"

    def __repr__(self) -> str:
        """Return ``<name(N sockets) at 0x... [closed]>``."""
        cls = self.__class__
        # look up _repr_cls on exact class, not inherited
        _repr_cls = cls.__dict__.get("_repr_cls", None)
        if _repr_cls is None:
            _repr_cls = f"{cls.__module__}.{cls.__name__}"

        closed = ' closed' if self.closed else ''
        # _sockets may be missing or empty; both produce an empty description
        if getattr(self, "_sockets", None):
            n_sockets = len(self._sockets)
            s = 's' if n_sockets > 1 else ''
            sockets = f"{n_sockets} socket{s}"
        else:
            sockets = ""
        return f"<{_repr_cls}({sockets}) at {hex(id(self))}{closed}>"

    def __enter__(self: _ContextType) -> _ContextType:
        """Enter a ``with`` block; returns the Context itself."""
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Leave a ``with`` block by destroying the Context."""
        # warn about any leftover sockets before closing them
        self._warn_destroy_close = True
        self.destroy()

    def __copy__(self: _ContextType, memo: Any = None) -> _ContextType:
        """Copying a Context creates a shadow copy"""
        return self.__class__.shadow(self.underlying)

    # deep copies are shadow copies too; `memo` is accepted and ignored
    __deepcopy__ = __copy__

    @classmethod
    def shadow(cls: type[_ContextType], address: int | zmq.Context) -> _ContextType:
        """Shadow an existing libzmq context

        address is a zmq.Context or an integer (or FFI pointer)
        representing the address of the libzmq context.

        .. versionadded:: 14.1

        .. versionadded:: 25
            Support for shadowing `zmq.Context` objects,
            instead of just integer addresses.
        """
        return cls(shadow=address)

    @classmethod
    def shadow_pyczmq(cls: type[_ContextType], ctx: Any) -> _ContextType:
        """Shadow an existing pyczmq context

        ctx is the FFI `zctx_t *` pointer

        .. versionadded:: 14.1
        """
        # pyczmq is an optional dependency, so it is only imported on use
        from pyczmq import zctx  # type: ignore

        underlying = zctx.underlying(ctx)
        address = cast_int_addr(underlying)
        return cls(shadow=address)

    # static method copied from tornado IOLoop.instance
    @classmethod
    def instance(cls: type[_ContextType], io_threads: int = 1) -> _ContextType:
        """Returns a global Context instance.

        Most single-process applications have a single, global Context.
        Use this method instead of passing around Context instances
        throughout your code.

        A common pattern for classes that depend on Contexts is to use
        a default argument to enable programs with multiple Contexts
        but not require the argument for simpler applications::

            class MyClass(object):
                def __init__(self, context=None):
                    self.context = context or Context.instance()

        .. versionchanged:: 18.1

            When called in a subprocess after forking,
            a new global instance is created instead of inheriting
            a Context that won't work from the parent process.
        """
        # A new instance is needed when there is none yet, when the cached one
        # was created by another process (pid mismatch), or when it was closed.
        # The condition is checked once without the lock, then again while
        # holding it, before creating the instance.
        if (
            cls._instance is None
            or cls._instance_pid != os.getpid()
            or cls._instance.closed
        ):
            with cls._instance_lock:
                if (
                    cls._instance is None
                    or cls._instance_pid != os.getpid()
                    or cls._instance.closed
                ):
                    cls._instance = cls(io_threads=io_threads)
                    cls._instance_pid = os.getpid()
        return cls._instance

    def term(self) -> None:
        """Close or terminate the context.

        Context termination is performed in the following steps:

        - Any blocking operations currently in progress on sockets open within context shall
          raise :class:`zmq.ContextTerminated`.
          With the exception of socket.close(), any further operations on sockets open within this context
          shall raise :class:`zmq.ContextTerminated`.
        - After interrupting all blocking calls, term shall block until the following conditions are satisfied:
            - All sockets open within context have been closed.
            - For each socket within context, all messages sent on the socket have either been
              physically transferred to a network peer,
              or the socket's linger period set with the zmq.LINGER socket option has expired.

        For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER.

        This can be called to close the context by hand. If this is not called,
        the context will automatically be closed when it is garbage collected,
        in which case you may see a ResourceWarning about the unclosed context.
        """
        super().term()

    # -------------------------------------------------------------------------
    # Hooks for ctxopt completion
    # -------------------------------------------------------------------------

    def __dir__(self) -> list[str]:
        """List the class attributes plus all ContextOption member names."""
        keys = dir(self.__class__)
        keys.extend(ContextOption.__members__)
        return keys

    # -------------------------------------------------------------------------
    # Creating Sockets
    # -------------------------------------------------------------------------

    def _add_socket(self, socket: Any) -> None:
        """Add a weakref to a socket for Context.destroy / reference counting"""
        self._sockets.add(socket)

    def _rm_socket(self, socket: Any) -> None:
        """Remove a socket for Context.destroy / reference counting"""
        # allow _sockets to be None in case of process teardown
        if getattr(self, "_sockets", None) is not None:
            self._sockets.discard(socket)

    def destroy(self, linger: int | None = None) -> None:
        """Close all sockets associated with this context and then terminate
        the context.

        Does nothing if the context is already closed.

        .. warning::

            destroy involves calling :meth:`Socket.close`, which is **NOT** threadsafe.
            If there are active sockets in other threads, this must not be called.

        Parameters
        ----------

        linger : int, optional
            If specified, set LINGER on sockets prior to closing them.
        """
        if self.closed:
            return

        # take a snapshot: _sockets may be missing, and it is a WeakSet
        # that can change while sockets are being closed
        sockets: list[_SocketType] = list(getattr(self, "_sockets", None) or [])
        for s in sockets:
            if s and not s.closed:
                if self._warn_destroy_close and warn is not None:
                    # warn can be None during process teardown
                    warn(
                        f"Destroying context with unclosed socket {s}",
                        ResourceWarning,
                        stacklevel=3,
                        source=s,
                    )
                if linger is not None:
                    s.setsockopt(SocketOption.LINGER, linger)
                s.close()

        self.term()

    def socket(
        self: _ContextType,
        socket_type: int,
        socket_class: Callable[[_ContextType, int], _SocketType] | None = None,
        **kwargs: Any,
    ) -> _SocketType:
        """Create a Socket associated with this Context.

        Parameters
        ----------
        socket_type : int
            The socket type, which can be any of the 0MQ socket types:
            REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.

        socket_class: zmq.Socket
            The socket class to instantiate, if different from the default for this Context.
            e.g. for creating an asyncio socket attached to a default Context or vice versa.

            .. versionadded:: 25

        kwargs:
            will be passed to the __init__ method of the socket class.

        Raises
        ------
        ZMQError
            With ``Errno.ENOTSUP`` if the context is already closed.
        """
        if self.closed:
            raise ZMQError(Errno.ENOTSUP)
        if socket_class is None:
            socket_class = self._socket_class
        s: _SocketType = (
            socket_class(  # set PYTHONTRACEMALLOC=2 to get the calling frame
                self, socket_type, **kwargs
            )
        )
        # apply the context-wide default socket options
        for opt, value in self.sockopts.items():
            try:
                s.setsockopt(opt, value)
            except ZMQError:
                # ignore ZMQErrors, which are likely for socket options
                # that do not apply to a particular socket type, e.g.
                # SUBSCRIBE for non-SUB sockets.
                pass
        self._add_socket(s)
        return s

    def setsockopt(self, opt: int, value: Any) -> None:
        """set default socket options for new sockets created by this Context

        .. versionadded:: 13.0
        """
        self.sockopts[opt] = value

    def getsockopt(self, opt: int) -> OptValT:
        """get default socket options for new sockets created by this Context

        Raises :exc:`KeyError` if no default has been set for ``opt``.

        .. versionadded:: 13.0
        """
        return self.sockopts[opt]

    def _set_attr_opt(self, name: str, opt: int, value: OptValT) -> None:
        """set default sockopts as attributes

        Context options are set on the context itself via ``self.set``;
        socket options are stored as defaults for new sockets.
        Any other name raises AttributeError.
        """
        if name in ContextOption.__members__:
            return self.set(opt, value)
        elif name in SocketOption.__members__:
            self.sockopts[opt] = value
        else:
            raise AttributeError(f"No such context or socket option: {name}")

    def _get_attr_opt(self, name: str, opt: int) -> OptValT:
        """get default sockopts as attributes

        Context options are read via ``self.get``;
        anything else is looked up in the default socket options,
        raising AttributeError if it has not been set.
        """
        if name in ContextOption.__members__:
            return self.get(opt)
        else:
            if opt not in self.sockopts:
                raise AttributeError(name)
            else:
                return self.sockopts[opt]

    def __delattr__(self, key: str) -> None:
        """delete default sockopts as attributes

        Regular instance attributes are removed from ``__dict__``.
        Otherwise ``key`` is upper-cased and treated as a SocketOption name,
        and its stored default is removed.

        Raises AttributeError if ``key`` is neither an instance attribute
        nor a socket option with a stored default.
        """
        if key in self.__dict__:
            self.__dict__.pop(key)
            return
        key = key.upper()
        try:
            opt = getattr(SocketOption, key)
        except AttributeError:
            # `from None`: the failed enum lookup is an implementation detail,
            # keep it out of the traceback shown to the caller
            raise AttributeError(f"No such socket option: {key!r}") from None
        else:
            if opt not in self.sockopts:
                raise AttributeError(key)
            else:
                del self.sockopts[opt]

417 

418 

# Context specialized to SyncSocket; this alias is the type used to annotate
# ``self`` in Context.__init__.
SyncContext: TypeAlias = Context[SyncSocket]


# public names exported by this module
__all__ = ['Context', 'SyncContext']