Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/summary/writer/event_file_writer.py: 23%

125 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-03 07:57 +0000

1# Copyright 2015 The TensorFlow Authors. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# ============================================================================== 

15"""Writes events to disk in a logdir.""" 

16 

17import collections 

18import os.path 

19import sys 

20import threading 

21import time 

22 

23from tensorflow.python.client import _pywrap_events_writer 

24from tensorflow.python.platform import gfile 

25from tensorflow.python.platform import tf_logging as logging 

26from tensorflow.python.util import compat 

27 

28 

29class EventFileWriter: 

30 """Writes `Event` protocol buffers to an event file. 

31 

32 The `EventFileWriter` class creates an event file in the specified directory, 

33 and asynchronously writes Event protocol buffers to the file. The Event file 

34 is encoded using the tfrecord format, which is similar to RecordIO. 

35 

36 This class is not thread-safe. 

37 """ 

38 

39 def __init__(self, logdir, max_queue=10, flush_secs=120, 

40 filename_suffix=None): 

41 """Creates a `EventFileWriter` and an event file to write to. 

42 

43 On construction the summary writer creates a new event file in `logdir`. 

44 This event file will contain `Event` protocol buffers, which are written to 

45 disk via the add_event method. 

46 

47 The other arguments to the constructor control the asynchronous writes to 

48 the event file: 

49 

50 * `flush_secs`: How often, in seconds, to flush the added summaries 

51 and events to disk. 

52 * `max_queue`: Maximum number of summaries or events pending to be 

53 written to disk before one of the 'add' calls block. 

54 

55 Args: 

56 logdir: A string. Directory where event file will be written. 

57 max_queue: Integer. Size of the queue for pending events and summaries. 

58 flush_secs: Number. How often, in seconds, to flush the 

59 pending events and summaries to disk. 

60 filename_suffix: A string. Every event file's name is suffixed with 

61 `filename_suffix`. 

62 """ 

63 self._logdir = str(logdir) 

64 gfile.MakeDirs(self._logdir) 

65 self._max_queue = max_queue 

66 self._flush_secs = flush_secs 

67 self._flush_complete = threading.Event() 

68 self._flush_sentinel = object() 

69 self._close_sentinel = object() 

70 self._ev_writer = _pywrap_events_writer.EventsWriter( 

71 compat.as_bytes(os.path.join(self._logdir, "events"))) 

72 if filename_suffix: 

73 self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix)) 

74 self._initialize() 

75 self._closed = False 

76 

77 def _initialize(self): 

78 """Initializes or re-initializes the queue and writer thread. 

79 

80 The EventsWriter itself does not need to be re-initialized explicitly, 

81 because it will auto-initialize itself if used after being closed. 

82 """ 

83 self._event_queue = CloseableQueue(self._max_queue) 

84 self._worker = _EventLoggerThread(self._event_queue, self._ev_writer, 

85 self._flush_secs, self._flush_complete, 

86 self._flush_sentinel, 

87 self._close_sentinel) 

88 

89 self._worker.start() 

90 

91 def get_logdir(self): 

92 """Returns the directory where event file will be written.""" 

93 return self._logdir 

94 

95 def reopen(self): 

96 """Reopens the EventFileWriter. 

97 

98 Can be called after `close()` to add more events in the same directory. 

99 The events will go into a new events file. 

100 

101 Does nothing if the EventFileWriter was not closed. 

102 """ 

103 if self._closed: 

104 self._initialize() 

105 self._closed = False 

106 

107 def add_event(self, event): 

108 """Adds an event to the event file. 

109 

110 Args: 

111 event: An `Event` protocol buffer. 

112 """ 

113 if not self._closed: 

114 self._try_put(event) 

115 

116 def _try_put(self, item): 

117 """Attempts to enqueue an item to the event queue. 

118 

119 If the queue is closed, this will close the EventFileWriter and reraise the 

120 exception that caused the queue closure, if one exists. 

121 

122 Args: 

123 item: the item to enqueue 

124 """ 

125 try: 

126 self._event_queue.put(item) 

127 except QueueClosedError: 

128 self._internal_close() 

129 if self._worker.failure_exc_info: 

130 _, exception, _ = self._worker.failure_exc_info 

131 raise exception from None 

132 

133 def flush(self): 

134 """Flushes the event file to disk. 

135 

136 Call this method to make sure that all pending events have been written to 

137 disk. 

138 """ 

139 if not self._closed: 

140 # Request a flush operation by enqueing a sentinel and then waiting for 

141 # the writer thread to mark the flush as complete. 

142 self._flush_complete.clear() 

143 self._try_put(self._flush_sentinel) 

144 self._flush_complete.wait() 

145 if self._worker.failure_exc_info: 

146 self._internal_close() 

147 _, exception, _ = self._worker.failure_exc_info 

148 raise exception 

149 

150 def close(self): 

151 """Flushes the event file to disk and close the file. 

152 

153 Call this method when you do not need the summary writer anymore. 

154 """ 

155 if not self._closed: 

156 self.flush() 

157 self._try_put(self._close_sentinel) 

158 self._internal_close() 

