Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/summary/writer/event_file_writer.py: 23%
125 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
1# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Writes events to disk in a logdir."""
17import collections
18import os.path
19import sys
20import threading
21import time
23from tensorflow.python.client import _pywrap_events_writer
24from tensorflow.python.platform import gfile
25from tensorflow.python.platform import tf_logging as logging
26from tensorflow.python.util import compat
29class EventFileWriter:
30 """Writes `Event` protocol buffers to an event file.
32 The `EventFileWriter` class creates an event file in the specified directory,
33 and asynchronously writes Event protocol buffers to the file. The Event file
34 is encoded using the tfrecord format, which is similar to RecordIO.
36 This class is not thread-safe.
37 """
39 def __init__(self, logdir, max_queue=10, flush_secs=120,
40 filename_suffix=None):
41 """Creates a `EventFileWriter` and an event file to write to.
43 On construction the summary writer creates a new event file in `logdir`.
44 This event file will contain `Event` protocol buffers, which are written to
45 disk via the add_event method.
47 The other arguments to the constructor control the asynchronous writes to
48 the event file:
50 * `flush_secs`: How often, in seconds, to flush the added summaries
51 and events to disk.
52 * `max_queue`: Maximum number of summaries or events pending to be
53 written to disk before one of the 'add' calls block.
55 Args:
56 logdir: A string. Directory where event file will be written.
57 max_queue: Integer. Size of the queue for pending events and summaries.
58 flush_secs: Number. How often, in seconds, to flush the
59 pending events and summaries to disk.
60 filename_suffix: A string. Every event file's name is suffixed with
61 `filename_suffix`.
62 """
63 self._logdir = str(logdir)
64 gfile.MakeDirs(self._logdir)
65 self._max_queue = max_queue
66 self._flush_secs = flush_secs
67 self._flush_complete = threading.Event()
68 self._flush_sentinel = object()
69 self._close_sentinel = object()
70 self._ev_writer = _pywrap_events_writer.EventsWriter(
71 compat.as_bytes(os.path.join(self._logdir, "events")))
72 if filename_suffix:
73 self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix))
74 self._initialize()
75 self._closed = False
77 def _initialize(self):
78 """Initializes or re-initializes the queue and writer thread.
80 The EventsWriter itself does not need to be re-initialized explicitly,
81 because it will auto-initialize itself if used after being closed.
82 """
83 self._event_queue = CloseableQueue(self._max_queue)
84 self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
85 self._flush_secs, self._flush_complete,
86 self._flush_sentinel,
87 self._close_sentinel)
89 self._worker.start()
91 def get_logdir(self):
92 """Returns the directory where event file will be written."""
93 return self._logdir
95 def reopen(self):
96 """Reopens the EventFileWriter.
98 Can be called after `close()` to add more events in the same directory.
99 The events will go into a new events file.
101 Does nothing if the EventFileWriter was not closed.
102 """
103 if self._closed:
104 self._initialize()
105 self._closed = False
107 def add_event(self, event):
108 """Adds an event to the event file.
110 Args:
111 event: An `Event` protocol buffer.
112 """
113 if not self._closed:
114 self._try_put(event)
116 def _try_put(self, item):
117 """Attempts to enqueue an item to the event queue.
119 If the queue is closed, this will close the EventFileWriter and reraise the
120 exception that caused the queue closure, if one exists.
122 Args:
123 item: the item to enqueue
124 """
125 try:
126 self._event_queue.put(item)
127 except QueueClosedError:
128 self._internal_close()
129 if self._worker.failure_exc_info:
130 _, exception, _ = self._worker.failure_exc_info
131 raise exception from None
133 def flush(self):
134 """Flushes the event file to disk.
136 Call this method to make sure that all pending events have been written to
137 disk.
138 """
139 if not self._closed:
140 # Request a flush operation by enqueing a sentinel and then waiting for
141 # the writer thread to mark the flush as complete.
142 self._flush_complete.clear()
143 self._try_put(self._flush_sentinel)
144 self._flush_complete.wait()
145 if self._worker.failure_exc_info:
146 self._internal_close()
147 _, exception, _ = self._worker.failure_exc_info
148 raise exception
150 def close(self):
151 """Flushes the event file to disk and close the file.
153 Call this method when you do not need the summary writer anymore.
154 """
155 if not self._closed:
156 self.flush()
157 self._try_put(self._close_sentinel)
158 self._internal_close()
160 def _internal_close(self):
161 self._closed = True
162 self._worker.join()
163 self._ev_writer.Close()
166class _EventLoggerThread(threading.Thread):
167 """Thread that logs events."""
169 def __init__(self, queue, ev_writer, flush_secs, flush_complete,
170 flush_sentinel, close_sentinel):
171 """Creates an _EventLoggerThread.
173 Args:
174 queue: A CloseableQueue from which to dequeue events. The queue will be
175 closed just before the thread exits, whether due to `close_sentinel` or
176 any exception raised in the writing loop.
177 ev_writer: An event writer. Used to log brain events for
178 the visualizer.
179 flush_secs: How often, in seconds, to flush the
180 pending file to disk.
181 flush_complete: A threading.Event that will be set whenever a flush
182 operation requested via `flush_sentinel` has been completed.
183 flush_sentinel: A sentinel element in queue that tells this thread to
184 flush the writer and mark the current flush operation complete.
185 close_sentinel: A sentinel element in queue that tells this thread to
186 terminate and close the queue.
187 """
188 threading.Thread.__init__(self, name="EventLoggerThread")
189 self.daemon = True
190 self._queue = queue
191 self._ev_writer = ev_writer
192 self._flush_secs = flush_secs
193 # The first event will be flushed immediately.
194 self._next_event_flush_time = 0
195 self._flush_complete = flush_complete
196 self._flush_sentinel = flush_sentinel
197 self._close_sentinel = close_sentinel
198 # Populated when writing logic raises an exception and kills the thread.
199 self.failure_exc_info = ()
201 def run(self):
202 try:
203 while True:
204 event = self._queue.get()
205 if event is self._close_sentinel:
206 return
207 elif event is self._flush_sentinel:
208 self._ev_writer.Flush()
209 self._flush_complete.set()
210 else:
211 self._ev_writer.WriteEvent(event)
212 # Flush the event writer every so often.
213 now = time.time()
214 if now > self._next_event_flush_time:
215 self._ev_writer.Flush()
216 self._next_event_flush_time = now + self._flush_secs
217 except Exception as e:
218 logging.error("EventFileWriter writer thread error: %s", e)
219 self.failure_exc_info = sys.exc_info()
220 raise
221 finally:
222 # When exiting the thread, always complete any pending flush operation
223 # (to unblock flush() calls) and close the queue (to unblock add_event()
224 # calls, including those used by flush() and close()), which ensures that
225 # code using EventFileWriter doesn't deadlock if this thread dies.
226 self._flush_complete.set()
227 self._queue.close()
230class CloseableQueue:
231 """Stripped-down fork of the standard library Queue that is closeable."""
233 def __init__(self, maxsize=0):
234 """Create a queue object with a given maximum size.
236 Args:
237 maxsize: int size of queue. If <= 0, the queue size is infinite.
238 """
239 self._maxsize = maxsize
240 self._queue = collections.deque()
241 self._closed = False
242 # Mutex must be held whenever queue is mutating; shared by conditions.
243 self._mutex = threading.Lock()
244 # Notify not_empty whenever an item is added to the queue; a
245 # thread waiting to get is notified then.
246 self._not_empty = threading.Condition(self._mutex)
247 # Notify not_full whenever an item is removed from the queue;
248 # a thread waiting to put is notified then.
249 self._not_full = threading.Condition(self._mutex)
251 def get(self):
252 """Remove and return an item from the queue.
254 If the queue is empty, blocks until an item is available.
256 Returns:
257 an item from the queue
258 """
259 with self._not_empty:
260 while not self._queue:
261 self._not_empty.wait()
262 item = self._queue.popleft()
263 self._not_full.notify()
264 return item
266 def put(self, item):
267 """Put an item into the queue.
269 If the queue is closed, fails immediately.
271 If the queue is full, blocks until space is available or until the queue
272 is closed by a call to close(), at which point this call fails.
274 Args:
275 item: an item to add to the queue
277 Raises:
278 QueueClosedError: if insertion failed because the queue is closed
279 """
280 with self._not_full:
281 if self._closed:
282 raise QueueClosedError()
283 if self._maxsize > 0:
284 while len(self._queue) == self._maxsize:
285 self._not_full.wait()
286 if self._closed:
287 raise QueueClosedError()
288 self._queue.append(item)
289 self._not_empty.notify()
291 def close(self):
292 """Closes the queue, causing any pending or future `put()` calls to fail."""
293 with self._not_full:
294 self._closed = True
295 self._not_full.notify_all()
298class QueueClosedError(Exception):
299 """Raised when CloseableQueue.put() fails because the queue is closed."""