Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorboard/summary/writer/event_file_writer.py: 26%
121 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
1# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Writes events to disk in a logdir."""
18import os
19import queue
20import socket
21import threading
22import time
24from tensorboard.compat import tf
25from tensorboard.compat.proto import event_pb2
26from tensorboard.summary.writer.record_writer import RecordWriter
class AtomicCounter:
    """A thread-safe counter that hands out successive integer values."""

    def __init__(self, initial_value):
        """Creates a counter whose first `get()` returns `initial_value`."""
        self._value = initial_value
        self._lock = threading.Lock()

    def get(self):
        """Atomically returns the current value and advances the counter."""
        with self._lock:
            current = self._value
            self._value = current + 1
            return current
42_global_uid = AtomicCounter(0)
class EventFileWriter:
    """Writes `Event` protocol buffers to an event file.

    The `EventFileWriter` class creates an event file in the specified
    directory, and asynchronously writes Event protocol buffers to the
    file. The Event file is encoded using the tfrecord format, which is
    similar to RecordIO.
    """

    def __init__(
        self, logdir, max_queue_size=10, flush_secs=120, filename_suffix=""
    ):
        """Creates a `EventFileWriter` and an event file to write to.

        On construction the summary writer creates a new event file in
        `logdir`, containing `Event` protocol buffers that are written to
        disk via the `add_event` method. The remaining arguments control the
        asynchronous writes to the event file.

        Args:
          logdir: A string. Directory where event file will be written.
          max_queue_size: Integer. Size of the queue for pending events and summaries.
          flush_secs: Number. How often, in seconds, to flush the
            pending events and summaries to disk.
          filename_suffix: A string appended verbatim to the event file name.
        """
        self._logdir = logdir
        tf.io.gfile.makedirs(logdir)
        # The name embeds the creation time, host, pid, and a per-process
        # counter so concurrently created writers never collide on a file.
        base_name = "events.out.tfevents.%010d.%s.%s.%s" % (
            time.time(),
            socket.gethostname(),
            os.getpid(),
            _global_uid.get(),
        )
        self._file_name = os.path.join(logdir, base_name) + filename_suffix
        self._general_file_writer = tf.io.gfile.GFile(self._file_name, "wb")
        self._async_writer = _AsyncWriter(
            RecordWriter(self._general_file_writer), max_queue_size, flush_secs
        )

        # Write (and immediately flush) a version-header event identifying
        # the file format and the producing writer.
        initial_event = event_pb2.Event(
            wall_time=time.time(),
            file_version="brain.Event:2",
            source_metadata=event_pb2.SourceMetadata(
                writer="tensorboard.summary.writer.event_file_writer"
            ),
        )
        self.add_event(initial_event)
        self.flush()

    def get_logdir(self):
        """Returns the directory where event file will be written."""
        return self._logdir

    def add_event(self, event):
        """Adds an event to the event file.

        Args:
          event: An `Event` protocol buffer.

        Raises:
          TypeError: if `event` is not an `event_pb2.Event`.
        """
        if not isinstance(event, event_pb2.Event):
            raise TypeError(
                "Expected an event_pb2.Event proto, "
                " but got %s" % type(event)
            )
        self._async_writer.write(event.SerializeToString())

    def flush(self):
        """Flushes the event file to disk.

        Blocks until all pending events have been written to disk.
        """
        self._async_writer.flush()

    def close(self):
        """Flushes pending events, stops the writer thread, closes the file.

        Call this method when you do not need the summary writer anymore.
        """
        self._async_writer.close()
