# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Python module for evaluation loop."""

import re

import tensorflow as tf

# isort: off
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util import deprecation
from keras.src.callbacks import ModelCheckpoint
from keras.src.optimizers import optimizer
from tensorflow.python.util.tf_export import keras_export

_PRINT_EVAL_STEP_EVERY_SEC = 60.0
_ITERATIONS_UNINITIALIZED = -1
_CHECKPOINT_TIMEOUT_SEC = 30

def list_checkpoint_attributes(ckpt_dir_or_file):
    """Lists all the attributes in a checkpoint.

    Checkpoint keys are paths in a checkpoint graph, and the attribute is the
    first element in the path. E.g., with the checkpoint key
    "optimizer/iter/.ATTRIBUTES/VARIABLE_VALUE", "optimizer" is the attribute.
    The attribute is also used to save/restore a variable in a checkpoint,
    e.g. `tf.train.Checkpoint(optimizer=optimizer, model=model)`.

    Args:
        ckpt_dir_or_file: Directory with checkpoints file or path to
            checkpoint.

    Returns:
        Set of attributes in a checkpoint.
    """
    reader = tf.train.load_checkpoint(ckpt_dir_or_file)
    variable_map = reader.get_variable_to_shape_map()
    return {name.split("/")[0] for name in variable_map.keys()}
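
# A quick sketch of the result, assuming a checkpoint written by
# `tf.train.Checkpoint(model=model, optimizer=optimizer)`: the returned set
# would typically include {"model", "optimizer", "save_counter"}, plus any
# internal bookkeeping keys the checkpoint format adds.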

@keras_export("keras.utils.SidecarEvaluator", v1=[])
class SidecarEvaluator:
    """A class designed for a dedicated evaluator task.

    `SidecarEvaluator` is expected to be run in a process on a separate
    machine from the training cluster. It is meant for a dedicated evaluator,
    evaluating the metric results of a training cluster which has one or
    more workers performing the training, and saving checkpoints.

    The `SidecarEvaluator` API is compatible with both Custom Training Loop
    (CTL), and Keras `Model.fit` to be used in the training cluster. Using
    the model (with compiled metrics) provided at `__init__`,
    `SidecarEvaluator` repeatedly performs evaluation "epochs" when it finds
    a checkpoint that has not yet been used. Depending on the `steps`
    argument, an eval epoch is evaluation over all eval data, or up to a
    certain number of steps (batches). See examples below for how the
    training program should save the checkpoints in order to be recognized
    by `SidecarEvaluator`.

    Since under the hood, `SidecarEvaluator` uses `model.evaluate` for
    evaluation, it also supports arbitrary Keras callbacks. That is, if one
    or more callbacks are provided, their `on_test_batch_begin` and
    `on_test_batch_end` methods are called at the start and end of a batch,
    and their `on_test_begin` and `on_test_end` are called at the start and
    end of an evaluation epoch. Note that `SidecarEvaluator` may skip some
    checkpoints because it always picks up the latest checkpoint available,
    and during an evaluation epoch, multiple checkpoints can be produced
    from the training side.

    Example:
    ```python
    model = tf.keras.models.Sequential(...)
    model.compile(metrics=tf.keras.metrics.SparseCategoricalAccuracy(
        name="eval_metrics"))
    data = tf.data.Dataset.from_tensor_slices(...)

    tf.keras.utils.SidecarEvaluator(
        model=model,
        data=data,
        # dir for training-saved checkpoint
        checkpoint_dir='/tmp/checkpoint_dir',
        steps=None,  # Eval until dataset is exhausted
        max_evaluations=None,  # The evaluation needs to be stopped manually
        callbacks=[tf.keras.callbacks.TensorBoard(log_dir='/tmp/log_dir')]
    ).start()
    ```

    `SidecarEvaluator.start` writes a series of summary files which can be
    visualized by TensorBoard (which provides a webpage link):

    ```bash
    $ tensorboard --logdir=/tmp/log_dir/validation
    ...
    TensorBoard 2.4.0a0 at http://host:port (Press CTRL+C to quit)
    ```

    If the training cluster uses a CTL, the `checkpoint_dir` should contain
    checkpoints that track both `model` and `optimizer`, to fulfill
    `SidecarEvaluator`'s expectation. This can be done by a
    `tf.train.Checkpoint` and a `tf.train.CheckpointManager`:

    ```python
    # Same `checkpoint_dir` supplied to `SidecarEvaluator`.
    checkpoint_dir = ...
    checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
    checkpoint_manager = tf.train.CheckpointManager(
        checkpoint, directory=..., max_to_keep=...)
    checkpoint_manager.save()
    ```

    If the training cluster uses the Keras `Model.fit` API, a
    `tf.keras.callbacks.ModelCheckpoint` should be used, with
    `save_weights_only=True`, and the `filepath` should have 'ckpt-{epoch}'
    appended:

    ```python
    # Same `checkpoint_dir` supplied to `SidecarEvaluator`.
    checkpoint_dir = ...
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(checkpoint_dir, 'ckpt-{epoch}'),
        save_weights_only=True)
    model.fit(dataset, epochs, callbacks=[model_checkpoint])
    ```
    """

    def __init__(
        self,
        model,
        data,
        checkpoint_dir,
        steps=None,
        max_evaluations=None,
        callbacks=None,
    ):
        """Initializes a `SidecarEvaluator` object.

        Args:
            model: Model to use for evaluation. The model object used here
                should be a `tf.keras.Model`, and should be the same as the
                one that is used in training, where `tf.keras.Model`s are
                checkpointed. The model should have one or more metrics
                compiled before using `SidecarEvaluator`.
            data: The input data for evaluation. `SidecarEvaluator` supports
                all data types that Keras `model.evaluate` supports as the
                input data `x`, such as a `tf.data.Dataset`.
            checkpoint_dir: Directory where checkpoint files are saved.
            steps: Number of steps to perform evaluation for, when evaluating
                a single checkpoint file. If `None`, evaluation continues
                until the dataset is exhausted. For a repeating evaluation
                dataset, the user must specify `steps` to avoid an infinite
                evaluation loop.
            max_evaluations: Maximum number of checkpoint files to be
                evaluated, for `SidecarEvaluator` to know when to stop. The
                evaluator will stop after it evaluates a checkpoint filepath
                ending with '<ckpt_name>-<max_evaluations>'. If using
                `tf.train.CheckpointManager.save` for saving checkpoints, the
                kth saved checkpoint has the filepath suffix '<ckpt_name>-<k>'
                (k=1 for the first saved), and if checkpoints are saved every
                epoch after training, the filepath saved at the kth epoch
                would end with '<ckpt_name>-<k>'. Thus, if training runs for
                n epochs, and the evaluator should end after the training
                finishes, use n for this parameter. Note that this is not
                necessarily equal to the number of total evaluations, since
                some checkpoints may be skipped if evaluation is slower than
                checkpoint creation. If `None`, `SidecarEvaluator` will
                evaluate indefinitely, and the user must terminate the
                evaluator program themselves.
            callbacks: List of `keras.callbacks.Callback` instances to apply
                during evaluation. See
                [callbacks](/api_docs/python/tf/keras/callbacks).
        """
        self.model = model
        self.data = data
        self.checkpoint_dir = checkpoint_dir
        self._iterations = tf.Variable(
            name="iterations",
            initial_value=_ITERATIONS_UNINITIALIZED,
            dtype=tf.int64,
        )
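        # `_iterations` starts at the -1 sentinel so `start()` can tell
        # whether restoring a checkpoint actually set the iteration count.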

        self.max_evaluations = max_evaluations
        self.steps = steps
        self.callbacks = callbacks or []

    def _timeout_fn(self):
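        """Logs guidance when no checkpoint appears within the timeout.

        Returning False tells `tf.train.checkpoints_iterator` to keep
        waiting rather than stop iterating.
        """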

        logging.info(
            "No checkpoints appear to be found after "
            f"{_CHECKPOINT_TIMEOUT_SEC} seconds. "
            "Please check if you are properly using a "
            "`tf.train.Checkpoint/CheckpointManager` or "
            "`tf.keras.callbacks.ModelCheckpoint(save_weights_only=True)` to "
            "save checkpoints during training. See the "
            "`tf.keras.utils.SidecarEvaluator` doc for recommended flows "
            "of saving checkpoints."
        )
        return False

    def start(self):
        """Starts the evaluation loop."""
        if self.model.optimizer and isinstance(
            self.model.optimizer, optimizer.Optimizer
        ):
            checkpoint = tf.train.Checkpoint(
                model=self.model, optimizer=self.model.optimizer
            )
        else:
            optimizer_checkpoint = tf.train.Checkpoint(iter=self._iterations)
            checkpoint = tf.train.Checkpoint(
                model=self.model, optimizer=optimizer_checkpoint
            )
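        # A new-style `optimizer.Optimizer` restores its own `iterations`
        # variable, so it is tracked directly above; the stand-in checkpoint
        # covers other optimizers by restoring the training iteration count
        # into `self._iterations`.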

        for latest_checkpoint in tf.train.checkpoints_iterator(
            self.checkpoint_dir,
            timeout=_CHECKPOINT_TIMEOUT_SEC,
            timeout_fn=self._timeout_fn,
        ):
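            # `checkpoints_iterator` yields each newly written checkpoint
            # path as it appears, blocking in between; `timeout_fn` is
            # consulted once `timeout` seconds pass without a new checkpoint.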

            try:
                # `expect_partial` because the checkpoint can have other
                # `Trackable`s such as `optimizer`.
                checkpoint.restore(latest_checkpoint).expect_partial()
                checkpoint_attributes = list_checkpoint_attributes(
                    latest_checkpoint
                )
                # The checkpoint should contain model and optimizer for
                # SidecarEvaluator to work. But the model weights saved by
                # the ModelCheckpoint callback do not contain model as an
                # attribute. To make SidecarEvaluator work compatibly in this
                # case, use model.load_weights to load the model's weights,
                # while self._iterations is still restored by the checkpoint
                # variable.
                if "model" not in checkpoint_attributes:
                    self.model.load_weights(latest_checkpoint)
                # The model checkpoint might not include optimizer in cases,
                # e.g. using a custom training loop. Directly assign the
                # iterations property to be used in callbacks.
                if self.model.optimizer and not isinstance(
                    self.model.optimizer,
                    optimizer.Optimizer,
                ):
                    # The experimental optimizer automatically restores the
                    # iteration value.
                    self.model.optimizer.iterations.assign(self._iterations)
            except (tf.errors.OpError,) as e:
                if isinstance(e, tf.errors.UnavailableError):
                    # With distributed training, worker preemption can result
                    # in `UnavailableError`. Raise this to be handled outside
                    # the evaluation loop.
                    raise e

                # A couple of errors can happen here with the coordinator
                # racing to write checkpoints:
                # 1) OpError: open failed for <file path>: No such file or
                #    directory
                # 2) NotFoundError (subclass of OpError): Unsuccessful
                #    TensorSliceReader constructor.
                # TODO(rchao): Remove this except block once b/150954027 is
                # resolved.
                logging.info(
                    "SidecarEvaluator encountered an error when loading the "
                    f"checkpoint at {latest_checkpoint}. Retrying. "
                    f"Error: {e.__class__.__name__}: {e}"
                )
                continue
            if (
                self._iterations.numpy() == _ITERATIONS_UNINITIALIZED
                and not isinstance(
                    self.model.optimizer,
                    optimizer.Optimizer,
                )
            ):
                raise RuntimeError(
                    "Variable `iterations` cannot be loaded from the "
                    f"checkpoint file at {self.checkpoint_dir}. "
                    "Please ensure `iterations` is "
                    "included in the checkpoint saved during training."
                )
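            # The sentinel check above only applies to legacy optimizers; a
            # new-style `optimizer.Optimizer` restores `iterations` itself,
            # so it is exempt from this check.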

            logging.info(
                "Evaluation starts: Model weights loaded from latest "
                f"checkpoint file {latest_checkpoint}"
            )
            self.model.evaluate(
                self.data, steps=self.steps, callbacks=self.callbacks, verbose=2
            )

            return_metrics = {}
            for metric in self.model.metrics:
                result = metric.result()
                if isinstance(result, dict):
                    return_metrics.update(result)
                else:
                    return_metrics[metric.name] = result
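            # `Metric.result()` may return a scalar tensor or a dict of named
            # scalars; both are flattened into `return_metrics` above.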

            logging.info(
                "End of evaluation. Metrics: %s",
                " ".join(
                    [
                        f"{name}={value.numpy()}"
                        for name, value in return_metrics.items()
                    ]
                ),
            )

            if self.max_evaluations and (
                self.max_evaluations <= int(latest_checkpoint.split("-")[-1])
            ):
                # Exit the loop because we have evaluated the final
                # checkpoint file.
                logging.info(
                    "Last checkpoint evaluated. SidecarEvaluator stops."
                )
                return


@keras_export("keras.experimental.SidecarEvaluator", v1=[])
@deprecation.deprecated_endpoints("keras.experimental.SidecarEvaluator")
class SidecarEvaluatorExperimental(SidecarEvaluator):
    """Deprecated. Please use `tf.keras.utils.SidecarEvaluator` instead.

    Caution: the `tf.keras.experimental.SidecarEvaluator` endpoint is
    deprecated and will be removed in a future release. Please use
    `tf.keras.utils.SidecarEvaluator`.
    """

    def __init__(self, *args, **kwargs):
        logging.warning(
            "`tf.keras.experimental.SidecarEvaluator` endpoint is "
            "deprecated and will be removed in a future release. Please use "
            "`tf.keras.utils.SidecarEvaluator`."
        )
        super().__init__(*args, **kwargs)


@keras_export("keras.callbacks.SidecarEvaluatorModelExport")
class SidecarEvaluatorModelExport(ModelCheckpoint):
    """Callback to save the best Keras model.

    It expands the functionality of the existing ModelCheckpoint callback to
    enable exporting the best models after evaluation with a validation
    dataset.

    When using the `SidecarEvaluatorModelExport` callback in conjunction with
    `keras.utils.SidecarEvaluator`, users should provide the
    `export_filepath`, which is the path for this callback to export the
    model or save weights to, and `checkpoint_filepath`, which is where the
    checkpoint is available to extract the epoch number from. The callback
    will then export the model that the evaluator deems as the best (among
    the checkpoints saved by the training counterpart) to the specified
    `export_filepath`. This callback is intended to be used by
    SidecarEvaluator only.

    Example:

    ```python
    model.compile(loss=..., optimizer=...,
                  metrics=['accuracy'])
    sidecar_evaluator = keras.utils.SidecarEvaluator(
        model=model,
        data=dataset,
        checkpoint_dir=checkpoint_dir,
        max_evaluations=1,
        callbacks=[
            SidecarEvaluatorModelExport(
                export_filepath=os.path.join(checkpoint_dir,
                                             'best_model_eval',
                                             'best-model-{epoch:04d}'),
                checkpoint_filepath=os.path.join(checkpoint_dir,
                                                 'ckpt-{epoch:04d}'),
                save_freq="eval",
                save_weights_only=True,
                monitor="loss",
                mode="min",
                verbose=1,
            ),
        ],
    )
    sidecar_evaluator.start()
    # Model weights are saved if evaluator deems it's the best seen so far.
    ```

    Args:
        export_filepath: Path where best models should be saved by this
            `SidecarEvaluatorModelExport` callback. Epoch formatting options,
            such as `os.path.join(best_model_dir, 'best-model-{epoch:04d}')`,
            can be used to allow the saved model to preserve epoch
            information in the file name. SidecarEvaluatorModelExport will
            use the "training epoch" at which the checkpoint was saved by
            training to fill the epoch placeholder in the path.
        checkpoint_filepath: Path where checkpoints were saved by training.
            This should be the same as what is provided to the `filepath`
            argument of `ModelCheckpoint` on the training side, such as
            `os.path.join(checkpoint_dir, 'ckpt-{epoch:04d}')`.
    """

    def __init__(self, export_filepath, checkpoint_filepath, **kwargs):
        super().__init__(
            filepath=export_filepath,
            save_best_only=True,
            **kwargs,
        )

        self._checkpoint_filepath = checkpoint_filepath

    def on_test_begin(self, logs=None):
        """Updates export_index to the latest checkpoint."""

        most_recent_filepath = (
            self._get_most_recently_modified_file_matching_pattern(
                self._checkpoint_filepath
            )
        )
        if most_recent_filepath is not None:
            self.export_index = (
                int(
                    re.match(r".*ckpt-(?P<ckpt>\d+)", most_recent_filepath)[
                        "ckpt"
                    ]
                )
                - 1
            )
        else:
            self.export_index = 0
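        # `ModelCheckpoint` formats its `filepath` with `epoch + 1`, so the
        # subtraction above makes the exported model carry the same epoch
        # number as the checkpoint being evaluated.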

    def on_test_end(self, logs):
        """Saves best model at the end of an evaluation epoch."""

        self.epochs_since_last_save += 1
        self._save_model(epoch=self.export_index, batch=None, logs=logs)
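        # `_save_model` honors the `save_best_only=True` set in `__init__`,
        # so the export happens only when the monitored metric improves; the
        # counter bump above keeps `ModelCheckpoint`'s save-period
        # bookkeeping moving.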
