1"""Python bindings for 0MQ."""
2
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
5
6from __future__ import annotations
7
8import atexit
9import os
10from threading import Lock
11from typing import Any, Callable, Generic, TypeVar, overload
12from warnings import warn
13from weakref import WeakSet
14
15import zmq
16from zmq._typing import TypeAlias
17from zmq.backend import Context as ContextBase
18from zmq.constants import ContextOption, Errno, SocketOption
19from zmq.error import ZMQError
20from zmq.utils.interop import cast_int_addr
21
22from .attrsettr import AttributeSetter, OptValT
23from .socket import Socket, SyncSocket
24
# Flipped to True once interpreter shutdown starts, so that
# ``Context.__del__`` can skip destroying contexts while the process exits.
_exiting = False


@atexit.register
def _notice_atexit() -> None:
    """Record that the interpreter has begun running its exit handlers."""
    global _exiting
    _exiting = True
35
# Self-type for Context methods that return ``self`` or a new ``cls`` instance
# (``__enter__``, ``__copy__``, ``shadow``, ``instance``, ...),
# so that subclasses are typed as returning their own class.
_ContextType = TypeVar('_ContextType', bound='Context')
# Type parameter of the generic ``Context``: the socket class it creates.
# Bound to ``Socket`` and declared covariant.
_SocketType = TypeVar('_SocketType', bound='Socket', covariant=True)
38
39
class Context(ContextBase, AttributeSetter, Generic[_SocketType]):
    """Create a zmq Context

    A zmq Context creates sockets via its ``ctx.socket`` method.

    .. versionchanged:: 24

        When using a Context as a context manager (``with zmq.Context()``),
        or deleting a context without closing it first,
        ``ctx.destroy()`` is called,
        closing any leftover sockets,
        instead of `ctx.term()` which requires sockets to be closed first.

        This prevents hangs caused by `ctx.term()` if sockets are left open,
        but means that unclean destruction of contexts
        (with sockets left open) is not safe
        if sockets are managed in other threads.

    .. versionadded:: 25

        Contexts can now be shadowed by passing another Context.
        This helps in creating an async copy of a sync context or vice versa::

            ctx = zmq.Context(async_ctx)

        Which previously had to be::

            ctx = zmq.Context.shadow(async_ctx.underlying)
    """

    # default socket options, applied to every socket created by `socket()`
    sockopts: dict[int, Any]
    # global instance returned by `Context.instance()`
    _instance: Any = None
    # held while (re)creating `_instance`
    _instance_lock = Lock()
    # pid of the process that created `_instance`;
    # a mismatch with os.getpid() makes `instance()` create a fresh one
    _instance_pid: int | None = None
    # True when this object shadows an existing context rather than owning one;
    # `__del__` does not destroy shadow contexts
    _shadow = False
    # reference held to the object passed as `shadow`
    _shadow_obj = None
    # when True, `destroy()` emits a ResourceWarning for each socket it closes
    _warn_destroy_close = False
    # weak references to the sockets created by `socket()`
    _sockets: WeakSet
    # mypy doesn't like a default value here
    _socket_class: type[_SocketType] = Socket  # type: ignore

    @overload
    def __init__(self: SyncContext, io_threads: int = 1): ...

    @overload
    def __init__(self: SyncContext, io_threads: Context, /): ...

    @overload
    def __init__(self: SyncContext, *, shadow: Context | int): ...

    def __init__(
        self: SyncContext,
        io_threads: int | Context = 1,
        shadow: Context | int = 0,
    ) -> None:
        """Create a context, or shadow an existing one.

        Parameters
        ----------
        io_threads : int or Context
            Forwarded to the backend context as ``io_threads``.
            If a Context is passed here, it is treated as ``shadow``
            and ``io_threads`` falls back to 1.
        shadow : Context or int
            A Context, or the integer address of a libzmq context, to shadow.
            The default, 0, means no shadowing.
        """
        if isinstance(io_threads, Context):
            # allow positional shadow `zmq.Context(zmq.asyncio.Context())`:
            # a Context in the first position is the context to shadow,
            # not a number of io threads
            shadow = io_threads
            io_threads = 1

        shadow_address: int = 0
        if shadow:
            self._shadow = True
            # hold a reference to the shadow object
            self._shadow_obj = shadow
            if not isinstance(shadow, int):
                try:
                    shadow = shadow.underlying
                except AttributeError:
                    # no `.underlying` attribute:
                    # hand the object itself to cast_int_addr below
                    pass
            shadow_address = cast_int_addr(shadow)
        else:
            self._shadow = False
        super().__init__(io_threads=io_threads, shadow=shadow_address)
        self.sockopts = {}
        self._sockets = WeakSet()

    def __del__(self) -> None:
        """Deleting a Context without closing it destroys it and all sockets.

        .. versionchanged:: 24
            Switch from threadsafe `term()` which hangs in the event of open sockets
            to less safe `destroy()` which
            warns about any leftover sockets and closes them.
        """

        # Calling locals() here conceals issue #1167 on Windows CPython 3.5.4.
        locals()

        # shadow contexts, contexts that are already closed,
        # and contexts collected during interpreter exit are left alone
        if not self._shadow and not _exiting and not self.closed:
            self._warn_destroy_close = True
            if warn is not None and getattr(self, "_sockets", None) is not None:
                # warn can be None during process teardown
                warn(
                    f"Unclosed context {self}",
                    ResourceWarning,
                    stacklevel=2,
                    source=self,
                )
            self.destroy()

    # name shown by `__repr__` for this exact class (not inherited by subclasses)
    _repr_cls = "zmq.Context"

    def __repr__(self) -> str:
        """Return a summary with the class name, socket count, address and closed state."""
        cls = self.__class__
        # look up _repr_cls on exact class, not inherited
        _repr_cls = cls.__dict__.get("_repr_cls", None)
        if _repr_cls is None:
            _repr_cls = f"{cls.__module__}.{cls.__name__}"

        closed = ' closed' if self.closed else ''
        # `_sockets` may be missing, and an empty set is reported as no sockets
        if getattr(self, "_sockets", None):
            n_sockets = len(self._sockets)
            s = 's' if n_sockets > 1 else ''
            sockets = f"{n_sockets} socket{s}"
        else:
            sockets = ""
        return f"<{_repr_cls}({sockets}) at {hex(id(self))}{closed}>"

    def __enter__(self: _ContextType) -> _ContextType:
        """Enter a ``with`` block; returns the context itself."""
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Leave a ``with`` block by destroying the context.

        Exceptions are not suppressed (returns None).
        """
        # warn about any leftover sockets before closing them
        self._warn_destroy_close = True
        self.destroy()

    def __copy__(self: _ContextType, memo: Any = None) -> _ContextType:
        """Copying a Context creates a shadow copy"""
        return self.__class__.shadow(self.underlying)

    # deep copies are shadows too; `memo` is accepted and ignored
    __deepcopy__ = __copy__

    @classmethod
    def shadow(cls: type[_ContextType], address: int | zmq.Context) -> _ContextType:
        """Shadow an existing libzmq context

        address is a zmq.Context or an integer (or FFI pointer)
        representing the address of the libzmq context.

        .. versionadded:: 14.1

        .. versionadded:: 25
            Support for shadowing `zmq.Context` objects,
            instead of just integer addresses.
        """
        return cls(shadow=address)

    @classmethod
    def shadow_pyczmq(cls: type[_ContextType], ctx: Any) -> _ContextType:
        """Shadow an existing pyczmq context

        ctx is the FFI `zctx_t *` pointer

        .. versionadded:: 14.1
        """
        # imported here so that pyczmq is only needed when this method is called
        from pyczmq import zctx  # type: ignore

        underlying = zctx.underlying(ctx)
        address = cast_int_addr(underlying)
        return cls(shadow=address)

    # static method copied from tornado IOLoop.instance
    @classmethod
    def instance(cls: type[_ContextType], io_threads: int = 1) -> _ContextType:
        """Returns a global Context instance.

        Most single-process applications have a single, global Context.
        Use this method instead of passing around Context instances
        throughout your code.

        A common pattern for classes that depend on Contexts is to use
        a default argument to enable programs with multiple Contexts
        but not require the argument for simpler applications::

            class MyClass(object):
                def __init__(self, context=None):
                    self.context = context or Context.instance()

        .. versionchanged:: 18.1

            When called in a subprocess after forking,
            a new global instance is created instead of inheriting
            a Context that won't work from the parent process.
        """
        # first check without the lock, so the common case does not acquire it
        if (
            cls._instance is None
            or cls._instance_pid != os.getpid()
            or cls._instance.closed
        ):
            with cls._instance_lock:
                # check again now that the lock is held:
                # another thread may have created the instance in the meantime
                if (
                    cls._instance is None
                    or cls._instance_pid != os.getpid()
                    or cls._instance.closed
                ):
                    cls._instance = cls(io_threads=io_threads)
                    cls._instance_pid = os.getpid()
        return cls._instance

    def term(self) -> None:
        """Close or terminate the context.

        Context termination is performed in the following steps:

        - Any blocking operations currently in progress on sockets open within context shall
          raise :class:`zmq.ContextTerminated`.
          With the exception of socket.close(), any further operations on sockets open within this context
          shall raise :class:`zmq.ContextTerminated`.
        - After interrupting all blocking calls, term shall block until the following conditions are satisfied:
            - All sockets open within context have been closed.
            - For each socket within context, all messages sent on the socket have either been
              physically transferred to a network peer,
              or the socket's linger period set with the zmq.LINGER socket option has expired.

        For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER.

        This can be called to close the context by hand. If this is not called,
        the context will automatically be closed when it is garbage collected,
        in which case you may see a ResourceWarning about the unclosed context.
        """
        super().term()

    # -------------------------------------------------------------------------
    # Hooks for ctxopt completion
    # -------------------------------------------------------------------------

    def __dir__(self) -> list[str]:
        """List the class attributes plus the names of all context options."""
        keys = dir(self.__class__)
        keys.extend(ContextOption.__members__)
        return keys

    # -------------------------------------------------------------------------
    # Creating Sockets
    # -------------------------------------------------------------------------

    def _add_socket(self, socket: Any) -> None:
        """Add a weakref to a socket for Context.destroy / reference counting"""
        self._sockets.add(socket)

    def _rm_socket(self, socket: Any) -> None:
        """Remove a socket for Context.destroy / reference counting"""
        # allow _sockets to be None in case of process teardown
        if getattr(self, "_sockets", None) is not None:
            self._sockets.discard(socket)

    def destroy(self, linger: int | None = None) -> None:
        """Close all sockets associated with this context and then terminate
        the context.

        .. warning::

            destroy involves calling :meth:`Socket.close`, which is **NOT** threadsafe.
            If there are active sockets in other threads, this must not be called.

        Parameters
        ----------

        linger : int, optional
            If specified, set LINGER on sockets prior to closing them.
        """
        if self.closed:
            return

        # snapshot into a list before iterating;
        # `_sockets` may be missing or None, which counts as no sockets
        sockets: list[_SocketType] = list(getattr(self, "_sockets", None) or [])
        for s in sockets:
            if s and not s.closed:
                if self._warn_destroy_close and warn is not None:
                    # warn can be None during process teardown
                    warn(
                        f"Destroying context with unclosed socket {s}",
                        ResourceWarning,
                        stacklevel=3,
                        source=s,
                    )
                if linger is not None:
                    s.setsockopt(SocketOption.LINGER, linger)
                s.close()

        self.term()

    def socket(
        self: _ContextType,
        socket_type: int,
        socket_class: Callable[[_ContextType, int], _SocketType] | None = None,
        **kwargs: Any,
    ) -> _SocketType:
        """Create a Socket associated with this Context.

        Parameters
        ----------
        socket_type : int
            The socket type, which can be any of the 0MQ socket types:
            REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.

        socket_class: zmq.Socket
            The socket class to instantiate, if different from the default for this Context.
            e.g. for creating an asyncio socket attached to a default Context or vice versa.

            .. versionadded:: 25

        kwargs:
            will be passed to the __init__ method of the socket class.

        Raises
        ------
        ZMQError
            With ``Errno.ENOTSUP`` if the context is already closed.
        """
        if self.closed:
            raise ZMQError(Errno.ENOTSUP)
        if socket_class is None:
            socket_class = self._socket_class
        s: _SocketType = (
            socket_class(  # set PYTHONTRACEMALLOC=2 to get the calling frame
                self, socket_type, **kwargs
            )
        )
        # apply the context-wide default socket options
        for opt, value in self.sockopts.items():
            try:
                s.setsockopt(opt, value)
            except ZMQError:
                # ignore ZMQErrors, which are likely for socket options
                # that do not apply to a particular socket type, e.g.
                # SUBSCRIBE for non-SUB sockets.
                pass
        self._add_socket(s)
        return s

    def setsockopt(self, opt: int, value: Any) -> None:
        """set default socket options for new sockets created by this Context

        .. versionadded:: 13.0
        """
        self.sockopts[opt] = value

    def getsockopt(self, opt: int) -> OptValT:
        """get default socket options for new sockets created by this Context

        Raises KeyError if no default has been set for ``opt``.

        .. versionadded:: 13.0
        """
        return self.sockopts[opt]

    def _set_attr_opt(self, name: str, opt: int, value: OptValT) -> None:
        """set default sockopts as attributes

        Context options are set on the context itself via ``self.set``;
        socket options are stored in ``sockopts`` as defaults for new sockets.
        Any other name raises AttributeError.
        """
        if name in ContextOption.__members__:
            return self.set(opt, value)
        elif name in SocketOption.__members__:
            self.sockopts[opt] = value
        else:
            raise AttributeError(f"No such context or socket option: {name}")

    def _get_attr_opt(self, name: str, opt: int) -> OptValT:
        """get default sockopts as attributes

        Context options are read from the context itself via ``self.get``;
        anything else is looked up in ``sockopts``,
        raising AttributeError if no default has been stored for it.
        """
        if name in ContextOption.__members__:
            return self.get(opt)
        try:
            return self.sockopts[opt]
        except KeyError:
            # the KeyError is an implementation detail: keep it out of the traceback
            raise AttributeError(name) from None

    def __delattr__(self, key: str) -> None:
        """delete default sockopts as attributes

        Plain instance attributes are removed from ``__dict__``.
        Otherwise ``key`` is upper-cased and treated as a socket option name,
        and its stored default is removed from ``sockopts``.

        Raises AttributeError if ``key`` is neither an instance attribute
        nor a socket option with a stored default.
        """
        if key in self.__dict__:
            del self.__dict__[key]
            return
        key = key.upper()
        try:
            opt = getattr(SocketOption, key)
        except AttributeError:
            # report the failure in terms of socket options,
            # without chaining the enum's own lookup error
            raise AttributeError(f"No such socket option: {key!r}") from None
        try:
            del self.sockopts[opt]
        except KeyError:
            raise AttributeError(key) from None
415
416
# Alias for a Context parametrized with ``SyncSocket`` as its socket class;
# used as the annotated ``self`` type of ``Context.__init__``.
SyncContext: TypeAlias = Context[SyncSocket]


# names exported by ``from <this module> import *``
__all__ = ['Context', 'SyncContext']