1"""Python bindings for 0MQ."""
2
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
5
6from __future__ import annotations
7
8import atexit
9import os
10from threading import Lock
11from typing import Any, Callable, Generic, TypeVar, overload
12from warnings import warn
13from weakref import WeakSet
14
15import zmq
16from zmq._typing import TypeAlias
17from zmq.backend import Context as ContextBase
18from zmq.constants import ContextOption, Errno, SocketOption
19from zmq.error import ZMQError
20from zmq.utils.interop import cast_int_addr
21
22from .attrsettr import AttributeSetter, OptValT
23from .socket import Socket, SyncSocket
24
# Flipped to True once interpreter shutdown has started, so that
# Context.__del__ can skip destroying contexts while the process exits.
_exiting = False


@atexit.register
def _notice_atexit() -> None:
    """Record that the interpreter is exiting (runs as an atexit hook)."""
    global _exiting
    _exiting = True
35
# Type of the Context (sub)class itself: used to annotate `self`/`cls`
# so that methods and classmethods return the caller's own class.
_ContextType = TypeVar('_ContextType', bound='Context')
# Type of the sockets a Context creates; the type parameter of Generic Context.
_SocketType = TypeVar('_SocketType', bound='Socket', covariant=True)
38
39
class Context(ContextBase, AttributeSetter, Generic[_SocketType]):
    """Create a zmq Context

    A zmq Context creates sockets via its ``ctx.socket`` method.

    .. versionchanged:: 24

        When using a Context as a context manager (``with zmq.Context()``),
        or deleting a context without closing it first,
        ``ctx.destroy()`` is called,
        closing any leftover sockets,
        instead of `ctx.term()` which requires sockets to be closed first.

        This prevents hangs caused by `ctx.term()` if sockets are left open,
        but means that unclean destruction of contexts
        (with sockets left open) is not safe
        if sockets are managed in other threads.

    .. versionadded:: 25

        Contexts can now be shadowed by passing another Context.
        This helps in creating an async copy of a sync context or vice versa::

            ctx = zmq.Context(async_ctx)

        Which previously had to be::

            ctx = zmq.Context.shadow(async_ctx.underlying)
    """

    # default socket options, applied to every socket created by `socket()`
    sockopts: dict[int, Any]
    # global instance state used by `Context.instance()`:
    # the instance itself, the lock guarding its creation,
    # and the pid of the process that created it (to detect forks)
    _instance: Any = None
    _instance_lock = Lock()
    _instance_pid: int | None = None
    # True when this object shadows an existing context;
    # shadows are not destroyed by `__del__`
    _shadow = False
    # reference to the object being shadowed, if any
    _shadow_obj = None
    # when True, `destroy()` emits a ResourceWarning for each unclosed socket
    _warn_destroy_close = False
    # weak references to the sockets created via `socket()`
    _sockets: WeakSet
    # mypy doesn't like a default value here
    _socket_class: type[_SocketType] = Socket  # type: ignore

    @overload
    def __init__(self: SyncContext, io_threads: int = 1): ...

    @overload
    def __init__(self: SyncContext, io_threads: Context):
        # this should be positional-only, but that requires 3.8
        ...

    @overload
    def __init__(self: SyncContext, *, shadow: Context | int): ...

    def __init__(
        self: SyncContext,
        io_threads: int | Context = 1,
        shadow: Context | int = 0,
    ) -> None:
        """Create a zmq Context, or shadow an existing one.

        Parameters
        ----------
        io_threads : int or Context
            Passed to the backend constructor as ``io_threads`` (default: 1).
            A Context may also be passed positionally here,
            in which case it is treated as ``shadow``
            and ``io_threads`` falls back to 1.
        shadow : Context or int
            If truthy, the context to shadow:
            either an object exposing ``underlying``
            or an address, converted with ``cast_int_addr``.
        """
        if isinstance(io_threads, Context):
            # allow positional shadow `zmq.Context(zmq.asyncio.Context())`
            # by treating the positional Context as the shadow target
            # and falling back to the default number of io threads
            shadow = io_threads
            io_threads = 1

        shadow_address: int = 0
        if shadow:
            self._shadow = True
            # hold a reference to the shadow object
            self._shadow_obj = shadow
            if not isinstance(shadow, int):
                # Context-like objects expose their address as `.underlying`;
                # anything else (e.g. an FFI pointer) is passed through as-is
                try:
                    shadow = shadow.underlying
                except AttributeError:
                    pass
            shadow_address = cast_int_addr(shadow)
        else:
            self._shadow = False
        super().__init__(io_threads=io_threads, shadow=shadow_address)
        self.sockopts = {}
        self._sockets = WeakSet()

    def __del__(self) -> None:
        """Deleting a Context without closing it destroys it and all sockets.

        Nothing is done for shadow contexts, contexts that are already closed,
        or when the interpreter is exiting.

        .. versionchanged:: 24
            Switch from threadsafe `term()` which hangs in the event of open sockets
            to less safe `destroy()` which
            warns about any leftover sockets and closes them.
        """

        # Calling locals() here conceals issue #1167 on Windows CPython 3.5.4.
        locals()

        if not self._shadow and not _exiting and not self.closed:
            self._warn_destroy_close = True
            if warn is not None and getattr(self, "_sockets", None) is not None:
                # warn can be None during process teardown
                warn(
                    f"Unclosed context {self}",
                    ResourceWarning,
                    stacklevel=2,
                    source=self,
                )
            self.destroy()

    _repr_cls = "zmq.Context"

    def __repr__(self) -> str:
        """Return ``<class(N sockets) at 0xADDRESS[ closed]>``.

        The class name is ``_repr_cls`` if defined on the exact class,
        otherwise the fully qualified ``module.ClassName``.
        """
        cls = self.__class__
        # look up _repr_cls on exact class, not inherited
        _repr_cls = cls.__dict__.get("_repr_cls", None)
        if _repr_cls is None:
            _repr_cls = f"{cls.__module__}.{cls.__name__}"

        closed = ' closed' if self.closed else ''
        # _sockets may be missing or empty, in which case no count is shown
        if getattr(self, "_sockets", None):
            n_sockets = len(self._sockets)
            s = 's' if n_sockets > 1 else ''
            sockets = f"{n_sockets} socket{s}"
        else:
            sockets = ""
        return f"<{_repr_cls}({sockets}) at {hex(id(self))}{closed}>"

    def __enter__(self: _ContextType) -> _ContextType:
        """Enter a ``with`` block; returns the Context itself."""
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Leave a ``with`` block: destroy the context and any open sockets."""
        # warn about any leftover sockets before closing them
        self._warn_destroy_close = True
        self.destroy()

    def __copy__(self: _ContextType, memo: Any = None) -> _ContextType:
        """Copying a Context creates a shadow copy

        ``memo`` is unused; it is accepted so that the same function
        can also serve as ``__deepcopy__``.
        """
        return self.__class__.shadow(self.underlying)

    __deepcopy__ = __copy__

    @classmethod
    def shadow(cls: type[_ContextType], address: int | zmq.Context) -> _ContextType:
        """Shadow an existing libzmq context

        address is a zmq.Context or an integer (or FFI pointer)
        representing the address of the libzmq context.

        .. versionadded:: 14.1

        .. versionadded:: 25
            Support for shadowing `zmq.Context` objects,
            instead of just integer addresses.
        """
        return cls(shadow=address)

    @classmethod
    def shadow_pyczmq(cls: type[_ContextType], ctx: Any) -> _ContextType:
        """Shadow an existing pyczmq context

        ctx is the FFI `zctx_t *` pointer

        .. versionadded:: 14.1
        """
        # pyczmq is imported lazily, only when this method is called
        from pyczmq import zctx  # type: ignore

        underlying = zctx.underlying(ctx)
        address = cast_int_addr(underlying)
        return cls(shadow=address)

    # static method copied from tornado IOLoop.instance
    @classmethod
    def instance(cls: type[_ContextType], io_threads: int = 1) -> _ContextType:
        """Returns a global Context instance.

        Most single-process applications have a single, global Context.
        Use this method instead of passing around Context instances
        throughout your code.

        A common pattern for classes that depend on Contexts is to use
        a default argument to enable programs with multiple Contexts
        but not require the argument for simpler applications::

            class MyClass(object):
                def __init__(self, context=None):
                    self.context = context or Context.instance()

        ``io_threads`` is only used when a new instance is created;
        it is ignored if a usable global instance already exists.

        .. versionchanged:: 18.1

            When called in a subprocess after forking,
            a new global instance is created instead of inheriting
            a Context that won't work from the parent process.
        """
        # check once without the lock, then re-check while holding it,
        # so the lock is only taken when an instance may need to be created
        if (
            cls._instance is None
            or cls._instance_pid != os.getpid()
            or cls._instance.closed
        ):
            with cls._instance_lock:
                if (
                    cls._instance is None
                    or cls._instance_pid != os.getpid()
                    or cls._instance.closed
                ):
                    cls._instance = cls(io_threads=io_threads)
                    cls._instance_pid = os.getpid()
        return cls._instance

    def term(self) -> None:
        """Close or terminate the context.

        Context termination is performed in the following steps:

        - Any blocking operations currently in progress on sockets open within context shall
          raise :class:`zmq.ContextTerminated`.
          With the exception of socket.close(), any further operations on sockets open within this context
          shall raise :class:`zmq.ContextTerminated`.
        - After interrupting all blocking calls, term shall block until the following conditions are satisfied:
            - All sockets open within context have been closed.
            - For each socket within context, all messages sent on the socket have either been
              physically transferred to a network peer,
              or the socket's linger period set with the zmq.LINGER socket option has expired.

        For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER.

        This can be called to close the context by hand. If this is not called,
        the context will automatically be closed when it is garbage collected,
        in which case you may see a ResourceWarning about the unclosed context.
        """
        super().term()

    # -------------------------------------------------------------------------
    # Hooks for ctxopt completion
    # -------------------------------------------------------------------------

    def __dir__(self) -> list[str]:
        """List class attributes plus all ContextOption names, for completion."""
        keys = dir(self.__class__)
        keys.extend(ContextOption.__members__)
        return keys

    # -------------------------------------------------------------------------
    # Creating Sockets
    # -------------------------------------------------------------------------

    def _add_socket(self, socket: Any) -> None:
        """Add a weakref to a socket for Context.destroy / reference counting"""
        self._sockets.add(socket)

    def _rm_socket(self, socket: Any) -> None:
        """Remove a socket for Context.destroy / reference counting"""
        # allow _sockets to be None in case of process teardown
        if getattr(self, "_sockets", None) is not None:
            self._sockets.discard(socket)

    def destroy(self, linger: int | None = None) -> None:
        """Close all sockets associated with this context and then terminate
        the context.

        Does nothing if the context is already closed.

        .. warning::

            destroy involves calling :meth:`Socket.close`, which is **NOT** threadsafe.
            If there are active sockets in other threads, this must not be called.

        Parameters
        ----------

        linger : int, optional
            If specified, set LINGER on sockets prior to closing them.
        """
        if self.closed:
            return

        # snapshot the WeakSet into a list before closing sockets;
        # tolerate _sockets being missing or None
        sockets: list[_SocketType] = list(getattr(self, "_sockets", None) or [])
        for s in sockets:
            if s and not s.closed:
                if self._warn_destroy_close and warn is not None:
                    # warn can be None during process teardown
                    warn(
                        f"Destroying context with unclosed socket {s}",
                        ResourceWarning,
                        stacklevel=3,
                        source=s,
                    )
                if linger is not None:
                    s.setsockopt(SocketOption.LINGER, linger)
                s.close()

        self.term()

    def socket(
        self: _ContextType,
        socket_type: int,
        socket_class: Callable[[_ContextType, int], _SocketType] | None = None,
        **kwargs: Any,
    ) -> _SocketType:
        """Create a Socket associated with this Context.

        Default socket options set on the Context (see :meth:`setsockopt`)
        are applied to the new socket; any that raise ZMQError are skipped.

        Parameters
        ----------
        socket_type : int
            The socket type, which can be any of the 0MQ socket types:
            REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.

        socket_class: zmq.Socket
            The socket class to instantiate, if different from the default for this Context.
            e.g. for creating an asyncio socket attached to a default Context or vice versa.

            .. versionadded:: 25

        kwargs:
            will be passed to the __init__ method of the socket class.

        Raises
        ------
        ZMQError
            With ``Errno.ENOTSUP`` if the context is already closed.
        """
        if self.closed:
            raise ZMQError(Errno.ENOTSUP)
        if socket_class is None:
            socket_class = self._socket_class
        s: _SocketType = (
            socket_class(  # set PYTHONTRACEMALLOC=2 to get the calling frame
                self, socket_type, **kwargs
            )
        )
        for opt, value in self.sockopts.items():
            try:
                s.setsockopt(opt, value)
            except ZMQError:
                # ignore ZMQErrors, which are likely for socket options
                # that do not apply to a particular socket type, e.g.
                # SUBSCRIBE for non-SUB sockets.
                pass
        self._add_socket(s)
        return s

    def setsockopt(self, opt: int, value: Any) -> None:
        """set default socket options for new sockets created by this Context

        The value is only stored here; it is applied when :meth:`socket`
        creates a socket.

        .. versionadded:: 13.0
        """
        self.sockopts[opt] = value

    def getsockopt(self, opt: int) -> OptValT:
        """get default socket options for new sockets created by this Context

        Raises KeyError if no default has been set for ``opt``.

        .. versionadded:: 13.0
        """
        return self.sockopts[opt]

    def _set_attr_opt(self, name: str, opt: int, value: OptValT) -> None:
        """set default sockopts as attributes

        Context options are set on the context itself via ``self.set``;
        socket options are stored as defaults for new sockets.
        Raises AttributeError if ``name`` is neither.
        """
        if name in ContextOption.__members__:
            return self.set(opt, value)
        elif name in SocketOption.__members__:
            self.sockopts[opt] = value
        else:
            raise AttributeError(f"No such context or socket option: {name}")

    def _get_attr_opt(self, name: str, opt: int) -> OptValT:
        """get default sockopts as attributes

        Context options are read from the context via ``self.get``;
        anything else is looked up in the stored socket-option defaults,
        raising AttributeError if no default has been set.
        """
        if name in ContextOption.__members__:
            return self.get(opt)
        else:
            if opt not in self.sockopts:
                raise AttributeError(name)
            else:
                return self.sockopts[opt]

    def __delattr__(self, key: str) -> None:
        """delete default sockopts as attributes

        Regular instance attributes are removed from ``__dict__``.
        Otherwise ``key`` is upper-cased and treated as a socket option name,
        and its stored default is removed.
        Raises AttributeError if there is no such socket option,
        or no default is set for it.
        """
        if key in self.__dict__:
            self.__dict__.pop(key)
            return
        key = key.upper()
        try:
            opt = getattr(SocketOption, key)
        except AttributeError:
            # `from None`: the failed enum lookup is an implementation detail,
            # not useful context for the caller's traceback
            raise AttributeError(f"No such socket option: {key!r}") from None
        else:
            if opt not in self.sockopts:
                raise AttributeError(key)
            else:
                del self.sockopts[opt]
417
418
# Context parametrized with SyncSocket as its socket type;
# used as the `self` type in the Context.__init__ signatures.
SyncContext: TypeAlias = Context[SyncSocket]


# public names exported by this module
__all__ = ['Context', 'SyncContext']