Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/context.py: 36%

166 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""Python bindings for 0MQ.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6import atexit 

7import os 

8from threading import Lock 

9from typing import ( 

10 Any, 

11 Callable, 

12 Dict, 

13 Generic, 

14 List, 

15 Optional, 

16 Type, 

17 TypeVar, 

18 Union, 

19 overload, 

20) 

21from warnings import warn 

22from weakref import WeakSet 

23 

24from zmq.backend import Context as ContextBase 

25from zmq.constants import ContextOption, Errno, SocketOption 

26from zmq.error import ZMQError 

27from zmq.utils.interop import cast_int_addr 

28 

29from .attrsettr import AttributeSetter, OptValT 

30from .socket import Socket 

31 

# Notice when the interpreter is exiting, to avoid triggering term on exit.
# Context.__del__ reads this flag and skips its cleanup once it is True.
_exiting = False


def _notice_atexit() -> None:
    """Record that interpreter shutdown has begun by setting ``_exiting``."""
    global _exiting
    _exiting = True


# Run the hook at interpreter exit so the flag is set before teardown proceeds.
atexit.register(_notice_atexit)

# T: any Context subclass; used to type classmethods/returns that yield ``cls``/``self``.
T = TypeVar('T', bound='Context')
# ST: the Socket type a Context produces.
ST = TypeVar('ST', bound='Socket', covariant=True)

45 

46 

