Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/sugar/context.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Python bindings for 0MQ."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6from __future__ import annotations
8import atexit
9import os
10from threading import Lock
11from typing import Any, Callable, Generic, TypeVar, overload
12from warnings import warn
13from weakref import WeakSet
15import zmq
16from zmq._typing import TypeAlias
17from zmq.backend import Context as ContextBase
18from zmq.constants import ContextOption, Errno, SocketOption
19from zmq.error import ZMQError
20from zmq.utils.interop import cast_int_addr
22from .attrsettr import AttributeSetter, OptValT
23from .socket import Socket, SyncSocket
25# notice when exiting, to avoid triggering term on exit
26_exiting = False
29def _notice_atexit() -> None:
30 global _exiting
31 _exiting = True
34atexit.register(_notice_atexit)
36_ContextType = TypeVar('_ContextType', bound='Context')
37_SocketType = TypeVar('_SocketType', bound='Socket', covariant=True)
40class Context(ContextBase, AttributeSetter, Generic[_SocketType]):
41 """Create a zmq Context
43 A zmq Context creates sockets via its ``ctx.socket`` method.
45 .. versionchanged:: 24
47 When using a Context as a context manager (``with zmq.Context()``),
48 or deleting a context without closing it first,
49 ``ctx.destroy()`` is called,
50 closing any leftover sockets,
51 instead of `ctx.term()` which requires sockets to be closed first.
53 This prevents hangs caused by `ctx.term()` if sockets are left open,
54 but means that unclean destruction of contexts
55 (with sockets left open) is not safe
56 if sockets are managed in other threads.
58 .. versionadded:: 25
60 Contexts can now be shadowed by passing another Context.
61 This helps in creating an async copy of a sync context or vice versa::
63 ctx = zmq.Context(async_ctx)
65 Which previously had to be::
67 ctx = zmq.Context.shadow(async_ctx.underlying)
68 """
70 sockopts: dict[int, Any]
71 _instance: Any = None
72 _instance_lock = Lock()
73 _instance_pid: int | None = None
74 _shadow = False
75 _shadow_obj = None
76 _warn_destroy_close = False
77 _sockets: WeakSet
78 # mypy doesn't like a default value here
79 _socket_class: type[_SocketType] = Socket # type: ignore
81 @overload
82 def __init__(self: SyncContext, io_threads: int = 1): ...
84 @overload
85 def __init__(self: SyncContext, io_threads: Context, /): ...
87 @overload
88 def __init__(self: SyncContext, *, shadow: Context | int): ...
90 def __init__(
91 self: SyncContext,
92 io_threads: int | Context = 1,
93 shadow: Context | int = 0,
94 ) -> None:
95 if isinstance(io_threads, Context):
96 # allow positional shadow `zmq.Context(zmq.asyncio.Context())`
97 # this s
98 shadow = io_threads
99 io_threads = 1
101 shadow_address: int = 0
102 if shadow:
103 self._shadow = True
104 # hold a reference to the shadow object
105 self._shadow_obj = shadow
106 if not isinstance(shadow, int):
107 try:
108 shadow = shadow.underlying
109 except AttributeError:
110 pass
111 shadow_address = cast_int_addr(shadow)
112 else:
113 self._shadow = False
114 super().__init__(io_threads=io_threads, shadow=shadow_address)
115 self.sockopts = {}
116 self._sockets = WeakSet()
118 def __del__(self) -> None:
119 """Deleting a Context without closing it destroys it and all sockets.
121 .. versionchanged:: 24
122 Switch from threadsafe `term()` which hangs in the event of open sockets
123 to less safe `destroy()` which
124 warns about any leftover sockets and closes them.
125 """
127 # Calling locals() here conceals issue #1167 on Windows CPython 3.5.4.
128 locals()
130 if not self._shadow and not _exiting and not self.closed:
131 self._warn_destroy_close = True
132 if warn is not None and getattr(self, "_sockets", None) is not None:
133 # warn can be None during process teardown
134 warn(
135 f"Unclosed context {self}",
136 ResourceWarning,
137 stacklevel=2,
138 source=self,
139 )
140 self.destroy()
142 _repr_cls = "zmq.Context"
144 def __repr__(self) -> str:
145 cls = self.__class__
146 # look up _repr_cls on exact class, not inherited
147 _repr_cls = cls.__dict__.get("_repr_cls", None)
148 if _repr_cls is None:
149 _repr_cls = f"{cls.__module__}.{cls.__name__}"
151 closed = ' closed' if self.closed else ''
152 if getattr(self, "_sockets", None):
153 n_sockets = len(self._sockets)
154 s = 's' if n_sockets > 1 else ''
155 sockets = f"{n_sockets} socket{s}"
156 else:
157 sockets = ""
158 return f"<{_repr_cls}({sockets}) at {hex(id(self))}{closed}>"
160 def __enter__(self: _ContextType) -> _ContextType:
161 return self
163 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
164 # warn about any leftover sockets before closing them
165 self._warn_destroy_close = True
166 self.destroy()
168 def __copy__(self: _ContextType, memo: Any = None) -> _ContextType:
169 """Copying a Context creates a shadow copy"""
170 return self.__class__.shadow(self.underlying)
172 __deepcopy__ = __copy__
174 @classmethod
175 def shadow(cls: type[_ContextType], address: int | zmq.Context) -> _ContextType:
176 """Shadow an existing libzmq context
178 address is a zmq.Context or an integer (or FFI pointer)
179 representing the address of the libzmq context.
181 .. versionadded:: 14.1
183 .. versionadded:: 25
184 Support for shadowing `zmq.Context` objects,
185 instead of just integer addresses.
186 """
187 return cls(shadow=address)
189 @classmethod
190 def shadow_pyczmq(cls: type[_ContextType], ctx: Any) -> _ContextType:
191 """Shadow an existing pyczmq context
193 ctx is the FFI `zctx_t *` pointer
195 .. versionadded:: 14.1
196 """
197 from pyczmq import zctx # type: ignore
199 from zmq.utils.interop import cast_int_addr
201 underlying = zctx.underlying(ctx)
202 address = cast_int_addr(underlying)
203 return cls(shadow=address)
205 # static method copied from tornado IOLoop.instance
206 @classmethod
207 def instance(cls: type[_ContextType], io_threads: int = 1) -> _ContextType:
208 """Returns a global Context instance.
210 Most single-process applications have a single, global Context.
211 Use this method instead of passing around Context instances
212 throughout your code.
214 A common pattern for classes that depend on Contexts is to use
215 a default argument to enable programs with multiple Contexts
216 but not require the argument for simpler applications::
218 class MyClass(object):
219 def __init__(self, context=None):
220 self.context = context or Context.instance()
222 .. versionchanged:: 18.1
224 When called in a subprocess after forking,
225 a new global instance is created instead of inheriting
226 a Context that won't work from the parent process.
227 """
228 if (
229 cls._instance is None
230 or cls._instance_pid != os.getpid()
231 or cls._instance.closed
232 ):
233 with cls._instance_lock:
234 if (
235 cls._instance is None
236 or cls._instance_pid != os.getpid()
237 or cls._instance.closed
238 ):
239 cls._instance = cls(io_threads=io_threads)
240 cls._instance_pid = os.getpid()
241 return cls._instance
243 def term(self) -> None:
244 """Close or terminate the context.
246 Context termination is performed in the following steps:
248 - Any blocking operations currently in progress on sockets open within context shall
249 raise :class:`zmq.ContextTerminated`.
250 With the exception of socket.close(), any further operations on sockets open within this context
251 shall raise :class:`zmq.ContextTerminated`.
252 - After interrupting all blocking calls, term shall block until the following conditions are satisfied:
253 - All sockets open within context have been closed.
254 - For each socket within context, all messages sent on the socket have either been
255 physically transferred to a network peer,
256 or the socket's linger period set with the zmq.LINGER socket option has expired.
258 For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER.
260 This can be called to close the context by hand. If this is not called,
261 the context will automatically be closed when it is garbage collected,
262 in which case you may see a ResourceWarning about the unclosed context.
263 """
264 super().term()
266 # -------------------------------------------------------------------------
267 # Hooks for ctxopt completion
268 # -------------------------------------------------------------------------
270 def __dir__(self) -> list[str]:
271 keys = dir(self.__class__)
272 keys.extend(ContextOption.__members__)
273 return keys
275 # -------------------------------------------------------------------------
276 # Creating Sockets
277 # -------------------------------------------------------------------------
279 def _add_socket(self, socket: Any) -> None:
280 """Add a weakref to a socket for Context.destroy / reference counting"""
281 self._sockets.add(socket)
283 def _rm_socket(self, socket: Any) -> None:
284 """Remove a socket for Context.destroy / reference counting"""
285 # allow _sockets to be None in case of process teardown
286 if getattr(self, "_sockets", None) is not None:
287 self._sockets.discard(socket)
289 def destroy(self, linger: int | None = None) -> None:
290 """Close all sockets associated with this context and then terminate
291 the context.
293 .. warning::
295 destroy involves calling :meth:`Socket.close`, which is **NOT** threadsafe.
296 If there are active sockets in other threads, this must not be called.
298 Parameters
299 ----------
301 linger : int, optional
302 If specified, set LINGER on sockets prior to closing them.
303 """
304 if self.closed:
305 return
307 sockets: list[_SocketType] = list(getattr(self, "_sockets", None) or [])
308 for s in sockets:
309 if s and not s.closed:
310 if self._warn_destroy_close and warn is not None:
311 # warn can be None during process teardown
312 warn(
313 f"Destroying context with unclosed socket {s}",
314 ResourceWarning,
315 stacklevel=3,
316 source=s,
317 )
318 if linger is not None:
319 s.setsockopt(SocketOption.LINGER, linger)
320 s.close()
322 self.term()
324 def socket(
325 self: _ContextType,
326 socket_type: int,
327 socket_class: Callable[[_ContextType, int], _SocketType] | None = None,
328 **kwargs: Any,
329 ) -> _SocketType:
330 """Create a Socket associated with this Context.
332 Parameters
333 ----------
334 socket_type : int
335 The socket type, which can be any of the 0MQ socket types:
336 REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.
338 socket_class: zmq.Socket
339 The socket class to instantiate, if different from the default for this Context.
340 e.g. for creating an asyncio socket attached to a default Context or vice versa.
342 .. versionadded:: 25
344 kwargs:
345 will be passed to the __init__ method of the socket class.
346 """
347 if self.closed:
348 raise ZMQError(Errno.ENOTSUP)
349 if socket_class is None:
350 socket_class = self._socket_class
351 s: _SocketType = (
352 socket_class( # set PYTHONTRACEMALLOC=2 to get the calling frame
353 self, socket_type, **kwargs
354 )
355 )
356 for opt, value in self.sockopts.items():
357 try:
358 s.setsockopt(opt, value)
359 except ZMQError:
360 # ignore ZMQErrors, which are likely for socket options
361 # that do not apply to a particular socket type, e.g.
362 # SUBSCRIBE for non-SUB sockets.
363 pass
364 self._add_socket(s)
365 return s
367 def setsockopt(self, opt: int, value: Any) -> None:
368 """set default socket options for new sockets created by this Context
370 .. versionadded:: 13.0
371 """
372 self.sockopts[opt] = value
374 def getsockopt(self, opt: int) -> OptValT:
375 """get default socket options for new sockets created by this Context
377 .. versionadded:: 13.0
378 """
379 return self.sockopts[opt]
381 def _set_attr_opt(self, name: str, opt: int, value: OptValT) -> None:
382 """set default sockopts as attributes"""
383 if name in ContextOption.__members__:
384 return self.set(opt, value)
385 elif name in SocketOption.__members__:
386 self.sockopts[opt] = value
387 else:
388 raise AttributeError(f"No such context or socket option: {name}")
390 def _get_attr_opt(self, name: str, opt: int) -> OptValT:
391 """get default sockopts as attributes"""
392 if name in ContextOption.__members__:
393 return self.get(opt)
394 else:
395 if opt not in self.sockopts:
396 raise AttributeError(name)
397 else:
398 return self.sockopts[opt]
400 def __delattr__(self, key: str) -> None:
401 """delete default sockopts as attributes"""
402 if key in self.__dict__:
403 self.__dict__.pop(key)
404 return
405 key = key.upper()
406 try:
407 opt = getattr(SocketOption, key)
408 except AttributeError:
409 raise AttributeError(f"No such socket option: {key!r}")
410 else:
411 if opt not in self.sockopts:
412 raise AttributeError(key)
413 else:
414 del self.sockopts[opt]
417SyncContext: TypeAlias = Context[SyncSocket]
420__all__ = ['Context', 'SyncContext']