Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/utils/garbage.py: 86%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

131 statements  

1"""Garbage collection thread for representing zmq refcount of Python objects 

2used in zero-copy sends. 

3""" 

4 

5# Copyright (C) PyZMQ Developers 

6# Distributed under the terms of the Modified BSD License. 

7 

8import atexit 

9import struct 

10import warnings 

11from collections import namedtuple 

12from os import getpid 

13from threading import Event, Lock, Thread 

14 

15import zmq 

16 

17gcref = namedtuple('gcref', ['obj', 'event']) 

18 

19 

20class GarbageCollectorThread(Thread): 

21 """Thread in which garbage collection actually happens.""" 

22 

23 def __init__(self, gc): 

24 super().__init__() 

25 self.gc = gc 

26 self.daemon = True 

27 self.pid = getpid() 

28 self.ready = Event() 

29 

30 def run(self): 

31 # detect fork at beginning of the thread 

32 if getpid is None or getpid() != self.pid: 

33 self.ready.set() 

34 return 

35 try: 

36 s = self.gc.context.socket(zmq.PULL) 

37 s.linger = 0 

38 s.bind(self.gc.url) 

39 finally: 

40 self.ready.set() 

41 

42 while True: 

43 # detect fork 

44 if getpid is None or getpid() != self.pid: 

45 return 

46 msg = s.recv() 

47 if msg == b'DIE': 

48 break 

49 fmt = 'L' if len(msg) == 4 else 'Q' 

50 key = struct.unpack(fmt, msg)[0] 

51 tup = self.gc.refs.pop(key, None) 

52 if tup and tup.event: 

53 tup.event.set() 

54 del tup 

55 s.close() 

56 

57 

58class GarbageCollector: 

59 """PyZMQ Garbage Collector 

60 

61 Used for representing the reference held by libzmq during zero-copy sends. 

62 This object holds a dictionary, keyed by Python id, 

63 of the Python objects whose memory are currently in use by zeromq. 

64 

65 When zeromq is done with the memory, it sends a message on an inproc PUSH socket 

66 containing the packed size_t (32 or 64-bit unsigned int), 

67 which is the key in the dict. 

68 When the PULL socket in the gc thread receives that message, 

69 the reference is popped from the dict, 

70 and any tracker events that should be signaled fire. 

71 """ 

72 

73 refs = None 

74 _context = None 

75 _lock = None 

76 url = "inproc://pyzmq.gc.01" 

77 

78 def __init__(self, context=None): 

79 super().__init__() 

80 self.refs = {} 

81 self.pid = None 

82 self.thread = None 

83 self._context = context 

84 self._lock = Lock() 

85 self._stay_down = False 

86 self._push = None 

87 self._push_mutex = None 

88 atexit.register(self._atexit) 

89 

90 @property 

91 def context(self): 

92 if self._context is None: 

93 if Thread.__module__.startswith('gevent'): 

94 # gevent has monkey-patched Thread, use green Context 

95 from zmq import green 

96 

97 self._context = green.Context() 

98 else: 

99 self._context = zmq.Context() 

100 return self._context 

101 

102 @context.setter 

103 def context(self, ctx): 

104 if self.is_alive(): 

105 if self.refs: 

106 warnings.warn( 

107 "Replacing gc context while gc is running", RuntimeWarning 

108 ) 

109 self.stop() 

110 self._context = ctx 

111 

112 def _atexit(self): 

113 """atexit callback 

114 

115 sets _stay_down flag so that gc doesn't try to start up again in other atexit handlers 

116 """ 

117 self._stay_down = True 

118 self.stop() 

119 

120 def stop(self): 

121 """stop the garbage-collection thread""" 

122 if not self.is_alive(): 

123 return 

124 self._stop() 

125 

126 def _clear(self): 

127 """Clear state 

128 

129 called after stop or when setting up a new subprocess 

130 """ 

131 self._push = None 

132 self._push_mutex = None 

133 self.thread = None 

134 self.refs.clear() 

135 self.context = None 

136 

137 def _stop(self): 

138 push = self.context.socket(zmq.PUSH) 

139 push.connect(self.url) 

140 push.send(b'DIE') 

141 push.close() 

142 if self._push: 

143 self._push.close() 

144 self.thread.join() 

145 self.context.term() 

146 self._clear() 

147 

148 @property 

149 def _push_socket(self): 

150 """The PUSH socket for use in the zmq message destructor callback.""" 

151 if getattr(self, "_stay_down", False): 

152 raise RuntimeError("zmq gc socket requested during shutdown") 

153 if not self.is_alive() or self._push is None: 

154 self._push = self.context.socket(zmq.PUSH) 

155 self._push.connect(self.url) 

156 return self._push 

157 

158 def start(self): 

159 """Start a new garbage collection thread. 

160 

161 Creates a new zmq Context used for garbage collection. 

162 Under most circumstances, this will only be called once per process. 

163 """ 

164 if self.thread is not None and self.pid != getpid(): 

165 # It's re-starting, must free earlier thread's context 

166 # since a fork probably broke it 

167 self._clear() 

168 self.pid = getpid() 

169 self.refs = {} 

170 self.thread = GarbageCollectorThread(self) 

171 self.thread.start() 

172 self.thread.ready.wait() 

173 

174 def is_alive(self): 

175 """Is the garbage collection thread currently running? 

176 

177 Includes checks for process shutdown or fork. 

178 """ 

179 if ( 

180 getpid is None 

181 or getpid() != self.pid 

182 or self.thread is None 

183 or not self.thread.is_alive() 

184 ): 

185 return False 

186 return True 

187 

188 def store(self, obj, event=None): 

189 """store an object and (optionally) event for zero-copy""" 

190 if not self.is_alive(): 

191 if self._stay_down: 

192 return 0 

193 # safely start the gc thread 

194 # use lock and double check, 

195 # so we don't start multiple threads 

196 with self._lock: 

197 if not self.is_alive(): 

198 self.start() 

199 tup = gcref(obj, event) 

200 theid = id(tup) 

201 self.refs[theid] = tup 

202 return theid 

203 

204 def __del__(self): 

205 if not self.is_alive(): 

206 return 

207 try: 

208 self.stop() 

209 except Exception as e: 

210 raise (e) 

211 

212 

213gc = GarbageCollector()