Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/utils/garbage.py: 87%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Garbage collection thread for representing zmq refcount of Python objects
2used in zero-copy sends.
3"""
5# Copyright (C) PyZMQ Developers
6# Distributed under the terms of the Modified BSD License.
8from __future__ import annotations
10import atexit
11import struct
12import warnings
13from os import getpid
14from threading import Event, Lock, Thread
15from typing import Final, NamedTuple
17import zmq
20class gcref(NamedTuple):
21 obj: object
22 event: Event | None
25class GarbageCollectorThread(Thread):
26 """Thread in which garbage collection actually happens."""
28 gc: GarbageCollector
29 daemon: bool
30 pid: int
31 ready: Event
33 def __init__(self, gc: GarbageCollector) -> None:
34 super().__init__()
35 self.gc = gc
36 self.daemon = True
37 self.pid = getpid()
38 self.ready = Event()
40 def run(self) -> None:
41 # detect fork at beginning of the thread
42 if getpid is None or getpid() != self.pid:
43 self.ready.set()
44 return
45 try:
46 s = self.gc.context.socket(zmq.PULL)
47 s.linger = 0
48 s.bind(self.gc.url)
49 finally:
50 self.ready.set()
52 while True:
53 # detect fork
54 if getpid is None or getpid() != self.pid:
55 return
56 msg = s.recv()
57 if msg == b'DIE':
58 break
59 fmt = 'L' if len(msg) == 4 else 'Q'
60 key = struct.unpack(fmt, msg)[0]
61 tup = self.gc.refs.pop(key, None)
62 if tup and tup.event:
63 tup.event.set()
64 del tup
65 s.close()
68class GarbageCollector:
69 """PyZMQ Garbage Collector
71 Used for representing the reference held by libzmq during zero-copy sends.
72 This object holds a dictionary, keyed by Python id,
73 of the Python objects whose memory are currently in use by zeromq.
75 When zeromq is done with the memory, it sends a message on an inproc PUSH socket
76 containing the packed size_t (32 or 64-bit unsigned int),
77 which is the key in the dict.
78 When the PULL socket in the gc thread receives that message,
79 the reference is popped from the dict,
80 and any tracker events that should be signaled fire.
81 """
83 # refs = None
84 _context: zmq.Context | None = None
85 # _lock = None
86 url = "inproc://pyzmq.gc.01"
88 refs: dict[int, gcref]
89 _lock: Lock
90 _push: zmq.Socket | None
92 def __init__(self, context: zmq.Context | None = None) -> None:
93 super().__init__()
94 self.refs = {}
95 self.pid: int | None = None
96 self.thread: GarbageCollectorThread | None = None
97 self._context = context
98 self._lock = Lock()
99 self._stay_down = False
100 self._push = None
101 self._push_mutex = None
102 atexit.register(self._atexit)
104 @property
105 def context(self) -> zmq.Context:
106 if self._context is None:
107 if Thread.__module__.startswith('gevent'):
108 # gevent has monkey-patched Thread, use green Context
109 from zmq import green
111 self._context = green.Context()
112 else:
113 self._context = zmq.Context()
114 return self._context
116 @context.setter
117 def context(self, ctx: zmq.Context | None) -> None:
118 if self.is_alive():
119 if self.refs:
120 warnings.warn(
121 "Replacing gc context while gc is running", RuntimeWarning
122 )
123 self.stop()
124 self._context = ctx
126 def _atexit(self) -> None:
127 """atexit callback
129 sets _stay_down flag so that gc doesn't try to start up again in other atexit handlers
130 """
131 self._stay_down = True
132 self.stop()
134 def stop(self) -> None:
135 """stop the garbage-collection thread"""
136 if not self.is_alive():
137 return
138 self._stop()
140 def _clear(self) -> None:
141 """Clear state
143 called after stop or when setting up a new subprocess
144 """
145 self._push = None
146 self._push_mutex = None
147 self.thread = None
148 self.refs.clear()
149 self.context = None
151 def _stop(self) -> None:
152 push = self.context.socket(zmq.PUSH)
153 push.connect(self.url)
154 push.send(b'DIE')
155 push.close()
156 if self._push:
157 self._push.close()
158 self.thread.join() # type:ignore[union-attr]
159 self.context.term()
160 self._clear()
162 @property
163 def _push_socket(self) -> zmq.Socket:
164 """The PUSH socket for use in the zmq message destructor callback."""
165 if getattr(self, "_stay_down", False):
166 raise RuntimeError("zmq gc socket requested during shutdown")
167 if not self.is_alive() or self._push is None:
168 self._push = self.context.socket(zmq.PUSH)
169 self._push.connect(self.url)
170 return self._push
172 def start(self) -> None:
173 """Start a new garbage collection thread.
175 Creates a new zmq Context used for garbage collection.
176 Under most circumstances, this will only be called once per process.
177 """
178 if self.thread is not None and self.pid != getpid():
179 # It's re-starting, must free earlier thread's context
180 # since a fork probably broke it
181 self._clear()
182 self.pid = getpid()
183 self.refs = {}
184 self.thread = GarbageCollectorThread(self)
185 self.thread.start()
186 self.thread.ready.wait()
188 def is_alive(self) -> bool:
189 """Is the garbage collection thread currently running?
191 Includes checks for process shutdown or fork.
192 """
193 if (
194 getpid is None
195 or getpid() != self.pid
196 or self.thread is None
197 or not self.thread.is_alive()
198 ):
199 return False
200 return True
202 def store(self, obj: object, event: Event | None = None) -> int:
203 """store an object and (optionally) event for zero-copy"""
204 if not self.is_alive():
205 if self._stay_down:
206 return 0
207 # safely start the gc thread
208 # use lock and double check,
209 # so we don't start multiple threads
210 with self._lock:
211 if not self.is_alive():
212 self.start()
213 tup = gcref(obj, event)
214 theid = id(tup)
215 self.refs[theid] = tup
216 return theid
218 def __del__(self) -> None:
219 if not self.is_alive():
220 return
221 try:
222 self.stop()
223 except Exception as e:
224 raise (e)
227gc: Final[GarbageCollector] = GarbageCollector()