Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/utils/garbage.py: 87%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

139 statements  

1"""Garbage collection thread for representing zmq refcount of Python objects 

2used in zero-copy sends. 

3""" 

4 

5# Copyright (C) PyZMQ Developers 

6# Distributed under the terms of the Modified BSD License. 

7 

8from __future__ import annotations 

9 

10import atexit 

11import struct 

12import warnings 

13from os import getpid 

14from threading import Event, Lock, Thread 

15from typing import Final, NamedTuple 

16 

17import zmq 

18 

19 

class gcref(NamedTuple):
    """One entry of the collector's reference table.

    Pairs the object being kept alive with an optional event; the gc thread
    sets that event (when there is one) after removing the entry.
    """

    # the Python object whose memory must stay alive
    obj: object
    # set by the gc thread on release; None when nothing is waiting
    event: Event | None

23 

24 

class GarbageCollectorThread(Thread):
    """Daemon thread that performs the actual releasing of references.

    It binds a PULL socket on the collector's url and, for every key it
    receives, pops the matching entry from ``gc.refs`` and sets the entry's
    event if it has one.  A ``b'DIE'`` message ends the loop.
    """

    gc: GarbageCollector
    daemon: bool
    pid: int
    ready: Event

    def __init__(self, gc: GarbageCollector) -> None:
        super().__init__()
        # remember the creating process so a fork can be detected later
        self.pid = getpid()
        self.ready = Event()
        self.gc = gc
        self.daemon = True

    def _pid_changed(self) -> bool:
        """True when getpid is gone or reports a pid other than the creator's."""
        return getpid is None or getpid() != self.pid

    def run(self) -> None:
        """Receive keys until told to die, releasing the matching references."""
        if self._pid_changed():
            # running in a different process than the one that created us:
            # unblock anyone waiting on `ready` and do nothing else
            self.ready.set()
            return
        try:
            pull = self.gc.context.socket(zmq.PULL)
            pull.linger = 0
            pull.bind(self.gc.url)
        finally:
            # always release waiters, even if socket setup raised
            self.ready.set()

        while not self._pid_changed():
            frame = pull.recv()
            if frame == b'DIE':
                break
            # 4-byte keys are unpacked with 'L', anything else with 'Q'
            fmt = 'Q' if len(frame) != 4 else 'L'
            (key,) = struct.unpack(fmt, frame)
            released = self.gc.refs.pop(key, None)
            if released and released.event:
                released.event.set()
            # drop the local reference before blocking in recv() again
            del released
        else:
            # pid changed mid-loop: leave without closing the socket
            return
        pull.close()

66 

67 

class GarbageCollector:
    """PyZMQ Garbage Collector

    Stands in for the reference libzmq holds on a Python object during a
    zero-copy send.  Objects whose memory zeromq is still using are kept in
    a dict keyed by a Python id.

    Once zeromq no longer needs the memory, it sends the key as a packed
    size_t (32 or 64-bit unsigned int) over an inproc PUSH socket.  The gc
    thread's PULL socket receives it, the entry is popped from the dict,
    and any tracker event attached to it is signaled.
    """

    url = "inproc://pyzmq.gc.01"
    _context: zmq.Context | None = None

    refs: dict[int, gcref]
    _lock: Lock
    _push: zmq.Socket | None

    def __init__(self, context: zmq.Context | None = None) -> None:
        super().__init__()
        self._lock = Lock()
        self._stay_down = False
        self._context = context
        self.refs = {}
        self.pid: int | None = None
        self.thread: GarbageCollectorThread | None = None
        self._push = None
        self._push_mutex = None
        # make sure the gc thread is shut down at interpreter exit
        atexit.register(self._atexit)

    @property
    def context(self) -> zmq.Context:
        """The zmq Context used for gc sockets, created on first access."""
        if self._context is not None:
            return self._context
        if Thread.__module__.startswith('gevent'):
            # gevent has monkey-patched Thread, use green Context
            from zmq import green

            self._context = green.Context()
        else:
            self._context = zmq.Context()
        return self._context

    @context.setter
    def context(self, ctx: zmq.Context | None) -> None:
        running = self.is_alive()
        if running and self.refs:
            warnings.warn(
                "Replacing gc context while gc is running", RuntimeWarning
            )
        if running:
            # a live gc thread is stopped before the context is swapped
            self.stop()
        self._context = ctx

    def _atexit(self) -> None:
        """atexit callback

        Raises the _stay_down flag first, so that other atexit handlers
        cannot bring the gc back up, then stops the thread.
        """
        self._stay_down = True
        self.stop()

    def stop(self) -> None:
        """stop the garbage-collection thread (no-op when it is not running)"""
        if self.is_alive():
            self._stop()

    def _clear(self) -> None:
        """Clear state

        called after stop or when setting up a new subprocess
        """
        # thread is reset before the context assignment below, whose setter
        # consults is_alive()
        self.thread = None
        self._push = None
        self._push_mutex = None
        self.refs.clear()
        self.context = None

    def _stop(self) -> None:
        """Tell the gc thread to exit, wait for it, then tear everything down."""
        ctx = self.context
        killer = ctx.socket(zmq.PUSH)
        killer.connect(self.url)
        killer.send(b'DIE')
        killer.close()
        if self._push:
            self._push.close()
        self.thread.join()  # type:ignore[union-attr]
        ctx.term()
        self._clear()

    @property
    def _push_socket(self) -> zmq.Socket:
        """The PUSH socket for use in the zmq message destructor callback."""
        if getattr(self, "_stay_down", False):
            raise RuntimeError("zmq gc socket requested during shutdown")
        if not (self.is_alive() and self._push is not None):
            # no usable socket yet: make one and connect it to the gc url
            self._push = self.context.socket(zmq.PUSH)
            self._push.connect(self.url)
        return self._push

    def start(self) -> None:
        """Start a new garbage collection thread.

        Creates a new zmq Context used for garbage collection.
        Under most circumstances, this will only be called once per process.
        """
        here = getpid()
        if self.thread is not None and self.pid != here:
            # It's re-starting, must free earlier thread's context
            # since a fork probably broke it
            self._clear()
        self.pid = here
        self.refs = {}
        worker = GarbageCollectorThread(self)
        self.thread = worker
        worker.start()
        # block until the thread has signaled `ready`
        worker.ready.wait()

    def is_alive(self) -> bool:
        """Is the garbage collection thread currently running?

        Includes checks for process shutdown or fork.
        """
        if getpid is None or getpid() != self.pid:
            return False
        worker = self.thread
        return worker is not None and bool(worker.is_alive())

    def store(self, obj: object, event: Event | None = None) -> int:
        """store an object and (optionally) event for zero-copy

        Returns the key under which the reference was stored, or 0 when the
        gc is not running and must stay down.
        """
        if not self.is_alive():
            if self._stay_down:
                return 0
            # start the gc thread under the lock, re-checking inside it,
            # so that multiple threads are not started
            with self._lock:
                if not self.is_alive():
                    self.start()
        entry = gcref(obj, event)
        key = id(entry)
        self.refs[key] = entry
        return key

    def __del__(self) -> None:
        # stop a still-running gc thread; any error from stop() is not
        # caught here
        if self.is_alive():
            self.stop()

225 

226 

# Module-level collector instance.  Constructing it does not start the gc
# thread; store() starts the thread on demand.
gc: Final[GarbageCollector] = GarbageCollector()