Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/keras/src/utils/sidecar_evaluator.py: 33%
79 statements
coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Python module for evaluation loop."""

import re

import tensorflow as tf

# isort: off
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util import deprecation
from keras.src.callbacks import ModelCheckpoint
from keras.src.optimizers import optimizer
from tensorflow.python.util.tf_export import keras_export

_PRINT_EVAL_STEP_EVERY_SEC = 60.0
# Sentinel value indicating that the `iterations` variable has not yet been
# restored from a checkpoint.
_ITERATIONS_UNINITIALIZED = -1
# How long (in seconds) `tf.train.checkpoints_iterator` waits for a new
# checkpoint before invoking the timeout callback `_timeout_fn`.
_CHECKPOINT_TIMEOUT_SEC = 30


def list_checkpoint_attributes(ckpt_dir_or_file):
    """Lists all the attributes in a checkpoint.

    Checkpoint keys are paths in a checkpoint graph, and the attribute is the
    first element in the path. E.g. with a checkpoint key
    "optimizer/iter/.ATTRIBUTES/VARIABLE_VALUE", "optimizer" is the attribute.
    The attribute is also used to save/restore a variable in a checkpoint,
    e.g. `tf.train.Checkpoint(optimizer=optimizer, model=model)`.

    Args:
        ckpt_dir_or_file: Directory with checkpoints file or path to
            checkpoint.

    Returns:
        Set of attributes in a checkpoint.
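
    Example (illustrative; the exact attribute names depend on the objects
    tracked by the training-side `tf.train.Checkpoint`): for a checkpoint
    written by `tf.train.Checkpoint(model=model, optimizer=optimizer)`, the
    returned set typically looks like
    `{'model', 'optimizer', 'save_counter', '_CHECKPOINTABLE_OBJECT_GRAPH'}`.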
47 """
48 reader = tf.train.load_checkpoint(ckpt_dir_or_file)
49 variable_map = reader.get_variable_to_shape_map()
50 return {name.split("/")[0] for name in variable_map.keys()}
53@keras_export("keras.utils.SidecarEvaluator", v1=[])
54class SidecarEvaluator:
55 """A class designed for a dedicated evaluator task.
57 `SidecarEvaluator` is expected to be run in a process on a separate machine
58 from the training cluster. It is meant for the purpose of a dedicated
59 evaluator, evaluating the metric results of a training cluster which has one
60 or more workers performing the training, and saving checkpoints.
62 The `SidecarEvaluator` API is compatible with both Custom Training Loop
63 (CTL), and Keras `Model.fit` to be used in the training cluster. Using the
64 model (with compiled metrics) provided at `__init__`, `SidecarEvaluator`
65 repeatedly performs evaluation "epochs" when it finds a checkpoint that has
66 not yet been used. Depending on the `steps` argument, an eval epoch is
67 evaluation over all eval data, or up to certain number of steps (batches).
68 See examples below for how the training program should save the checkpoints
69 in order to be recognized by `SidecarEvaluator`.
71 Since under the hood, `SidecarEvaluator` uses `model.evaluate` for
72 evaluation, it also supports arbitrary Keras callbacks. That is, if one or
73 more callbacks are provided, their `on_test_batch_begin` and
74 `on_test_batch_end` methods are called at the start and end of a batch, and
75 their `on_test_begin` and `on_test_end` are called at the start and end of
76 an evaluation epoch. Note that `SidecarEvaluator` may skip some checkpoints
77 because it always picks up the latest checkpoint available, and during an
78 evaluation epoch, multiple checkpoints can be produced from the training
79 side.
81 Example:
82 ```python
83 model = tf.keras.models.Sequential(...)
84 model.compile(metrics=tf.keras.metrics.SparseCategoricalAccuracy(
85 name="eval_metrics"))
86 data = tf.data.Dataset.from_tensor_slices(...)
88 tf.keras.SidecarEvaluator(
89 model=model,
90 data=data,
91 # dir for training-saved checkpoint
92 checkpoint_dir='/tmp/checkpoint_dir',
93 steps=None, # Eval until dataset is exhausted
94 max_evaluations=None, # The evaluation needs to be stopped manually
95 callbacks=[tf.keras.callbacks.TensorBoard(log_dir='/tmp/log_dir')]
96 ).start()
97 ```
99 `SidecarEvaluator.start` writes a series of summary files which can be
100 visualized by tensorboard (which provides a webpage link):
102 ```bash
103 $ tensorboard --logdir=/tmp/log_dir/validation
104 ...
105 TensorBoard 2.4.0a0 at http://host:port (Press CTRL+C to quit)
106 ```
108 If the training cluster uses a CTL, the `checkpoint_dir` should contain
109 checkpoints that track both `model` and `optimizer`, to fulfill
110 `SidecarEvaluator`'s expectation. This can be done by a
111 `tf.train.Checkpoint` and a `tf.train.CheckpointManager`:
113 ```python
114 # Same `checkpoint_dir` supplied to `SidecarEvaluator`.
115 checkpoint_dir = ...
116 checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
117 checkpoint_manager = tf.train.CheckpointManager(
118 checkpoint, checkpoint_dir=..., max_to_keep=...)
119 checkpoint_manager.save()
120 ```
122 If the training cluster uses Keras `Model.fit` API, a
123 `tf.keras.callbacks.ModelCheckpoint` should be used, with
124 `save_weights_only=True`, and the `filepath` should have 'ckpt-{epoch}'
125 appended:
127 ```python
128 # Same `checkpoint_dir` supplied to `SidecarEvaluator`.
129 checkpoint_dir = ...
130 model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
131 filepath=os.path.join(checkpoint_dir, 'ckpt-{epoch}'),
132 save_weights_only=True)
133 model.fit(dataset, epochs, callbacks=[model_checkpoint])
134 ```
135 """

    def __init__(
        self,
        model,
        data,
        checkpoint_dir,
        steps=None,
        max_evaluations=None,
        callbacks=None,
    ):
        """Initializes a `SidecarEvaluator` object.

        Args:
            model: Model to use for evaluation. The model object used here
                should be a `tf.keras.Model`, and should be the same as the
                one that is used in training, where `tf.keras.Model`s are
                checkpointed. The model should have one or more metrics
                compiled before using `SidecarEvaluator`.
            data: The input data for evaluation. `SidecarEvaluator` supports
                all data types that Keras `model.evaluate` supports as the
                input data `x`, such as a `tf.data.Dataset`.
            checkpoint_dir: Directory where checkpoint files are saved.
            steps: Number of steps to perform evaluation for, when evaluating
                a single checkpoint file. If `None`, evaluation continues
                until the dataset is exhausted. For a repeated evaluation
                dataset, the user must specify `steps` to avoid an infinite
                evaluation loop.
            max_evaluations: Maximum number of the checkpoint file to be
                evaluated, for `SidecarEvaluator` to know when to stop. The
                evaluator will stop after it evaluates a checkpoint filepath
                ending with '<ckpt_name>-<max_evaluations>'. If using
                `tf.train.CheckpointManager.save` for saving checkpoints,
                the kth saved checkpoint has the filepath suffix
                '<ckpt_name>-<k>' (k=1 for the first saved), and if
                checkpoints are saved every epoch after training, the
                filepath saved at the kth epoch would end with
                '<ckpt_name>-<k>'. Thus, if training runs for n epochs, and
                the evaluator should end after the training finishes, use n
                for this parameter (see the example below). Note that this is
                not necessarily equal to the number of total evaluations,
                since some checkpoints may be skipped if evaluation is slower
                than checkpoint creation. If `None`, `SidecarEvaluator` will
                evaluate indefinitely, and the user must terminate the
                evaluator program themselves.
            callbacks: List of `keras.callbacks.Callback` instances to apply
                during evaluation. See
                [callbacks](/api_docs/python/tf/keras/callbacks).
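
        Example (a minimal sketch of choosing `max_evaluations`; `model` and
        `eval_dataset` are placeholders, and the training side is assumed to
        save one checkpoint per epoch, e.g. via
        `ModelCheckpoint(filepath='.../ckpt-{epoch}', save_weights_only=True)`
        for 5 epochs, producing 'ckpt-1' ... 'ckpt-5'):

        ```python
        evaluator = tf.keras.utils.SidecarEvaluator(
            model=model,              # compiled with at least one metric
            data=eval_dataset,
            checkpoint_dir='/tmp/checkpoint_dir',
            max_evaluations=5,        # stop after evaluating 'ckpt-5'
        )
        evaluator.start()
        ```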
180 """
181 self.model = model
182 self.data = data
183 self.checkpoint_dir = checkpoint_dir
184 self._iterations = tf.Variable(
185 name="iterations",
186 initial_value=_ITERATIONS_UNINITIALIZED,
187 dtype=tf.int64,
188 )
189 self.max_evaluations = max_evaluations
190 self.steps = steps
191 self.callbacks = callbacks or []

    def _timeout_fn(self):
        logging.info(
            "No checkpoints appear to be found after "
            f"{_CHECKPOINT_TIMEOUT_SEC} seconds. "
            "Please check if you are properly using a "
            "`tf.train.Checkpoint/CheckpointManager` or "
            "`tf.keras.callbacks.ModelCheckpoint(save_weights_only=True)` to "
            "save checkpoints on the training side. See the "
            "`tf.keras.utils.SidecarEvaluator` doc for recommended flows "
            "of saving checkpoints."
        )
        return False

    def start(self):
        """Starts the evaluation loop."""
        if self.model.optimizer and isinstance(
            self.model.optimizer, optimizer.Optimizer
        ):
            # The experimental `optimizer.Optimizer` tracks its own
            # `iterations` variable, so checkpointing the optimizer directly
            # is enough to recover the training iteration count.
            checkpoint = tf.train.Checkpoint(
                model=self.model, optimizer=self.model.optimizer
            )
        else:
            # Otherwise, track `self._iterations` under the `optimizer/iter`
            # checkpoint path (see `list_checkpoint_attributes` above) so it
            # can be restored from checkpoints that track a legacy optimizer.
            optimizer_checkpoint = tf.train.Checkpoint(iter=self._iterations)
            checkpoint = tf.train.Checkpoint(
                model=self.model, optimizer=optimizer_checkpoint
            )
        for latest_checkpoint in tf.train.checkpoints_iterator(
            self.checkpoint_dir,
            timeout=_CHECKPOINT_TIMEOUT_SEC,
            timeout_fn=self._timeout_fn,
        ):
            try:
                # `expect_partial` because the checkpoint can have other
                # `Trackable`s such as `optimizer`.
                checkpoint.restore(latest_checkpoint).expect_partial()
                checkpoint_attributes = list_checkpoint_attributes(
                    latest_checkpoint
                )
                # The checkpoint should contain model and optimizer for
                # SidecarEvaluator to work. But the model weights saved by
                # the ModelCheckpoint callback do not contain the model as an
                # attribute. To make SidecarEvaluator work in this case, use
                # model.load_weights to load the model's weights, while
                # self._iterations is still restored by the checkpoint
                # variable.
                if "model" not in checkpoint_attributes:
                    self.model.load_weights(latest_checkpoint)
                # The model checkpoint might not include the optimizer in
                # some cases, e.g. when using a custom training loop.
                # Directly assign the iterations property to be used in
                # callbacks. (The experimental optimizer automatically
                # restores the iteration value, so no assignment is needed
                # for it.)
                if self.model.optimizer and not isinstance(
                    self.model.optimizer,
                    optimizer.Optimizer,
                ):
                    self.model.optimizer.iterations.assign(self._iterations)
            except (tf.errors.OpError,) as e:
                if isinstance(e, tf.errors.UnavailableError):
                    # With distributed training, worker preemption can result
                    # in `UnavailableError`. Raise this to be handled outside
                    # the evaluation loop.
                    raise e

                # A couple of errors can happen here with the coordinator
                # racing to write the checkpoint:
                # 1) OpError: open failed for <file path>: No such file or
                #    directory
                # 2) NotFoundError (subclass of OpError): Unsuccessful
                #    TensorSliceReader constructor.
                # TODO(rchao): Remove this except block once b/150954027 is
                # resolved.
                logging.info(
                    "SidecarEvaluator encountered an error when loading the "
                    f"checkpoint at {latest_checkpoint}. Retrying. "
                    f"Error: {e.__class__.__name__}: {e}"
                )
                continue
            if (
                self._iterations.numpy() == _ITERATIONS_UNINITIALIZED
                and not isinstance(
                    self.model.optimizer,
                    optimizer.Optimizer,
                )
            ):
                raise RuntimeError(
                    "Variable `iterations` cannot be loaded from the "
                    f"checkpoint file at {self.checkpoint_dir}. "
                    "Please ensure `iterations` is "
                    "included in the checkpoint saved during training."
                )

            logging.info(
                "Evaluation starts: Model weights loaded from latest "
                f"checkpoint file {latest_checkpoint}"
            )
            self.model.evaluate(
                self.data, steps=self.steps, callbacks=self.callbacks, verbose=2
            )

            return_metrics = {}
            for metric in self.model.metrics:
                result = metric.result()
                if isinstance(result, dict):
                    return_metrics.update(result)
                else:
                    return_metrics[metric.name] = result

            logging.info(
                "End of evaluation. Metrics: %s",
                " ".join(
                    [
                        f"{name}={value.numpy()}"
                        for name, value in return_metrics.items()
                    ]
                ),
            )

            if self.max_evaluations and (
                self.max_evaluations <= int(latest_checkpoint.split("-")[-1])
            ):
                # Exit the loop because we have evaluated the final
                # checkpoint file.
                logging.info(
                    "Last checkpoint evaluated. SidecarEvaluator stops."
                )
                return
322@keras_export("keras.experimental.SidecarEvaluator", v1=[])
323@deprecation.deprecated_endpoints("keras.experimental.SidecarEvaluator")
324class SidecarEvaluatorExperimental(SidecarEvaluator):
325 """Deprecated. Please use `tf.keras.utils.SidecarEvaluator` instead.
327 Caution: `tf.keras.experimental.SidecarEvaluator` endpoint is
328 deprecated and will be removed in a future release. Please use
329 `tf.keras.utils.SidecarEvaluator`.
330 """
332 def __init__(self, *args, **kwargs):
333 logging.warning(
334 "`tf.keras.experimental.SidecarEvaluator` endpoint is "
335 "deprecated and will be removed in a future release. Please use "
336 "`tf.keras.utils.SidecarEvaluator`."
337 )
338 super().__init__(*args, **kwargs)
341@keras_export("keras.callbacks.SidecarEvaluatorModelExport")
342class SidecarEvaluatorModelExport(ModelCheckpoint):
343 """Callback to save the best Keras model.
345 It expands the functionality of the existing ModelCheckpoint callback to
346 enable exporting the best models after evaluation with validation dataset.
348 When using the `SidecarEvaluatorModelExport` callback in conjunction with
349 `keras.utils.SidecarEvaluator`, users should provide the `filepath`, which
350 is the path for this callback to export model or save weights to, and
351 `ckpt_filepath`, which is where the checkpoint is available to extract
352 the epoch number from. The callback will then export the model that the
353 evaluator deems as the best (among the checkpoints saved by the training
354 counterpart) to the specified `filepath`. This callback is intended to be
355 used by SidecarEvaluator only.
357 Example:
359 ```python
360 model.compile(loss=..., optimizer=...,
361 metrics=['accuracy'])
362 sidecar_evaluator = keras.utils.SidecarEvaluator(
363 model=model,
364 data=dataset,
365 checkpoint_dir=checkpoint_dir,
366 max_evaluations=1,
367 callbacks=[
368 SidecarEvaluatorModelExport(
369 export_filepath=os.path.join(checkpoint_dir,
370 'best_model_eval',
371 'best-model-{epoch:04d}'),
372 checkpoint_filepath=os.path.join(checkpoint_dir,
373 'ckpt-{epoch:04d}'),
374 save_freq="eval",
375 save_weights_only=True,
376 monitor="loss",
377 mode="min",
378 verbose=1,
379 ),
380 ],
381 )
382 sidecar_evaluator.start()
383 # Model weights are saved if evaluator deems it's the best seen so far.
385 Args:
386 export_filepath: Path where best models should be saved by this
387 `SidecarEvaluatorModelExport` callback. Epoch formatting options, such
388 as `os.path.join(best_model_dir, 'best-model-{epoch:04d}')`, can be
389 used to allow saved model to preserve epoch information in the file
390 name. SidecarEvaluatorModelExport will use the "training epoch" at
391 which the checkpoint was saved by training to fill the epoch
392 placeholder in the path.
393 checkpoint_filepath: Path where checkpoints were saved by training. This
394 should be the same as what is provided to `filepath` argument of
395 `ModelCheckpoint` on the training side, such as
396 `os.path.join(checkpoint_dir, 'ckpt-{epoch:04d}')`.
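
    For illustration (hypothetical paths): if the most recent checkpoint
    written by training is '/tmp/checkpoint_dir/ckpt-0007', the epoch
    placeholder is filled from that checkpoint number, so the export path
    becomes '/tmp/checkpoint_dir/best_model_eval/best-model-0007'; the export
    only happens when the monitored metric improves, since the callback saves
    with `save_best_only=True`.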
397 """
399 def __init__(self, export_filepath, checkpoint_filepath, **kwargs):
400 super().__init__(
401 filepath=export_filepath,
402 save_best_only=True,
403 **kwargs,
404 )
406 self._checkpoint_filepath = checkpoint_filepath

    def on_test_begin(self, logs=None):
        """Updates export_index to the latest checkpoint."""

        most_recent_filepath = (
            self._get_most_recently_modified_file_matching_pattern(
                self._checkpoint_filepath
            )
        )
        if most_recent_filepath is not None:
            # Parse the checkpoint number out of a path like '.../ckpt-0007'.
            # `ModelCheckpoint` fills the `{epoch}` placeholder with
            # `epoch + 1` when it formats the export path, so subtract 1 here
            # to make the exported file carry the checkpoint's own number.
            self.export_index = (
                int(
                    re.match(r".*ckpt-(?P<ckpt>\d+)", most_recent_filepath)[
                        "ckpt"
                    ]
                )
                - 1
            )
        else:
            self.export_index = 0

    def on_test_end(self, logs):
        """Saves best model at the end of an evaluation epoch."""

        self.epochs_since_last_save += 1
        self._save_model(epoch=self.export_index, batch=None, logs=logs)