Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/context.py: 36%
166 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""Python bindings for 0MQ."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6import atexit
7import os
8from threading import Lock
9from typing import (
10 Any,
11 Callable,
12 Dict,
13 Generic,
14 List,
15 Optional,
16 Type,
17 TypeVar,
18 Union,
19 overload,
20)
21from warnings import warn
22from weakref import WeakSet
24from zmq.backend import Context as ContextBase
25from zmq.constants import ContextOption, Errno, SocketOption
26from zmq.error import ZMQError
27from zmq.utils.interop import cast_int_addr
29from .attrsettr import AttributeSetter, OptValT
30from .socket import Socket
32# notice when exiting, to avoid triggering term on exit
33_exiting = False
36def _notice_atexit() -> None:
37 global _exiting
38 _exiting = True
41atexit.register(_notice_atexit)
43T = TypeVar('T', bound='Context')
44ST = TypeVar('ST', bound='Socket', covariant=True)
47class Context(ContextBase, AttributeSetter, Generic[ST]):
48 """Create a zmq Context
50 A zmq Context creates sockets via its ``ctx.socket`` method.
52 .. versionchanged:: 24
54 When using a Context as a context manager (``with zmq.Context()``),
55 or deleting a context without closing it first,
56 ``ctx.destroy()`` is called,
57 closing any leftover sockets,
58 instead of `ctx.term()` which requires sockets to be closed first.
60 This prevents hangs caused by `ctx.term()` if sockets are left open,
61 but means that unclean destruction of contexts
62 (with sockets left open) is not safe
63 if sockets are managed in other threads.
65 .. versionadded:: 25
67 Contexts can now be shadowed by passing another Context.
68 This helps in creating an async copy of a sync context or vice versa::
70 ctx = zmq.Context(async_ctx)
72 Which previously had to be::
74 ctx = zmq.Context.shadow(async_ctx.underlying)
75 """
77 sockopts: Dict[int, Any]
78 _instance: Any = None
79 _instance_lock = Lock()
80 _instance_pid: Optional[int] = None
81 _shadow = False
82 _shadow_obj = None
83 _warn_destroy_close = False
84 _sockets: WeakSet
85 # mypy doesn't like a default value here
86 _socket_class: Type[ST] = Socket # type: ignore
88 @overload
89 def __init__(self: "Context[Socket]", io_threads: int = 1):
90 ...
92 @overload
93 def __init__(self: "Context[Socket]", io_threads: "Context"):
94 # this should be positional-only, but that requires 3.8
95 ...
97 @overload
98 def __init__(self: "Context[Socket]", *, shadow: Union["Context", int]):
99 ...
101 def __init__(
102 self: "Context[Socket]",
103 io_threads: Union[int, "Context"] = 1,
104 shadow: Union["Context", int] = 0,
105 ) -> None:
106 if isinstance(io_threads, Context):
107 # allow positional shadow `zmq.Context(zmq.asyncio.Context())`
108 # this s
109 shadow = io_threads
110 io_threads = 1
112 shadow_address: int = 0
113 if shadow:
114 self._shadow = True
115 # hold a reference to the shadow object
116 self._shadow_obj = shadow
117 if not isinstance(shadow, int):
118 try:
119 shadow = shadow.underlying
120 except AttributeError:
121 pass
122 shadow_address = cast_int_addr(shadow)
123 else:
124 self._shadow = False
125 super().__init__(io_threads=io_threads, shadow=shadow_address)
126 self.sockopts = {}
127 self._sockets = WeakSet()
129 def __del__(self) -> None:
130 """Deleting a Context without closing it destroys it and all sockets.
132 .. versionchanged:: 24
133 Switch from threadsafe `term()` which hangs in the event of open sockets
134 to less safe `destroy()` which
135 warns about any leftover sockets and closes them.
136 """
138 # Calling locals() here conceals issue #1167 on Windows CPython 3.5.4.
139 locals()
141 if not self._shadow and not _exiting and not self.closed:
142 self._warn_destroy_close = True
143 if warn is not None and getattr(self, "_sockets", None) is not None:
144 # warn can be None during process teardown
145 warn(
146 f"Unclosed context {self}",
147 ResourceWarning,
148 stacklevel=2,
149 source=self,
150 )
151 self.destroy()
153 _repr_cls = "zmq.Context"
155 def __repr__(self) -> str:
156 cls = self.__class__
157 # look up _repr_cls on exact class, not inherited
158 _repr_cls = cls.__dict__.get("_repr_cls", None)
159 if _repr_cls is None:
160 _repr_cls = f"{cls.__module__}.{cls.__name__}"
162 closed = ' closed' if self.closed else ''
163 if getattr(self, "_sockets", None):
164 n_sockets = len(self._sockets)
165 s = 's' if n_sockets > 1 else ''
166 sockets = f"{n_sockets} socket{s}"
167 else:
168 sockets = ""
169 return f"<{_repr_cls}({sockets}) at {hex(id(self))}{closed}>"
171 def __enter__(self: T) -> T:
172 return self
174 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
175 # warn about any leftover sockets before closing them
176 self._warn_destroy_close = True
177 self.destroy()
179 def __copy__(self: T, memo: Any = None) -> T:
180 """Copying a Context creates a shadow copy"""
181 return self.__class__.shadow(self.underlying)
183 __deepcopy__ = __copy__
185 @classmethod
186 def shadow(cls: Type[T], address: Union[int, "Context"]) -> T:
187 """Shadow an existing libzmq context
189 address is a zmq.Context or an integer (or FFI pointer)
190 representing the address of the libzmq context.
192 .. versionadded:: 14.1
194 .. versionadded:: 25
195 Support for shadowing `zmq.Context` objects,
196 instead of just integer addresses.
197 """
198 return cls(shadow=address)
200 @classmethod
201 def shadow_pyczmq(cls: Type[T], ctx: Any) -> T:
202 """Shadow an existing pyczmq context
204 ctx is the FFI `zctx_t *` pointer
206 .. versionadded:: 14.1
207 """
208 from pyczmq import zctx # type: ignore
210 from zmq.utils.interop import cast_int_addr
212 underlying = zctx.underlying(ctx)
213 address = cast_int_addr(underlying)
214 return cls(shadow=address)
216 # static method copied from tornado IOLoop.instance
217 @classmethod
218 def instance(cls: Type[T], io_threads: int = 1) -> T:
219 """Returns a global Context instance.
221 Most single-threaded applications have a single, global Context.
222 Use this method instead of passing around Context instances
223 throughout your code.
225 A common pattern for classes that depend on Contexts is to use
226 a default argument to enable programs with multiple Contexts
227 but not require the argument for simpler applications::
229 class MyClass(object):
230 def __init__(self, context=None):
231 self.context = context or Context.instance()
233 .. versionchanged:: 18.1
235 When called in a subprocess after forking,
236 a new global instance is created instead of inheriting
237 a Context that won't work from the parent process.
238 """
239 if (
240 cls._instance is None
241 or cls._instance_pid != os.getpid()
242 or cls._instance.closed
243 ):
244 with cls._instance_lock:
245 if (
246 cls._instance is None
247 or cls._instance_pid != os.getpid()
248 or cls._instance.closed
249 ):
250 cls._instance = cls(io_threads=io_threads)
251 cls._instance_pid = os.getpid()
252 return cls._instance
254 def term(self) -> None:
255 """Close or terminate the context.
257 Context termination is performed in the following steps:
259 - Any blocking operations currently in progress on sockets open within context shall
260 raise :class:`zmq.ContextTerminated`.
261 With the exception of socket.close(), any further operations on sockets open within this context
262 shall raise :class:`zmq.ContextTerminated`.
263 - After interrupting all blocking calls, term shall block until the following conditions are satisfied:
264 - All sockets open within context have been closed.
265 - For each socket within context, all messages sent on the socket have either been
266 physically transferred to a network peer,
267 or the socket's linger period set with the zmq.LINGER socket option has expired.
269 For further details regarding socket linger behaviour refer to libzmq documentation for ZMQ_LINGER.
271 This can be called to close the context by hand. If this is not called,
272 the context will automatically be closed when it is garbage collected,
273 in which case you may see a ResourceWarning about the unclosed context.
274 """
275 super().term()
277 # -------------------------------------------------------------------------
278 # Hooks for ctxopt completion
279 # -------------------------------------------------------------------------
281 def __dir__(self) -> List[str]:
282 keys = dir(self.__class__)
283 keys.extend(ContextOption.__members__)
284 return keys
286 # -------------------------------------------------------------------------
287 # Creating Sockets
288 # -------------------------------------------------------------------------
290 def _add_socket(self, socket: Any) -> None:
291 """Add a weakref to a socket for Context.destroy / reference counting"""
292 self._sockets.add(socket)
294 def _rm_socket(self, socket: Any) -> None:
295 """Remove a socket for Context.destroy / reference counting"""
296 # allow _sockets to be None in case of process teardown
297 if getattr(self, "_sockets", None) is not None:
298 self._sockets.discard(socket)
300 def destroy(self, linger: Optional[int] = None) -> None:
301 """Close all sockets associated with this context and then terminate
302 the context.
304 .. warning::
306 destroy involves calling ``zmq_close()``, which is **NOT** threadsafe.
307 If there are active sockets in other threads, this must not be called.
309 Parameters
310 ----------
312 linger : int, optional
313 If specified, set LINGER on sockets prior to closing them.
314 """
315 if self.closed:
316 return
318 sockets: List[ST] = list(getattr(self, "_sockets", None) or [])
319 for s in sockets:
320 if s and not s.closed:
321 if self._warn_destroy_close and warn is not None:
322 # warn can be None during process teardown
323 warn(
324 f"Destroying context with unclosed socket {s}",
325 ResourceWarning,
326 stacklevel=3,
327 source=s,
328 )
329 if linger is not None:
330 s.setsockopt(SocketOption.LINGER, linger)
331 s.close()
333 self.term()
335 def socket(
336 self: T,
337 socket_type: int,
338 socket_class: Optional[Callable[[T, int], ST]] = None,
339 **kwargs: Any,
340 ) -> ST:
341 """Create a Socket associated with this Context.
343 Parameters
344 ----------
345 socket_type : int
346 The socket type, which can be any of the 0MQ socket types:
347 REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, etc.
349 socket_class: zmq.Socket or a subclass
350 The socket class to instantiate, if different from the default for this Context.
351 e.g. for creating an asyncio socket attached to a default Context or vice versa.
353 .. versionadded:: 25
355 kwargs:
356 will be passed to the __init__ method of the socket class.
357 """
358 if self.closed:
359 raise ZMQError(Errno.ENOTSUP)
360 if socket_class is None:
361 socket_class = self._socket_class
362 s: ST = socket_class( # set PYTHONTRACEMALLOC=2 to get the calling frame
363 self, socket_type, **kwargs
364 )
365 for opt, value in self.sockopts.items():
366 try:
367 s.setsockopt(opt, value)
368 except ZMQError:
369 # ignore ZMQErrors, which are likely for socket options
370 # that do not apply to a particular socket type, e.g.
371 # SUBSCRIBE for non-SUB sockets.
372 pass
373 self._add_socket(s)
374 return s
376 def setsockopt(self, opt: int, value: Any) -> None:
377 """set default socket options for new sockets created by this Context
379 .. versionadded:: 13.0
380 """
381 self.sockopts[opt] = value
383 def getsockopt(self, opt: int) -> OptValT:
384 """get default socket options for new sockets created by this Context
386 .. versionadded:: 13.0
387 """
388 return self.sockopts[opt]
390 def _set_attr_opt(self, name: str, opt: int, value: OptValT) -> None:
391 """set default sockopts as attributes"""
392 if name in ContextOption.__members__:
393 return self.set(opt, value)
394 elif name in SocketOption.__members__:
395 self.sockopts[opt] = value
396 else:
397 raise AttributeError(f"No such context or socket option: {name}")
399 def _get_attr_opt(self, name: str, opt: int) -> OptValT:
400 """get default sockopts as attributes"""
401 if name in ContextOption.__members__:
402 return self.get(opt)
403 else:
404 if opt not in self.sockopts:
405 raise AttributeError(name)
406 else:
407 return self.sockopts[opt]
409 def __delattr__(self, key: str) -> None:
410 """delete default sockopts as attributes"""
411 if key in self.__dict__:
412 self.__dict__.pop(key)
413 return
414 key = key.upper()
415 try:
416 opt = getattr(SocketOption, key)
417 except AttributeError:
418 raise AttributeError(f"No such socket option: {key!r}")
419 else:
420 if opt not in self.sockopts:
421 raise AttributeError(key)
422 else:
423 del self.sockopts[opt]
426__all__ = ['Context']