Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/types/distribute.py: 76% (38 statements)


# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Types specific to tf.distribute."""

from tensorflow.python.util.tf_export import tf_export
from tensorflow.tools.docs import doc_controls

# TODO(mdan, anjalisridhar): Decide the location of this file.


class Iterable(object):
  """Interface for distributed objects that admit iteration/reduction."""

  def __iter__(self):
    pass

  # TODO(mdan): Describe this contract.
  def reduce(self, initial_state, reduce_func):
    """Reduces this iterable object to a single element.

    The transformation calls `reduce_func` successively on each element.
    The `initial_state` argument is used for the initial state and the final
    state is returned as the result.
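
    For example, here is a minimal sketch of the contract, shown with
    `tf.data.Dataset.reduce`, which takes the same arguments (a distributed
    implementation applies the same contract to distributed elements):

    >>> ds = tf.data.Dataset.range(5)
    >>> # `old_state` starts at `initial_state` (0 here) and accumulates a sum.
    >>> total = ds.reduce(tf.constant(0, dtype=tf.int64),
    ...                   lambda old_state, input_element: old_state + input_element)
    >>> int(total)
    10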

    Args:
      initial_state: An element representing the initial state of the
        reduction.
      reduce_func: A function that maps `(old_state, input_element)` to
        `new_state`. The structure of `new_state` must match the structure of
        `old_state`. For the first element, `old_state` is `initial_state`.

    Returns:
      The final state of the transformation.
    """


class Iterator(object):
  """Interface for distributed iterators."""

  def get_next(self):
    """Unlike __next__, this may use a non-raising mechanism."""

  def __next__(self):
    pass

  def __iter__(self):
    pass


@tf_export("distribute.DistributedValues", v1=[])
class DistributedValues(object):
  """Base class for representing distributed values.

  A subclass instance of `tf.distribute.DistributedValues` is created when
  creating variables within a distribution strategy, iterating a
  `tf.distribute.DistributedDataset` or through `tf.distribute.Strategy.run`.
  This base class should never be instantiated directly.
  `tf.distribute.DistributedValues` contains a value per replica. Depending on
  the subclass, the values could either be synced on update, synced on demand,
  or never synced.

  Two representative types of `tf.distribute.DistributedValues` are
  `tf.types.experimental.PerReplica` and `tf.types.experimental.Mirrored`
  values.

  `PerReplica` values exist on the worker devices, with a different value for
  each replica. They are produced by iterating through a distributed dataset
  returned by `tf.distribute.Strategy.experimental_distribute_dataset` (Example
  1, below) and `tf.distribute.Strategy.distribute_datasets_from_function`. They
  are also the typical result returned by `tf.distribute.Strategy.run` (Example
  2).

  `Mirrored` values are like `PerReplica` values, except we know that the value
  on all replicas is the same. `Mirrored` values are kept synchronized by the
  distribution strategy in use, while `PerReplica` values are left
  unsynchronized. `Mirrored` values typically represent model weights. We can
  safely read a `Mirrored` value in a cross-replica context by using the value
  on any replica, while `PerReplica` values should not be read or manipulated
  in a cross-replica context.

  `tf.distribute.DistributedValues` can be reduced via `strategy.reduce` to
  obtain a single value across replicas (Example 4), used as input into
  `tf.distribute.Strategy.run` (Example 3), or collected to inspect the
  per-replica values using `tf.distribute.Strategy.experimental_local_results`
  (Example 5).

  Example usages:

  1. Created from a `tf.distribute.DistributedDataset`:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([5., 6., 7., 8.]).batch(2)
  >>> dataset_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> distributed_values = next(dataset_iterator)
  >>> distributed_values
  PerReplica:{
    0: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([5.], dtype=float32)>,
    1: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([6.], dtype=float32)>
  }

  2. Returned by `run`:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> @tf.function
  ... def run():
  ...   ctx = tf.distribute.get_replica_context()
  ...   return ctx.replica_id_in_sync_group
  >>> distributed_values = strategy.run(run)
  >>> distributed_values
  PerReplica:{
    0: <tf.Tensor: shape=(), dtype=int32, numpy=0>,
    1: <tf.Tensor: shape=(), dtype=int32, numpy=1>
  }

  3. As input into `run`:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([5., 6., 7., 8.]).batch(2)
  >>> dataset_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> distributed_values = next(dataset_iterator)
  >>> @tf.function
  ... def run(input):
  ...   return input + 1.0
  >>> updated_value = strategy.run(run, args=(distributed_values,))
  >>> updated_value
  PerReplica:{
    0: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([6.], dtype=float32)>,
    1: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([7.], dtype=float32)>
  }

  4. As input into `reduce`:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([5., 6., 7., 8.]).batch(2)
  >>> dataset_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> distributed_values = next(dataset_iterator)
  >>> reduced_value = strategy.reduce(tf.distribute.ReduceOp.SUM,
  ...                                 distributed_values,
  ...                                 axis=0)
  >>> reduced_value
  <tf.Tensor: shape=(), dtype=float32, numpy=11.0>

  5. How to inspect per-replica values locally:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([5., 6., 7., 8.]).batch(2)
  >>> dataset_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> distributed_values = next(dataset_iterator)
  >>> per_replica_values = strategy.experimental_local_results(
  ...     distributed_values)
  >>> per_replica_values
  (<tf.Tensor: shape=(1,), dtype=float32, numpy=array([5.], dtype=float32)>,
   <tf.Tensor: shape=(1,), dtype=float32, numpy=array([6.], dtype=float32)>)

  """


@tf_export("types.experimental.distributed.PerReplica", v1=[])
class PerReplica(DistributedValues):
  """Holds a distributed value: a map from replica id to unsynchronized values.

  `PerReplica` values exist on the worker devices, with a different value for
  each replica. They can be produced in many ways, often by iterating through a
  distributed dataset returned by
  `tf.distribute.Strategy.experimental_distribute_dataset` and
  `tf.distribute.Strategy.distribute_datasets_from_function`. They are also the
  typical result returned by `tf.distribute.Strategy.run`.
  """


@tf_export("types.experimental.distributed.Mirrored", v1=[])
class Mirrored(DistributedValues):
  """Holds a distributed value: a map from replica id to synchronized values.

  `Mirrored` values are `tf.distribute.DistributedValues` for which we know that
  the value on all replicas is the same. `Mirrored` values are kept synchronized
  by the distribution strategy in use, while `tf.types.experimental.PerReplica`
  values are left unsynchronized. `Mirrored` values typically represent model
  weights. We can safely read a `Mirrored` value in a cross-replica context by
  using the value on any replica, while `PerReplica` values should not be read
  or manipulated directly by the user in a cross-replica context.
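
  For example, a minimal sketch (assuming two GPUs are available; a variable
  created inside `strategy.scope()` is mirrored, i.e. kept in sync across
  replicas):

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> with strategy.scope():
  ...   v = tf.Variable(1.0)
  >>> # Reading the synchronized value in a cross-replica context is safe.
  >>> value = v.read_value()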

  """


@tf_export("distribute.DistributedIterator", v1=[])
class DistributedIteratorInterface(Iterator):
  """An iterator over `tf.distribute.DistributedDataset`.

  `tf.distribute.DistributedIterator` is the primary mechanism for enumerating
  elements of a `tf.distribute.DistributedDataset`. It supports the Python
  Iterator protocol, which means it can be iterated over using a for-loop or by
  fetching individual elements explicitly via `get_next()`.

  You can create a `tf.distribute.DistributedIterator` by calling `iter` on
  a `tf.distribute.DistributedDataset` or by creating a Python loop over a
  `tf.distribute.DistributedDataset`.

  Visit the [tutorial](https://www.tensorflow.org/tutorials/distribute/input)
  on distributed input for more examples and caveats.
  """

  def get_next(self):
    """Returns the next input from the iterator for all replicas.

    Example use:

    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> dataset = tf.data.Dataset.range(100).batch(2)
    >>> dist_dataset = strategy.experimental_distribute_dataset(dataset)
    >>> dist_dataset_iterator = iter(dist_dataset)
    >>> @tf.function
    ... def one_step(input):
    ...   return input
    >>> step_num = 5
    >>> for _ in range(step_num):
    ...   strategy.run(one_step, args=(dist_dataset_iterator.get_next(),))
    >>> strategy.experimental_local_results(dist_dataset_iterator.get_next())
    (<tf.Tensor: shape=(1,), dtype=int64, numpy=array([10])>,
     <tf.Tensor: shape=(1,), dtype=int64, numpy=array([11])>)

    Returns:
      A single `tf.Tensor` or a `tf.distribute.DistributedValues` which contains
      the next input for all replicas.

    Raises:
      `tf.errors.OutOfRangeError`: If the end of the iterator has been reached.
    """
    raise NotImplementedError(
        "DistributedIterator.get_next() must be implemented in descendants.")

  @property
  def element_spec(self):
    # pylint: disable=line-too-long
    """The type specification of an element of `tf.distribute.DistributedIterator`.

    Example usage:

    >>> global_batch_size = 16
    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> dataset = tf.data.Dataset.from_tensors(([1.],[2])).repeat(100).batch(global_batch_size)
    >>> distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
    >>> distributed_iterator.element_spec
    (PerReplicaSpec(TensorSpec(shape=(None, 1), dtype=tf.float32, name=None),
                    TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)),
     PerReplicaSpec(TensorSpec(shape=(None, 1), dtype=tf.int32, name=None),
                    TensorSpec(shape=(None, 1), dtype=tf.int32, name=None)))

    Returns:
      A nested structure of `tf.TypeSpec` objects matching the structure of an
      element of this `tf.distribute.DistributedIterator`. This returned value
      is typically a `tf.distribute.DistributedValues` object and specifies the
      `tf.TensorSpec` of individual components.
    """
    raise NotImplementedError(
        "DistributedIterator.element_spec() must be implemented in descendants")

  def get_next_as_optional(self):
    # pylint: disable=line-too-long
    """Returns a `tf.experimental.Optional` that contains the next value for all replicas.

    If the `tf.distribute.DistributedIterator` has reached the end of the
    sequence, the returned `tf.experimental.Optional` will have no value.

    Example usage:

    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> global_batch_size = 2
    >>> steps_per_loop = 2
    >>> dataset = tf.data.Dataset.range(10).batch(global_batch_size)
    >>> distributed_iterator = iter(
    ...     strategy.experimental_distribute_dataset(dataset))
    >>> def step_fn(x):
    ...   # train the model with inputs
    ...   return x
    >>> @tf.function
    ... def train_fn(distributed_iterator):
    ...   for _ in tf.range(steps_per_loop):
    ...     optional_data = distributed_iterator.get_next_as_optional()
    ...     if not optional_data.has_value():
    ...       break
    ...     per_replica_results = strategy.run(step_fn, args=(optional_data.get_value(),))
    ...     tf.print(strategy.experimental_local_results(per_replica_results))
    >>> train_fn(distributed_iterator)
    ... # ([0 1], [2 3])
    ... # ([4], [])

    Returns:
      A `tf.experimental.Optional` object representing the next value from the
      `tf.distribute.DistributedIterator` (if it has one) or no value.
    """
    # pylint: enable=line-too-long
    raise NotImplementedError(
        "get_next_as_optional() not implemented in descendants")


@tf_export("distribute.DistributedDataset", v1=[])
class DistributedDatasetInterface(Iterable):
  # pylint: disable=line-too-long
  """Represents a dataset distributed among devices and machines.

  A `tf.distribute.DistributedDataset` could be thought of as a "distributed"
  dataset. When you use the `tf.distribute` API to scale training to multiple
  devices or machines, you also need to distribute the input data, which leads
  to a `tf.distribute.DistributedDataset` instance, instead of a
  `tf.data.Dataset` instance in the non-distributed case. In TF 2.x,
  `tf.distribute.DistributedDataset` objects are Python iterables.

  Note: `tf.distribute.DistributedDataset` instances are *not* of type
  `tf.data.Dataset`. They support only the two usages described below:
  iteration and `element_spec`. We don't support any other APIs to transform or
  inspect the dataset.

  There are two APIs to create a `tf.distribute.DistributedDataset` object:
  `tf.distribute.Strategy.experimental_distribute_dataset(dataset)` and
  `tf.distribute.Strategy.distribute_datasets_from_function(dataset_fn)`.
  *When to use which?* When you have a `tf.data.Dataset` instance, and the
  regular batch splitting (i.e. re-batching the input `tf.data.Dataset` instance
  with a new batch size that is equal to the global batch size divided by the
  number of replicas in sync) and autosharding (i.e. the
  `tf.data.experimental.AutoShardPolicy` options) work for you, use the former
  API. Otherwise, if you are *not* using a canonical `tf.data.Dataset` instance,
  or you would like to customize the batch splitting or sharding, you can wrap
  this logic in a `dataset_fn` and use the latter API. Both APIs handle
  prefetching to the device for the user. For more details and examples, follow
  the links to the APIs.
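
  For example, here is a minimal sketch of the second API (the `dataset_fn`
  shown is illustrative):

  >>> global_batch_size = 8
  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> def dataset_fn(input_context):
  ...   # Batch by the per-replica batch size and shard by input pipeline.
  ...   batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  ...   dataset = tf.data.Dataset.range(16).batch(batch_size)
  ...   return dataset.shard(input_context.num_input_pipelines,
  ...                        input_context.input_pipeline_id)
  >>> dist_dataset = strategy.distribute_datasets_from_function(dataset_fn)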


  There are two main usages of a `DistributedDataset` object:

  1. Iterate over it to generate the input for a single device or multiple
  devices, which is a `tf.distribute.DistributedValues` instance. To do this,
  you can:

    * use a pythonic for-loop construct:

      >>> global_batch_size = 4
      >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
      >>> dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(4).batch(global_batch_size)
      >>> dist_dataset = strategy.experimental_distribute_dataset(dataset)
      >>> @tf.function
      ... def train_step(input):
      ...   features, labels = input
      ...   return labels - 0.3 * features
      >>> for x in dist_dataset:
      ...   # train_step trains the model using the dataset elements
      ...   loss = strategy.run(train_step, args=(x,))
      ...   print("Loss is", loss)
      Loss is PerReplica:{
        0: tf.Tensor(
      [[0.7]
       [0.7]], shape=(2, 1), dtype=float32),
        1: tf.Tensor(
      [[0.7]
       [0.7]], shape=(2, 1), dtype=float32)
      }

      Placing the loop inside a `tf.function` will give a performance boost.
      However, `break` and `return` are currently not supported if the loop is
      placed inside a `tf.function`. We also don't support placing the loop
      inside a `tf.function` when using
      `tf.distribute.experimental.MultiWorkerMirroredStrategy` or
      `tf.distribute.experimental.TPUStrategy` with multiple workers.

    * use `__iter__` to create an explicit iterator, which is of type
      `tf.distribute.DistributedIterator`

      >>> global_batch_size = 4
      >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
      >>> train_dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(50).batch(global_batch_size)
      >>> train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
      >>> @tf.function
      ... def distributed_train_step(dataset_inputs):
      ...   def train_step(input):
      ...     loss = tf.constant(0.1)
      ...     return loss
      ...   per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
      ...   return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
      >>> EPOCHS = 2
      >>> STEPS = 3
      >>> for epoch in range(EPOCHS):
      ...   total_loss = 0.0
      ...   num_batches = 0
      ...   dist_dataset_iterator = iter(train_dist_dataset)
      ...   for _ in range(STEPS):
      ...     total_loss += distributed_train_step(next(dist_dataset_iterator))
      ...     num_batches += 1
      ...   average_train_loss = total_loss / num_batches
      ...   template = ("Epoch {}, Loss: {:.4f}")
      ...   print(template.format(epoch + 1, average_train_loss))
      Epoch 1, Loss: 0.2000
      Epoch 2, Loss: 0.2000


  To achieve a performance improvement, you can also wrap the `strategy.run`
  call with a `tf.range` inside a `tf.function`. This runs multiple steps in a
  `tf.function`. AutoGraph will convert it to a `tf.while_loop` on the worker.
  However, it is less flexible compared with running a single step inside a
  `tf.function`. For example, you cannot run things eagerly or run arbitrary
  Python code within the steps.
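
  For example, here is a minimal sketch of that pattern (the names
  `train_multiple_steps` and `dist_iterator` are illustrative, not part of the
  API):

  >>> global_batch_size = 4
  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(16).batch(global_batch_size)
  >>> dist_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> def step_fn(inputs):
  ...   features, labels = inputs
  ...   return tf.reduce_mean(labels - 0.3 * features)
  >>> @tf.function
  ... def train_multiple_steps(iterator, steps):
  ...   total_loss = 0.0
  ...   for _ in tf.range(steps):  # converted to a tf.while_loop by AutoGraph
  ...     per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  ...     total_loss += strategy.reduce(
  ...         tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
  ...   return total_loss
  >>> loss = train_multiple_steps(dist_iterator, tf.constant(2))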


  2. Inspect the `tf.TypeSpec` of the data generated by `DistributedDataset`.

  `tf.distribute.DistributedDataset` generates
  `tf.distribute.DistributedValues` as input to the devices. If you pass the
  input to a `tf.function` and would like to specify the shape and type of
  each Tensor argument to the function, you can pass a `tf.TypeSpec` object to
  the `input_signature` argument of the `tf.function`. To get the
  `tf.TypeSpec` of the input, you can use the `element_spec` property of the
  `tf.distribute.DistributedDataset` or `tf.distribute.DistributedIterator`
  object.

  For example:

  >>> global_batch_size = 4
  >>> epochs = 1
  >>> steps_per_epoch = 1
  >>> mirrored_strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensors(([2.])).repeat(100).batch(global_batch_size)
  >>> dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
  >>> @tf.function(input_signature=[dist_dataset.element_spec])
  ... def train_step(per_replica_inputs):
  ...   def step_fn(inputs):
  ...     return tf.square(inputs)
  ...   return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
  >>> for _ in range(epochs):
  ...   iterator = iter(dist_dataset)
  ...   for _ in range(steps_per_epoch):
  ...     output = train_step(next(iterator))
  ...     print(output)
  PerReplica:{
    0: tf.Tensor(
  [[4.]
   [4.]], shape=(2, 1), dtype=float32),
    1: tf.Tensor(
  [[4.]
   [4.]], shape=(2, 1), dtype=float32)
  }


  Visit the [tutorial](https://www.tensorflow.org/tutorials/distribute/input)
  on distributed input for more examples and caveats.
  """

  def __iter__(self):
    """Creates an iterator for the `tf.distribute.DistributedDataset`.

    The returned iterator implements the Python Iterator protocol.

    Example usage:

    >>> global_batch_size = 4
    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> dataset = tf.data.Dataset.from_tensor_slices([1, 2, 3, 4]).repeat().batch(global_batch_size)
    >>> distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
    >>> print(next(distributed_iterator))
    PerReplica:{
      0: tf.Tensor([1 2], shape=(2,), dtype=int32),
      1: tf.Tensor([3 4], shape=(2,), dtype=int32)
    }

    Returns:
      A `tf.distribute.DistributedIterator` instance for the given
      `tf.distribute.DistributedDataset` object to enumerate over the
      distributed data.
    """
    raise NotImplementedError("Must be implemented in descendants")

  @property
  def element_spec(self):
    """The type specification of an element of this `tf.distribute.DistributedDataset`.

    Example usage:

    >>> global_batch_size = 16
    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> dataset = tf.data.Dataset.from_tensors(([1.],[2])).repeat(100).batch(global_batch_size)
    >>> dist_dataset = strategy.experimental_distribute_dataset(dataset)
    >>> dist_dataset.element_spec
    (PerReplicaSpec(TensorSpec(shape=(None, 1), dtype=tf.float32, name=None),
                    TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)),
     PerReplicaSpec(TensorSpec(shape=(None, 1), dtype=tf.int32, name=None),
                    TensorSpec(shape=(None, 1), dtype=tf.int32, name=None)))

    Returns:
      A nested structure of `tf.TypeSpec` objects matching the structure of an
      element of this `tf.distribute.DistributedDataset`. This returned value is
      typically a `tf.distribute.DistributedValues` object and specifies the
      `tf.TensorSpec` of individual components.
    """
    raise NotImplementedError(
        "DistributedDataset.element_spec must be implemented in descendants.")

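  # Implements `Iterable.reduce`; deliberately hidden from the generated docs.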
  @doc_controls.do_not_generate_docs
  def reduce(self, initial_state, reduce_func):
    raise NotImplementedError(
        "DistributedDataset.reduce must be implemented in descendants.")