class _AsyncWriter:
    """Writes bytes to a file."""

    def __init__(self, record_writer, max_queue_size=20, flush_secs=120):
        """Writes bytes to a file asynchronously.

        Incoming data is buffered in a queue: `write` enqueues and returns
        immediately, while a background thread drains the queue to disk
        through the given `record_writer` (which adds the tfrecord CRC
        framing). The async hand-off keeps callers fast.

        Args:
          record_writer: A RecordWriter instance
          max_queue_size: Integer. Size of the queue for pending bytestrings.
          flush_secs: Number. How often, in seconds, to flush the
            pending bytestrings to disk.
        """
        self._writer = record_writer
        self._closed = False
        self._byte_queue = queue.Queue(max_queue_size)
        self._worker = _AsyncWriterThread(
            self._byte_queue, self._writer, flush_secs
        )
        self._lock = threading.Lock()
        self._worker.start()

    def write(self, bytestring):
        """Enqueue the given bytes to be written asychronously."""
        with self._lock:
            # The worker's health and the closed flag are both inspected
            # under the lock so two callers can't race past the checks and
            # deadlock while blocking on a full queue.
            self._check_worker_status()
            if self._closed:
                raise IOError("Writer is closed")
            self._byte_queue.put(bytestring)
            # Re-check afterwards: if the worker died during the put, surface
            # the error now instead of on some later call.
            self._check_worker_status()

    def flush(self):
        """Write all the enqueued bytestring before this flush call to disk.

        Block until all the above bytestring are written.
        """
        with self._lock:
            self._check_worker_status()
            if self._closed:
                raise IOError("Writer is closed")
            self._byte_queue.join()
            self._writer.flush()
            # Re-check afterwards: if the worker died meanwhile, surface the
            # error now instead of on some later call.
            self._check_worker_status()

    def close(self):
        """Closes the underlying writer, flushing any pending writes first."""
        # Cheap pre-check outside the lock; the decisive check is inside.
        if self._closed:
            return
        with self._lock:
            if self._closed:
                return
            self._closed = True
            self._worker.stop()
            self._writer.flush()
            self._writer.close()

    def _check_worker_status(self):
        """Raises the exception captured by the worker thread, if any."""
        pending_exception = self._worker.exception
        if pending_exception is not None:
            raise pending_exception
215class _AsyncWriterThread(threading.Thread):
216 """Thread that processes asynchronous writes for _AsyncWriter."""
218 def __init__(self, queue, record_writer, flush_secs):
219 """Creates an _AsyncWriterThread.
221 Args:
222 queue: A Queue from which to dequeue data.
223 record_writer: An instance of record_writer writer.
224 flush_secs: How often, in seconds, to flush the
225 pending file to disk.
226 """
227 threading.Thread.__init__(self)
228 self.daemon = True
229 self.exception = None
230 self._queue = queue
231 self._record_writer = record_writer
232 self._flush_secs = flush_secs
233 # The first data will be flushed immediately.
234 self._next_flush_time = 0
235 self._has_pending_data = False
236 self._shutdown_signal = object()
238 def stop(self):
239 self._queue.put(self._shutdown_signal)
240 self.join()
242 def run(self):
243 try:
244 self._run()
245 except Exception as ex:
246 self.exception = ex
247 try:
248 # In case there's a thread blocked on putting an item into the
249 # queue or a thread blocked on flushing, pop all items from the
250 # queue to let the foreground thread proceed.
251 while True:
252 self._queue.get(False)
253 self._queue.task_done()
254 except queue.Empty:
255 pass
256 raise
258 def _run(self):
259 # Here wait on the queue until an data appears, or till the next
260 # time to flush the writer, whichever is earlier. If we have an
261 # data, write it. If not, an empty queue exception will be raised
262 # and we can proceed to flush the writer.
263 while True:
264 now = time.time()
265 queue_wait_duration = self._next_flush_time - now
266 data = None
267 try:
268 if queue_wait_duration > 0:
269 data = self._queue.get(True, queue_wait_duration)
270 else:
271 data = self._queue.get(False)
273 if data is self._shutdown_signal:
274 return
275 self._record_writer.write(data)
276 self._has_pending_data = True
277 except queue.Empty:
278 pass
279 finally:
280 if data:
281 self._queue.task_done()
283 now = time.time()
284 if now > self._next_flush_time:
285 if self._has_pending_data:
286 # Small optimization - if there are no pending data,
287 # there's no need to flush, since each flush can be
288 # expensive (e.g. uploading a new file to a server).
289 self._record_writer.flush()
290 self._has_pending_data = False
291 # Do it again in flush_secs.
292 self._next_flush_time = now + self._flush_secs