1"""Garbage collection thread for representing zmq refcount of Python objects
2used in zero-copy sends.
3"""
4
5# Copyright (C) PyZMQ Developers
6# Distributed under the terms of the Modified BSD License.
7
8import atexit
9import struct
10import warnings
11from collections import namedtuple
12from os import getpid
13from threading import Event, Lock, Thread
14
15import zmq
16
# Entry kept in the collector's ``refs`` dict: the stored object together
# with an optional event that the gc thread sets when the entry is released.
gcref = namedtuple('gcref', ('obj', 'event'))
18
19
class GarbageCollectorThread(Thread):
    """Thread in which garbage collection actually happens.

    Binds a PULL socket on the collector's URL and, for every key it
    receives, pops the matching entry from the collector's ``refs`` dict and
    sets that entry's event, if it has one.  The loop ends when the message
    ``b'DIE'`` arrives or when the thread finds itself running in a process
    other than the one that created it.
    """

    def __init__(self, gc):
        """Prepare (but do not start) the thread.

        ``gc`` is the owning collector; ``run`` reads its ``context``,
        ``url`` and ``refs`` attributes.
        """
        super().__init__()
        self.gc = gc
        self.daemon = True
        # remembered so run() can tell whether it ended up in a forked child
        self.pid = getpid()
        # set once socket setup has finished (or was skipped/failed), so the
        # code that started the thread can stop waiting
        self.ready = Event()

    @staticmethod
    def _unpack_key(msg):
        """Decode a packed unsigned key (4 or 8 bytes) into an int.

        The ``=`` prefix selects native byte order with *standard* sizes, so
        ``L`` is always 4 bytes and ``Q`` always 8.  Without the prefix,
        ``L`` is 8 bytes on LP64 platforms and a 4-byte message would raise
        ``struct.error``.  As before, a message of any other length is
        treated as 8-byte and raises ``struct.error`` if it is not.
        """
        fmt = '=L' if len(msg) == 4 else '=Q'
        return struct.unpack(fmt, msg)[0]

    def run(self):
        """Receive keys and release the matching references until told to stop."""
        # detect fork at beginning of the thread
        # NOTE(review): getpid is presumably None only when module globals
        # have been torn down at interpreter shutdown -- confirm.
        if getpid is None or getpid() != self.pid:
            self.ready.set()
            return
        try:
            s = self.gc.context.socket(zmq.PULL)
            s.linger = 0
            s.bind(self.gc.url)
        finally:
            # always release the waiter, even if socket setup raised
            self.ready.set()

        while True:
            # detect fork
            if getpid is None or getpid() != self.pid:
                return
            msg = s.recv()
            if msg == b'DIE':
                break
            key = self._unpack_key(msg)
            tup = self.gc.refs.pop(key, None)
            if tup and tup.event:
                tup.event.set()
            # drop our own reference before blocking in recv() again
            del tup
        s.close()
56
57
class GarbageCollector:
    """PyZMQ Garbage Collector

    Stands in for the reference libzmq holds on a Python object during a
    zero-copy send.  Objects whose memory zeromq is still using are kept in
    a dictionary keyed by Python id.

    Once zeromq is finished with the memory, it sends the dictionary key,
    packed as a size_t (32 or 64-bit unsigned int), over an inproc PUSH
    socket.  The PULL socket in the gc thread receives that message, pops
    the reference from the dictionary, and fires any tracker event that
    should be signaled.
    """

    refs = None
    _context = None
    _lock = None
    url = "inproc://pyzmq.gc.01"

    def __init__(self, context=None):
        """Set up an idle collector; no thread is started yet.

        ``context`` is the zmq Context to use.  When it is None, one is
        created the first time the ``context`` property is read.
        """
        super().__init__()
        self._context = context
        self._lock = Lock()
        self.refs = {}
        self.thread = None
        self.pid = None
        self._push = None
        self._push_mutex = None
        self._stay_down = False
        atexit.register(self._atexit)

    @property
    def context(self):
        """The zmq Context used by the collector, created on first access."""
        if self._context is not None:
            return self._context
        if Thread.__module__.startswith('gevent'):
            # gevent has monkey-patched Thread, use green Context
            from zmq import green

            self._context = green.Context()
        else:
            self._context = zmq.Context()
        return self._context

    @context.setter
    def context(self, ctx):
        running = self.is_alive()
        if running and self.refs:
            warnings.warn("Replacing gc context while gc is running", RuntimeWarning)
        if running:
            # a live gc thread is shut down before the context is swapped
            self.stop()
        self._context = ctx

    def _atexit(self):
        """atexit callback

        Raises the _stay_down flag, so that other atexit handlers cannot
        bring the gc back up, and then stops the thread.
        """
        self._stay_down = True
        self.stop()

    def stop(self):
        """stop the garbage-collection thread"""
        if self.is_alive():
            self._stop()

    def _clear(self):
        """Clear state

        called after stop or when setting up a new subprocess
        """
        # thread must be reset before context: the context setter checks
        # is_alive() and would otherwise try to stop a running thread
        self._push = self._push_mutex = self.thread = None
        self.refs.clear()
        self.context = None

    def _stop(self):
        """Tell the gc thread to exit, wait for it, then tear everything down."""
        killer = self.context.socket(zmq.PUSH)
        killer.connect(self.url)
        killer.send(b'DIE')
        killer.close()
        if self._push:
            self._push.close()
        self.thread.join()
        self.context.term()
        self._clear()

    @property
    def _push_socket(self):
        """The PUSH socket for use in the zmq message destructor callback."""
        if getattr(self, "_stay_down", False):
            raise RuntimeError("zmq gc socket requested during shutdown")
        if self.is_alive() and self._push is not None:
            return self._push
        # no usable socket yet: make one and connect it to the gc url
        sock = self.context.socket(zmq.PUSH)
        self._push = sock
        sock.connect(self.url)
        return self._push

    def start(self):
        """Start a new garbage collection thread.

        Creates a new zmq Context used for garbage collection.
        Under most circumstances, this will only be called once per process.
        """
        current_pid = getpid()
        if self.thread is not None and self.pid != current_pid:
            # It's re-starting, must free earlier thread's context
            # since a fork probably broke it
            self._clear()
        self.pid = current_pid
        self.refs = {}
        worker = GarbageCollectorThread(self)
        self.thread = worker
        worker.start()
        # block until the thread reports that its setup phase is over
        worker.ready.wait()

    def is_alive(self):
        """Is the garbage collection thread currently running?

        Includes checks for process shutdown or fork.
        """
        if getpid is None or getpid() != self.pid:
            return False
        worker = self.thread
        if worker is None or not worker.is_alive():
            return False
        return True

    def store(self, obj, event=None):
        """store an object and (optionally) event for zero-copy

        Returns the key under which the entry was stored, or 0 when the
        collector is not running and has been told to stay down.
        """
        if not self.is_alive():
            if self._stay_down:
                return 0
            # safely start the gc thread:
            # take the lock and check again,
            # so we don't start multiple threads
            with self._lock:
                if not self.is_alive():
                    self.start()
        ref = gcref(obj, event)
        key = id(ref)
        self.refs[key] = ref
        return key

    def __del__(self):
        if self.is_alive():
            try:
                self.stop()
            except Exception as exc:
                raise exc
211
212
# Module-level collector instance, created when this module is imported.
gc = GarbageCollector(context=None)