159 

160 def _internal_close(self): 

161 self._closed = True 

162 self._worker.join() 

163 self._ev_writer.Close() 

164 

165 

166class _EventLoggerThread(threading.Thread): 

167 """Thread that logs events.""" 

168 

169 def __init__(self, queue, ev_writer, flush_secs, flush_complete, 

170 flush_sentinel, close_sentinel): 

171 """Creates an _EventLoggerThread. 

172 

173 Args: 

174 queue: A CloseableQueue from which to dequeue events. The queue will be 

175 closed just before the thread exits, whether due to `close_sentinel` or 

176 any exception raised in the writing loop. 

177 ev_writer: An event writer. Used to log brain events for 

178 the visualizer. 

179 flush_secs: How often, in seconds, to flush the 

180 pending file to disk. 

181 flush_complete: A threading.Event that will be set whenever a flush 

182 operation requested via `flush_sentinel` has been completed. 

183 flush_sentinel: A sentinel element in queue that tells this thread to 

184 flush the writer and mark the current flush operation complete. 

185 close_sentinel: A sentinel element in queue that tells this thread to 

186 terminate and close the queue. 

187 """ 

188 threading.Thread.__init__(self, name="EventLoggerThread") 

189 self.daemon = True 

190 self._queue = queue 

191 self._ev_writer = ev_writer 

192 self._flush_secs = flush_secs 

193 # The first event will be flushed immediately. 

194 self._next_event_flush_time = 0 

195 self._flush_complete = flush_complete 

196 self._flush_sentinel = flush_sentinel 

197 self._close_sentinel = close_sentinel 

198 # Populated when writing logic raises an exception and kills the thread. 

199 self.failure_exc_info = () 

200 

201 def run(self): 

202 try: 

203 while True: 

204 event = self._queue.get() 

205 if event is self._close_sentinel: 

206 return 

207 elif event is self._flush_sentinel: 

208 self._ev_writer.Flush() 

209 self._flush_complete.set() 

210 else: 

211 self._ev_writer.WriteEvent(event) 

212 # Flush the event writer every so often. 

213 now = time.time() 

214 if now > self._next_event_flush_time: 

215 self._ev_writer.Flush() 

216 self._next_event_flush_time = now + self._flush_secs 

217 except Exception as e: 

218 logging.error("EventFileWriter writer thread error: %s", e) 

219 self.failure_exc_info = sys.exc_info() 

220 raise 

221 finally: 

222 # When exiting the thread, always complete any pending flush operation 

223 # (to unblock flush() calls) and close the queue (to unblock add_event() 

224 # calls, including those used by flush() and close()), which ensures that 

225 # code using EventFileWriter doesn't deadlock if this thread dies. 

226 self._flush_complete.set() 

227 self._queue.close() 

228 

229 

230class CloseableQueue: 

231 """Stripped-down fork of the standard library Queue that is closeable.""" 

232 

233 def __init__(self, maxsize=0): 

234 """Create a queue object with a given maximum size. 

235 

236 Args: 

237 maxsize: int size of queue. If <= 0, the queue size is infinite. 

238 """ 

239 self._maxsize = maxsize 

240 self._queue = collections.deque() 

241 self._closed = False 

242 # Mutex must be held whenever queue is mutating; shared by conditions. 

243 self._mutex = threading.Lock() 

244 # Notify not_empty whenever an item is added to the queue; a 

245 # thread waiting to get is notified then. 

246 self._not_empty = threading.Condition(self._mutex) 

247 # Notify not_full whenever an item is removed from the queue; 

248 # a thread waiting to put is notified then. 

249 self._not_full = threading.Condition(self._mutex) 

250 

251 def get(self): 

252 """Remove and return an item from the queue. 

253 

254 If the queue is empty, blocks until an item is available. 

255 

256 Returns: 

257 an item from the queue 

258 """ 

259 with self._not_empty: 

260 while not self._queue: 

261 self._not_empty.wait() 

262 item = self._queue.popleft() 

263 self._not_full.notify() 

264 return item 

265 

266 def put(self, item): 

267 """Put an item into the queue. 

268 

269 If the queue is closed, fails immediately. 

270 

271 If the queue is full, blocks until space is available or until the queue 

272 is closed by a call to close(), at which point this call fails. 

273 

274 Args: 

275 item: an item to add to the queue 

276 

277 Raises: 

278 QueueClosedError: if insertion failed because the queue is closed 

279 """ 

280 with self._not_full: 

281 if self._closed: 

282 raise QueueClosedError() 

283 if self._maxsize > 0: 

284 while len(self._queue) == self._maxsize: 

285 self._not_full.wait() 

286 if self._closed: 

287 raise QueueClosedError() 

288 self._queue.append(item) 

289 self._not_empty.notify() 

290 

291 def close(self): 

292 """Closes the queue, causing any pending or future `put()` calls to fail.""" 

293 with self._not_full: 

294 self._closed = True 

295 self._not_full.notify_all() 

296 

297 

298class QueueClosedError(Exception): 

299 """Raised when CloseableQueue.put() fails because the queue is closed."""