Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorboard/summary/writer/event_file_writer.py: 26%

121 statements (coverage.py v7.4.0, created at 2024-01-03 07:57 +0000)

# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Writes events to disk in a logdir."""


import os
import queue
import socket
import threading
import time

from tensorboard.compat import tf
from tensorboard.compat.proto import event_pb2
from tensorboard.summary.writer.record_writer import RecordWriter


class AtomicCounter:
    def __init__(self, initial_value):
        self._value = initial_value
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            try:
                return self._value
            finally:
                self._value += 1


_global_uid = AtomicCounter(0)
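
# Each AtomicCounter.get() call returns the current value and then increments
# it under the lock, so concurrent EventFileWriter instances in one process
# receive distinct filename suffixes. A minimal sketch (illustrative only):
#
#     counter = AtomicCounter(0)
#     counter.get()  # -> 0
#     counter.get()  # -> 1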

class EventFileWriter:
    """Writes `Event` protocol buffers to an event file.

    The `EventFileWriter` class creates an event file in the specified
    directory, and asynchronously writes Event protocol buffers to the
    file. The event file is encoded using the tfrecord format, which is
    similar to RecordIO.
    """

    def __init__(
        self, logdir, max_queue_size=10, flush_secs=120, filename_suffix=""
    ):
        """Creates an `EventFileWriter` and an event file to write to.

        On construction, the summary writer creates a new event file in
        `logdir`. This event file will contain `Event` protocol buffers,
        which are written to disk via the `add_event` method. The other
        constructor arguments control the asynchronous writes to the
        event file.

        Args:
          logdir: A string. Directory where the event file will be written.
          max_queue_size: Integer. Size of the queue for pending events and
            summaries.
          flush_secs: Number. How often, in seconds, to flush the
            pending events and summaries to disk.
          filename_suffix: A string. Suffix appended to the event file name.
        """
        self._logdir = logdir
        tf.io.gfile.makedirs(logdir)
        self._file_name = (
            os.path.join(
                logdir,
                "events.out.tfevents.%010d.%s.%s.%s"
                % (
                    time.time(),
                    socket.gethostname(),
                    os.getpid(),
                    _global_uid.get(),
                ),
            )
            + filename_suffix
        )  # noqa E128
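        # The resulting name looks like (hypothetical values):
        #   events.out.tfevents.1704268655.myhost.1234.0
        # i.e. zero-padded creation time, hostname, pid, and a per-process
        # counter from _global_uid, so concurrent writers never collide.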

        self._general_file_writer = tf.io.gfile.GFile(self._file_name, "wb")
        self._async_writer = _AsyncWriter(
            RecordWriter(self._general_file_writer), max_queue_size, flush_secs
        )

        # Initialize an event instance.
        _event = event_pb2.Event(
            wall_time=time.time(),
            file_version="brain.Event:2",
            source_metadata=event_pb2.SourceMetadata(
                writer="tensorboard.summary.writer.event_file_writer"
            ),
        )
        self.add_event(_event)
        self.flush()

    def get_logdir(self):
        """Returns the directory where the event file will be written."""
        return self._logdir

    def add_event(self, event):
        """Adds an event to the event file.

        Args:
          event: An `Event` protocol buffer.
        """
        if not isinstance(event, event_pb2.Event):
            raise TypeError(
                "Expected an event_pb2.Event proto, but got %s" % type(event)
            )
        self._async_writer.write(event.SerializeToString())

    def flush(self):
        """Flushes the event file to disk.

        Call this method to make sure that all pending events have been
        written to disk.
        """
        self._async_writer.flush()

    def close(self):
        """Performs a final flush of the event file to disk, stops the
        write/flush worker, and closes the file.

        Call this method when you do not need the summary writer
        anymore.
        """
        self._async_writer.close()
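
# A minimal usage sketch (illustrative only; "demo_logs" is a hypothetical
# writable directory, and the step/wall_time values are made up):
#
#     writer = EventFileWriter("demo_logs", max_queue_size=10, flush_secs=120)
#     writer.add_event(event_pb2.Event(wall_time=time.time(), step=0))
#     writer.flush()
#     writer.close()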

class _AsyncWriter:
    """Writes bytes to a file."""

    def __init__(self, record_writer, max_queue_size=20, flush_secs=120):
        """Writes bytes to a file asynchronously.

        An instance of this class holds a queue that buffers incoming data.
        Data passed to the `write` method is put into the queue and the
        method returns immediately. The class also maintains a worker
        thread that drains the queue to disk. The first constructor
        argument is a `tensorboard.summary.writer.record_writer.RecordWriter`,
        which computes the CRC checksum and writes the framed record to
        disk; the asynchronous design keeps `write` from blocking on that
        I/O.

        Args:
          record_writer: A RecordWriter instance.
          max_queue_size: Integer. Size of the queue for pending bytestrings.
          flush_secs: Number. How often, in seconds, to flush the
            pending bytestrings to disk.
        """
        self._writer = record_writer
        self._closed = False
        self._byte_queue = queue.Queue(max_queue_size)
        self._worker = _AsyncWriterThread(
            self._byte_queue, self._writer, flush_secs
        )
        self._lock = threading.Lock()
        self._worker.start()

    def write(self, bytestring):
        """Enqueues the given bytes to be written asynchronously."""
        with self._lock:
            # The worker's status must be checked under the lock to avoid
            # multiple threads passing the check and then switching just
            # before blocking on the queue, which could deadlock.
            self._check_worker_status()
            if self._closed:
                raise IOError("Writer is closed")
            self._byte_queue.put(bytestring)
            # Check the status again in case the background worker thread
            # failed in the meantime, so the error surfaces now rather than
            # on the next call.
            self._check_worker_status()

    def flush(self):
        """Writes to disk all bytestrings enqueued before this call.

        Blocks until those bytestrings have been written.
        """
        with self._lock:
            self._check_worker_status()
            if self._closed:
                raise IOError("Writer is closed")
            self._byte_queue.join()
            self._writer.flush()
            # Check the status again in case the background worker thread
            # failed in the meantime, so the error surfaces now rather than
            # on the next call.
            self._check_worker_status()

    def close(self):
        """Closes the underlying writer, flushing any pending writes first."""
        if not self._closed:
            with self._lock:
                if not self._closed:
                    self._closed = True
                    self._worker.stop()
                    self._writer.flush()
                    self._writer.close()

    def _check_worker_status(self):
        """Re-raises on the calling thread any exception thrown in the
        worker thread, which also confirms the worker is still running."""
        exception = self._worker.exception
        if exception is not None:
            raise exception
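
# Sketch of the write/flush/close contract (illustrative only; "out.bin" is a
# hypothetical local file):
#
#     w = _AsyncWriter(RecordWriter(open("out.bin", "wb")), flush_secs=1)
#     w.write(b"payload")  # enqueues and returns immediately
#     w.flush()            # blocks until the queue is drained and flushed
#     w.close()            # final flush, stops the worker, closes the file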

class _AsyncWriterThread(threading.Thread):
    """Thread that processes asynchronous writes for _AsyncWriter."""

    def __init__(self, queue, record_writer, flush_secs):
        """Creates an _AsyncWriterThread.

        Args:
          queue: A Queue from which to dequeue data.
          record_writer: A RecordWriter instance.
          flush_secs: How often, in seconds, to flush the
            pending file to disk.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.exception = None
        self._queue = queue
        self._record_writer = record_writer
        self._flush_secs = flush_secs
        # The first data will be flushed immediately.
        self._next_flush_time = 0
        self._has_pending_data = False
        self._shutdown_signal = object()

    def stop(self):
        self._queue.put(self._shutdown_signal)
        self.join()

    def run(self):
        try:
            self._run()
        except Exception as ex:
            self.exception = ex
            try:
                # In case there's a thread blocked on putting an item into
                # the queue or a thread blocked on flushing, pop all items
                # from the queue to let the foreground thread proceed.
                while True:
                    self._queue.get(False)
                    self._queue.task_done()
            except queue.Empty:
                pass
            raise

    def _run(self):
        # Wait on the queue until data appears, or until the next scheduled
        # flush of the writer, whichever is earlier. If data arrives, write
        # it; if not, queue.Empty is raised and we proceed to flush the
        # writer.
        while True:
            now = time.time()
            queue_wait_duration = self._next_flush_time - now
            data = None
            try:
                if queue_wait_duration > 0:
                    data = self._queue.get(True, queue_wait_duration)
                else:
                    data = self._queue.get(False)

                if data is self._shutdown_signal:
                    return
                self._record_writer.write(data)
                self._has_pending_data = True
            except queue.Empty:
                pass
            finally:
                # Use an identity check so that an empty bytestring still
                # marks its queue task as done.
                if data is not None:
                    self._queue.task_done()

            now = time.time()
            if now > self._next_flush_time:
                if self._has_pending_data:
                    # Small optimization - if there is no pending data,
                    # there's no need to flush, since each flush can be
                    # expensive (e.g. uploading a new file to a server).
                    self._record_writer.flush()
                    self._has_pending_data = False
                # Do it again in flush_secs.
                self._next_flush_time = now + self._flush_secs
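
# Flush cadence sketch: _next_flush_time starts at 0, so the first pass uses a
# non-blocking get() and flushes the initial event right away; after that,
# each pass blocks at most until now + flush_secs before flushing any pending
# data and rescheduling the next flush.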