class Context(ContextBase, AttributeSetter, Generic[ST]):
    """Create a zmq Context

    A zmq Context creates sockets via its ``ctx.socket`` method.

    .. versionchanged:: 24

        When using a Context as a context manager (``with zmq.Context()``),
        or deleting a context without closing it first,
        ``ctx.destroy()`` is called,
        closing any leftover sockets,
        instead of `ctx.term()` which requires sockets to be closed first.

        This prevents hangs caused by `ctx.term()` if sockets are left open,
        but means that unclean destruction of contexts
        (with sockets left open) is not safe
        if sockets are managed in other threads.

    .. versionadded:: 25

        Contexts can now be shadowed by passing another Context.
        This helps in creating an async copy of a sync context or vice versa::

            ctx = zmq.Context(async_ctx)

        Which previously had to be::

            ctx = zmq.Context.shadow(async_ctx.underlying)
    """

    # default socket options applied to every socket created by this Context
    sockopts: Dict[int, Any]
    # process-global instance returned by Context.instance(), guarded by the lock
    _instance: Any = None
    _instance_lock = Lock()
    # pid that created _instance; instance() compares it against os.getpid()
    _instance_pid: Optional[int] = None
    # True when this object shadows an existing context (set in __init__)
    _shadow = False
    # reference to the shadowed object, held for as long as this Context lives
    _shadow_obj = None
    # when True, destroy() warns about each socket that is still open
    _warn_destroy_close = False
    # weak references to the sockets created by this Context
    _sockets: WeakSet
    # mypy doesn't like a default value here
    _socket_class: Type[ST] = Socket  # type: ignore

    @overload
    def __init__(self: "Context[Socket]", io_threads: int = 1):
        ...

    @overload
    def __init__(self: "Context[Socket]", io_threads: "Context"):
        # this should be positional-only, but that requires 3.8
        ...

    @overload
    def __init__(self: "Context[Socket]", *, shadow: Union["Context", int]):
        ...

    def __init__(
        self: "Context[Socket]",
        io_threads: Union[int, "Context"] = 1,
        shadow: Union["Context", int] = 0,
    ) -> None:
        """Create a new Context, or shadow an existing one.

        Parameters
        ----------
        io_threads : int or Context
            Forwarded to the base class as ``io_threads``.
            If a Context is passed here instead, it is treated as ``shadow``
            and ``io_threads`` falls back to 1.
        shadow : Context or int
            A Context (or any object with an ``underlying`` attribute) or an
            address to shadow. A falsy value (the default, 0) means no shadowing.
        """
        if isinstance(io_threads, Context):
            # allow positional shadow `zmq.Context(zmq.asyncio.Context())`
            # this shadows the given Context rather than creating a new one
            shadow = io_threads
            io_threads = 1

        shadow_address: int = 0
        if shadow:
            self._shadow = True
            # hold a reference to the shadow object
            self._shadow_obj = shadow
            if not isinstance(shadow, int):
                try:
                    shadow = shadow.underlying
                except AttributeError:
                    # no `.underlying`: hand the object itself to cast_int_addr
                    pass
            shadow_address = cast_int_addr(shadow)
        else:
            self._shadow = False
        super().__init__(io_threads=io_threads, shadow=shadow_address)
        self.sockopts = {}
        self._sockets = WeakSet()

    def __del__(self) -> None:
        """Deleting a Context without closing it destroys it and all sockets.

        .. versionchanged:: 24
            Switch from threadsafe `term()` which hangs in the event of open sockets
            to less safe `destroy()` which
            warns about any leftover sockets and closes them.
        """

        # Calling locals() here conceals issue #1167 on Windows CPython 3.5.4.
        locals()

        # shadow contexts, already-closed contexts, and interpreter exit
        # are all left alone
        if not self._shadow and not _exiting and not self.closed:
            self._warn_destroy_close = True
            if warn is not None and getattr(self, "_sockets", None) is not None:
                # warn can be None during process teardown
                warn(
                    f"Unclosed context {self}",
                    ResourceWarning,
                    stacklevel=2,
                    source=self,
                )
            self.destroy()

    _repr_cls = "zmq.Context"

    def __repr__(self) -> str:
        """Return ``<cls(N sockets) at 0x... [closed]>`` for this Context."""
        cls = self.__class__
        # look up _repr_cls on exact class, not inherited
        _repr_cls = cls.__dict__.get("_repr_cls", None)
        if _repr_cls is None:
            _repr_cls = f"{cls.__module__}.{cls.__name__}"

        closed = ' closed' if self.closed else ''
        # an empty or missing socket set yields an empty description
        if getattr(self, "_sockets", None):
            n_sockets = len(self._sockets)
            s = 's' if n_sockets > 1 else ''
            sockets = f"{n_sockets} socket{s}"
        else:
            sockets = ""
        return f"<{_repr_cls}({sockets}) at {hex(id(self))}{closed}>"

    def __enter__(self: T) -> T:
        """Enter a ``with`` block; returns the Context itself."""
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Leave a ``with`` block by destroying the Context."""
        # warn about any leftover sockets before closing them
        self._warn_destroy_close = True
        self.destroy()

    def __copy__(self: T, memo: Any = None) -> T:
        """Copying a Context creates a shadow copy"""
        return self.__class__.shadow(self.underlying)

    __deepcopy__ = __copy__

    @classmethod
    def shadow(cls: Type[T], address: Union[int, "Context"]) -> T:
        """Shadow an existing libzmq context

        address is a zmq.Context or an integer (or FFI pointer)
        representing the address of the libzmq context.

        .. versionadded:: 14.1

        .. versionchanged:: 25
            Support for shadowing `zmq.Context` objects,
            instead of just integer addresses.
        """
        return cls(shadow=address)

    @classmethod
    def shadow_pyczmq(cls: Type[T], ctx: Any) -> T:
        """Shadow an existing pyczmq context

        ctx is the FFI `zctx_t *` pointer

        .. versionadded:: 14.1
        """
        # imported lazily: pyczmq is only needed when this method is called
        from pyczmq import zctx  # type: ignore

        underlying = zctx.underlying(ctx)
        address = cast_int_addr(underlying)
        return cls(shadow=address)

    # classmethod pattern copied from tornado IOLoop.instance
    @classmethod
    def instance(cls: Type[T], io_threads: int = 1) -> T:
        """Returns a global Context instance.

        Most single-threaded applications have a single, global Context.
        Use this method instead of passing around Context instances
        throughout your code.

        A common pattern for classes that depend on Contexts is to use
        a default argument to enable programs with multiple Contexts
        but not require the argument for simpler applications::

            class MyClass(object):
                def __init__(self, context=None):
                    self.context = context or Context.instance()

        .. versionchanged:: 18.1

            When called in a subprocess after forking,
            a new global instance is created instead of inheriting
            a Context that won't work from the parent process.
        """
        # check once without the lock, then again while holding it,
        # before replacing the shared instance
        if (
            cls._instance is None
            or cls._instance_pid != os.getpid()
            or cls._instance.closed
        ):
            with cls._instance_lock:
                if (
                    cls._instance is None
                    or cls._instance_pid != os.getpid()
                    or cls._instance.closed
                ):
                    cls._instance = cls(io_threads=io_threads)
                    cls._instance_pid = os.getpid()
        return cls._instance

    def term(self) -> None:
        """Close or terminate the context.

        Context termination is performed in the following steps:

        - Any blocking operations currently in progress on sockets open within context shall
          raise :class:`zmq.ContextTerminated`.
          With the exception of socket.close(), any further operations on sockets open within this context
          shall raise :class:`zmq.ContextTerminated`.
        - After interrupting all blocking calls, term shall block until the following conditions are satisfied:
            - All sockets open within context have been closed.
            - For each socket within context, all messages sent on the socket have either been
              physically transferred to a network peer,
              or the socket's linger period set with the zmq.LINGER socket option has expired.

        For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER.

        This can be called to close the context by hand. If this is not called,
        the context will automatically be closed when it is garbage collected,
        in which case you may see a ResourceWarning about the unclosed context.
        """
        super().term()

    # -------------------------------------------------------------------------
    # Hooks for ctxopt completion
    # -------------------------------------------------------------------------

    def __dir__(self) -> List[str]:
        """List class attributes plus the names of all ``ContextOption`` members."""
        keys = dir(self.__class__)
        keys.extend(ContextOption.__members__)
        return keys

    # -------------------------------------------------------------------------
    # Creating Sockets
    # -------------------------------------------------------------------------

    def _add_socket(self, socket: Any) -> None:
        """Add a weakref to a socket for Context.destroy / reference counting"""
        self._sockets.add(socket)

    def _rm_socket(self, socket: Any) -> None:
        """Remove a socket for Context.destroy / reference counting"""
        # allow _sockets to be None in case of process teardown
        if getattr(self, "_sockets", None) is not None:
            self._sockets.discard(socket)

    def destroy(self, linger: Optional[int] = None) -> None:
        """Close all sockets associated with this context and then terminate
        the context.

        .. warning::

            destroy involves calling ``zmq_close()``, which is **NOT** threadsafe.
            If there are active sockets in other threads, this must not be called.

        Parameters
        ----------

        linger : int, optional
            If specified, set LINGER on sockets prior to closing them.
        """
        if self.closed:
            return

        # snapshot the tracked sockets into a list before closing any of them
        sockets: List[ST] = list(getattr(self, "_sockets", None) or [])
        for s in sockets:
            if s and not s.closed:
                if self._warn_destroy_close and warn is not None:
                    # warn can be None during process teardown
                    warn(
                        f"Destroying context with unclosed socket {s}",
                        ResourceWarning,
                        stacklevel=3,
                        source=s,
                    )
                if linger is not None:
                    s.setsockopt(SocketOption.LINGER, linger)
                s.close()

        self.term()

    def socket(
        self: T,
        socket_type: int,
        socket_class: Optional[Callable[[T, int], ST]] = None,
        **kwargs: Any,
    ) -> ST:
        """Create a Socket associated with this Context.

        Parameters
        ----------
        socket_type : int
            The socket type, which can be any of the 0MQ socket types:
            REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.

        socket_class: zmq.Socket or a subclass
            The socket class to instantiate, if different from the default for this Context.
            e.g. for creating an asyncio socket attached to a default Context or vice versa.

            .. versionadded:: 25

        kwargs:
            will be passed to the __init__ method of the socket class.

        Raises
        ------
        ZMQError
            With ``Errno.ENOTSUP`` if this Context is already closed.
        """
        if self.closed:
            raise ZMQError(Errno.ENOTSUP)
        if socket_class is None:
            socket_class = self._socket_class
        s: ST = socket_class(  # set PYTHONTRACEMALLOC=2 to get the calling frame
            self, socket_type, **kwargs
        )
        for opt, value in self.sockopts.items():
            try:
                s.setsockopt(opt, value)
            except ZMQError:
                # ignore ZMQErrors, which are likely for socket options
                # that do not apply to a particular socket type, e.g.
                # SUBSCRIBE for non-SUB sockets.
                pass
        self._add_socket(s)
        return s

    def setsockopt(self, opt: int, value: Any) -> None:
        """set default socket options for new sockets created by this Context

        .. versionadded:: 13.0
        """
        self.sockopts[opt] = value

    def getsockopt(self, opt: int) -> OptValT:
        """get default socket options for new sockets created by this Context

        Raises KeyError if no default has been set for ``opt``.

        .. versionadded:: 13.0
        """
        return self.sockopts[opt]

    def _set_attr_opt(self, name: str, opt: int, value: OptValT) -> None:
        """set default sockopts as attributes

        Context options are applied via ``self.set``;
        socket options are stored in ``self.sockopts``.
        Any other name raises AttributeError.
        """
        if name in ContextOption.__members__:
            return self.set(opt, value)
        elif name in SocketOption.__members__:
            self.sockopts[opt] = value
        else:
            raise AttributeError(f"No such context or socket option: {name}")

    def _get_attr_opt(self, name: str, opt: int) -> OptValT:
        """get default sockopts as attributes

        Context options are read via ``self.get``; anything else is looked up
        in ``self.sockopts`` and raises AttributeError when not set.
        """
        if name in ContextOption.__members__:
            return self.get(opt)
        if opt not in self.sockopts:
            raise AttributeError(name)
        return self.sockopts[opt]

    def __delattr__(self, key: str) -> None:
        """delete default sockopts as attributes"""
        # regular instance attributes are removed directly
        if key in self.__dict__:
            self.__dict__.pop(key)
            return
        key = key.upper()
        try:
            opt = getattr(SocketOption, key)
        except AttributeError:
            # `from None`: the enum lookup failure adds nothing to the traceback
            raise AttributeError(f"No such socket option: {key!r}") from None
        if opt not in self.sockopts:
            raise AttributeError(key)
        del self.sockopts[opt]

424 

425 

# Public API of this module.
__all__ = ['Context']