Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/types/distribute.py: 76%
38 statements

# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Types specific to tf.distribute."""

from tensorflow.python.util.tf_export import tf_export
from tensorflow.tools.docs import doc_controls

# TODO(mdan, anjalisridhar): Decide the location of this file.


class Iterable(object):
  """Interface for distributed objects that admit iteration/reduction."""

  def __iter__(self):
    pass

  # TODO(mdan): Describe this contract.
  def reduce(self, initial_state, reduce_func):
    """Reduces this iterable object to a single element.

    The transformation calls `reduce_func` successively on each element.
    The `initial_state` argument is used for the initial state and the final
    state is returned as the result.
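
    For illustration only (this interface does not prescribe element types), a
    minimal sketch of the contract, assuming `dist_iterable` is a distributed
    iterable of numeric batches:

    >>> def reduce_func(old_state, input_element):
    ...   # `new_state` must have the same structure as `old_state`.
    ...   return old_state + tf.reduce_sum(input_element)
    >>> # total = dist_iterable.reduce(tf.constant(0.), reduce_func)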

    Args:
      initial_state: An element representing the initial state of the
        reduction.
      reduce_func: A function that maps `(old_state, input_element)` to
        `new_state`. The structure of `new_state` must match the structure of
        `old_state`. For the first element, `old_state` is `initial_state`.

    Returns:
      The final state of the transformation.
    """


class Iterator(object):
  """Interface for distributed iterators."""

  def get_next(self):
    """Unlike __next__, this may use a non-raising mechanism."""

  def __next__(self):
    pass

  def __iter__(self):
    pass


@tf_export("distribute.DistributedValues", v1=[])
class DistributedValues(object):
  """Base class for representing distributed values.

  A subclass instance of `tf.distribute.DistributedValues` is created when
  creating variables within a distribution strategy, iterating a
  `tf.distribute.DistributedDataset` or through `tf.distribute.Strategy.run`.
  This base class should never be instantiated directly.
  `tf.distribute.DistributedValues` contains a value per replica. Depending on
  the subclass, the values could either be synced on update, synced on demand,
  or never synced.

  Two representative types of `tf.distribute.DistributedValues` are
  `tf.types.experimental.PerReplica` and `tf.types.experimental.Mirrored`
  values.

  `PerReplica` values exist on the worker devices, with a different value for
  each replica. They are produced by iterating through a distributed dataset
  returned by `tf.distribute.Strategy.experimental_distribute_dataset` (Example
  1, below) and `tf.distribute.Strategy.distribute_datasets_from_function`. They
  are also the typical result returned by `tf.distribute.Strategy.run` (Example
  2).

  `Mirrored` values are like `PerReplica` values, except we know that the
  values on all replicas are the same. `Mirrored` values are kept synchronized
  by the distribution strategy in use, while `PerReplica` values are left
  unsynchronized. `Mirrored` values typically represent model weights. We can
  safely read a `Mirrored` value in a cross-replica context by using the value
  on any replica, while `PerReplica` values should not be read or manipulated
  in a cross-replica context.

  `tf.distribute.DistributedValues` can be reduced via `strategy.reduce` to
  obtain a single value across replicas (Example 4), used as input into
  `tf.distribute.Strategy.run` (Example 3), or collected to inspect the
  per-replica values using `tf.distribute.Strategy.experimental_local_results`
  (Example 5).

  Example usages:

  1. Created from a `tf.distribute.DistributedDataset`:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([5., 6., 7., 8.]).batch(2)
  >>> dataset_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> distributed_values = next(dataset_iterator)
  >>> distributed_values
  PerReplica:{
    0: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([5.], dtype=float32)>,
    1: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([6.], dtype=float32)>
  }

  2. Returned by `run`:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> @tf.function
  ... def run():
  ...   ctx = tf.distribute.get_replica_context()
  ...   return ctx.replica_id_in_sync_group
  >>> distributed_values = strategy.run(run)
  >>> distributed_values
  PerReplica:{
    0: <tf.Tensor: shape=(), dtype=int32, numpy=0>,
    1: <tf.Tensor: shape=(), dtype=int32, numpy=1>
  }

  3. As input into `run`:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([5., 6., 7., 8.]).batch(2)
  >>> dataset_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> distributed_values = next(dataset_iterator)
  >>> @tf.function
  ... def run(input):
  ...   return input + 1.0
  >>> updated_value = strategy.run(run, args=(distributed_values,))
  >>> updated_value
  PerReplica:{
    0: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([6.], dtype=float32)>,
    1: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([7.], dtype=float32)>
  }

  4. As input into `reduce`:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([5., 6., 7., 8.]).batch(2)
  >>> dataset_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> distributed_values = next(dataset_iterator)
  >>> reduced_value = strategy.reduce(tf.distribute.ReduceOp.SUM,
  ...                                 distributed_values,
  ...                                 axis=0)
  >>> reduced_value
  <tf.Tensor: shape=(), dtype=float32, numpy=11.0>

  5. How to inspect per-replica values locally:

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([5., 6., 7., 8.]).batch(2)
  >>> dataset_iterator = iter(strategy.experimental_distribute_dataset(dataset))
  >>> distributed_values = next(dataset_iterator)
  >>> per_replica_values = strategy.experimental_local_results(
  ...     distributed_values)
  >>> per_replica_values
  (<tf.Tensor: shape=(1,), dtype=float32, numpy=array([5.], dtype=float32)>,
   <tf.Tensor: shape=(1,), dtype=float32, numpy=array([6.], dtype=float32)>)

  """


@tf_export("types.experimental.distributed.PerReplica", v1=[])
class PerReplica(DistributedValues):
  """Holds a distributed value: a map from replica id to unsynchronized values.

  `PerReplica` values exist on the worker devices, with a different value for
  each replica. They can be produced many ways, often by iterating through a
  distributed dataset returned by
  `tf.distribute.Strategy.experimental_distribute_dataset` and
  `tf.distribute.Strategy.distribute_datasets_from_function`. They are also the
  typical result returned by `tf.distribute.Strategy.run`.
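
  For example, iterating a distributed dataset yields one `PerReplica` value
  per step (a minimal sketch, assuming two local GPU replicas):

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensor_slices([1., 2.]).batch(2)
  >>> per_replica = next(iter(strategy.experimental_distribute_dataset(dataset)))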
179 """
182@tf_export("types.experimental.distributed.Mirrored", v1=[])
183class Mirrored(DistributedValues):
184 """Holds a distributed value: a map from replica id to synchronized values.
186 `Mirrored` values are `tf.distribute.DistributedValues` for which we know that
187 the value on all replicas is the same. `Mirrored` values are kept synchronized
188 by the distribution strategy in use, while `tf.types.experimental.PerReplica`
189 values are left unsynchronized. `Mirrored` values typically represent model
190 weights. We can safely read a `Mirrored` value in a cross-replica context by
191 using the value on any replica, while `PerReplica` values should not be read
192 or manipulated directly by the user in a cross-replica context.
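
  For example, a variable created under `strategy.scope()` is mirrored across
  replicas, and its per-replica components can be inspected from a
  cross-replica context (a minimal sketch, assuming two local GPU replicas):

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> with strategy.scope():
  ...   v = tf.Variable(1.0)
  >>> components = strategy.experimental_local_results(v)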
193 """
196@tf_export("distribute.DistributedIterator", v1=[])
197class DistributedIteratorInterface(Iterator):
198 """An iterator over `tf.distribute.DistributedDataset`.
200 `tf.distribute.DistributedIterator` is the primary mechanism for enumerating
201 elements of a `tf.distribute.DistributedDataset`. It supports the Python
202 Iterator protocol, which means it can be iterated over using a for-loop or by
203 fetching individual elements explicitly via `get_next()`.
205 You can create a `tf.distribute.DistributedIterator` by calling `iter` on
206 a `tf.distribute.DistributedDataset` or creating a python loop over a
207 `tf.distribute.DistributedDataset`.
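
  For example (a minimal sketch, assuming two local GPU replicas):

  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dist_dataset = strategy.experimental_distribute_dataset(
  ...     tf.data.Dataset.range(8).batch(4))
  >>> distributed_iterator = iter(dist_dataset)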

  Visit the [tutorial](https://www.tensorflow.org/tutorials/distribute/input)
  on distributed input for more examples and caveats.
  """

  def get_next(self):
    """Returns the next input from the iterator for all replicas.

    Example use:

    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> dataset = tf.data.Dataset.range(100).batch(2)
    >>> dist_dataset = strategy.experimental_distribute_dataset(dataset)
    >>> dist_dataset_iterator = iter(dist_dataset)
    >>> @tf.function
    ... def one_step(input):
    ...   return input
    >>> step_num = 5
    >>> for _ in range(step_num):
    ...   strategy.run(one_step, args=(dist_dataset_iterator.get_next(),))
    >>> strategy.experimental_local_results(dist_dataset_iterator.get_next())
    (<tf.Tensor: shape=(1,), dtype=int64, numpy=array([10])>,
     <tf.Tensor: shape=(1,), dtype=int64, numpy=array([11])>)

    Returns:
      A single `tf.Tensor` or a `tf.distribute.DistributedValues` which contains
      the next input for all replicas.

    Raises:
      `tf.errors.OutOfRangeError`: If the end of the iterator has been reached.
    """
    raise NotImplementedError(
        "DistributedIterator.get_next() must be implemented in descendants.")

  @property
  def element_spec(self):
    # pylint: disable=line-too-long
    """The type specification of an element of `tf.distribute.DistributedIterator`.

    Example usage:

    >>> global_batch_size = 16
    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> dataset = tf.data.Dataset.from_tensors(([1.],[2])).repeat(100).batch(global_batch_size)
    >>> distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
    >>> distributed_iterator.element_spec
    (PerReplicaSpec(TensorSpec(shape=(None, 1), dtype=tf.float32, name=None),
                    TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)),
     PerReplicaSpec(TensorSpec(shape=(None, 1), dtype=tf.int32, name=None),
                    TensorSpec(shape=(None, 1), dtype=tf.int32, name=None)))

    Returns:
      A nested structure of `tf.TypeSpec` objects matching the structure of an
      element of this `tf.distribute.DistributedIterator`. This returned value
      is typically a `tf.distribute.DistributedValues` object and specifies the
      `tf.TensorSpec` of individual components.
    """
    raise NotImplementedError(
        "DistributedIterator.element_spec() must be implemented in descendants")

  def get_next_as_optional(self):
    # pylint: disable=line-too-long
    """Returns a `tf.experimental.Optional` that contains the next value for all replicas.

    If the `tf.distribute.DistributedIterator` has reached the end of the
    sequence, the returned `tf.experimental.Optional` will have no value.

    Example usage:

    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> global_batch_size = 2
    >>> steps_per_loop = 2
    >>> dataset = tf.data.Dataset.range(10).batch(global_batch_size)
    >>> distributed_iterator = iter(
    ...     strategy.experimental_distribute_dataset(dataset))
    >>> def step_fn(x):
    ...   # train the model with inputs
    ...   return x
    >>> @tf.function
    ... def train_fn(distributed_iterator):
    ...   for _ in tf.range(steps_per_loop):
    ...     optional_data = distributed_iterator.get_next_as_optional()
    ...     if not optional_data.has_value():
    ...       break
    ...     per_replica_results = strategy.run(step_fn, args=(optional_data.get_value(),))
    ...     tf.print(strategy.experimental_local_results(per_replica_results))
    >>> train_fn(distributed_iterator)
    ... # ([0 1], [2 3])
    ... # ([4], [])

    Returns:
      A `tf.experimental.Optional` object representing the next value from the
      `tf.distribute.DistributedIterator` (if it has one) or no value.
    """
    # pylint: enable=line-too-long
    raise NotImplementedError(
        "get_next_as_optional() not implemented in descendants")


@tf_export("distribute.DistributedDataset", v1=[])
class DistributedDatasetInterface(Iterable):
  # pylint: disable=line-too-long
  """Represents a dataset distributed among devices and machines.

  A `tf.distribute.DistributedDataset` could be thought of as a "distributed"
  dataset. When you use the `tf.distribute` API to scale training to multiple
  devices or machines, you also need to distribute the input data, which leads
  to a `tf.distribute.DistributedDataset` instance, instead of a
  `tf.data.Dataset` instance in the non-distributed case. In TF 2.x,
  `tf.distribute.DistributedDataset` objects are Python iterables.

  Note: `tf.distribute.DistributedDataset` instances are *not* of type
  `tf.data.Dataset`. They support only the two usages described below:
  iteration and `element_spec`. We don't support any other APIs to transform or
  inspect the dataset.

  There are two APIs to create a `tf.distribute.DistributedDataset` object:
  `tf.distribute.Strategy.experimental_distribute_dataset(dataset)` and
  `tf.distribute.Strategy.distribute_datasets_from_function(dataset_fn)`.
  *When to use which?* When you have a `tf.data.Dataset` instance, and the
  regular batch splitting (i.e. re-batch the input `tf.data.Dataset` instance
  with a new batch size that is equal to the global batch size divided by the
  number of replicas in sync) and autosharding (i.e. the
  `tf.data.experimental.AutoShardPolicy` options) work for you, use the former
  API. Otherwise, if you are *not* using a canonical `tf.data.Dataset` instance,
  or you would like to customize the batch splitting or sharding, you can wrap
  this logic in a `dataset_fn` and use the latter API. Both APIs handle
  prefetching to the device for the user. For more details and examples, follow
  the links to the APIs.
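
  For illustration, a minimal sketch of the latter API (the `dataset_fn` body
  shown here, including the per-replica batching, is illustrative rather than
  required):

  >>> global_batch_size = 4
  >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> def dataset_fn(input_context):
  ...   batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  ...   return tf.data.Dataset.from_tensors([1.]).repeat(8).batch(batch_size)
  >>> dist_dataset = strategy.distribute_datasets_from_function(dataset_fn)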


  There are two main usages of a `DistributedDataset` object:

  1. Iterate over it to generate the input for a single device or multiple
  devices, which is a `tf.distribute.DistributedValues` instance. To do this,
  you can:

    * use a pythonic for-loop construct:

      >>> global_batch_size = 4
      >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
      >>> dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(4).batch(global_batch_size)
      >>> dist_dataset = strategy.experimental_distribute_dataset(dataset)
      >>> @tf.function
      ... def train_step(input):
      ...   features, labels = input
      ...   return labels - 0.3 * features
      >>> for x in dist_dataset:
      ...   # train_step trains the model using the dataset elements
      ...   loss = strategy.run(train_step, args=(x,))
      ...   print("Loss is", loss)
      Loss is PerReplica:{
        0: tf.Tensor(
      [[0.7]
       [0.7]], shape=(2, 1), dtype=float32),
        1: tf.Tensor(
      [[0.7]
       [0.7]], shape=(2, 1), dtype=float32)
      }

  Placing the loop inside a `tf.function` will give a performance boost.
  However, `break` and `return` are currently not supported if the loop is
  placed inside a `tf.function`. We also don't support placing the loop
  inside a `tf.function` when using
  `tf.distribute.experimental.MultiWorkerMirroredStrategy` or
  `tf.distribute.experimental.TPUStrategy` with multiple workers.

    * use `__iter__` to create an explicit iterator, which is of type
      `tf.distribute.DistributedIterator`

      >>> global_batch_size = 4
      >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
      >>> train_dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(50).batch(global_batch_size)
      >>> train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
      >>> @tf.function
      ... def distributed_train_step(dataset_inputs):
      ...   def train_step(input):
      ...     loss = tf.constant(0.1)
      ...     return loss
      ...   per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
      ...   return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
      >>> EPOCHS = 2
      >>> STEPS = 3
      >>> for epoch in range(EPOCHS):
      ...   total_loss = 0.0
      ...   num_batches = 0
      ...   dist_dataset_iterator = iter(train_dist_dataset)
      ...   for _ in range(STEPS):
      ...     total_loss += distributed_train_step(next(dist_dataset_iterator))
      ...     num_batches += 1
      ...   average_train_loss = total_loss / num_batches
      ...   template = ("Epoch {}, Loss: {:.4f}")
      ...   print(template.format(epoch+1, average_train_loss))
      Epoch 1, Loss: 0.2000
      Epoch 2, Loss: 0.2000


  To achieve a performance improvement, you can also wrap the `strategy.run`
  call with a `tf.range` inside a `tf.function`. This runs multiple steps in a
  `tf.function`. Autograph will convert it to a `tf.while_loop` on the worker.
  However, it is less flexible compared with running a single step inside a
  `tf.function`. For example, you cannot run things eagerly or run arbitrary
  Python code within the steps.
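
  For illustration, a minimal sketch of this pattern (the names
  `run_multiple_steps`, `iterator`, and `steps` are placeholders; `train_step`
  is a per-replica function such as the one defined in the examples above):

  >>> @tf.function
  ... def run_multiple_steps(iterator, steps):
  ...   for _ in tf.range(steps):
  ...     # AutoGraph converts this loop into a `tf.while_loop` on the worker.
  ...     strategy.run(train_step, args=(next(iterator),))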


  2. Inspect the `tf.TypeSpec` of the data generated by `DistributedDataset`.

  `tf.distribute.DistributedDataset` generates
  `tf.distribute.DistributedValues` as input to the devices. If you pass the
  input to a `tf.function` and would like to specify the shape and type of
  each Tensor argument to the function, you can pass a `tf.TypeSpec` object to
  the `input_signature` argument of the `tf.function`. To get the
  `tf.TypeSpec` of the input, you can use the `element_spec` property of the
  `tf.distribute.DistributedDataset` or `tf.distribute.DistributedIterator`
  object.

  For example:

  >>> global_batch_size = 4
  >>> epochs = 1
  >>> steps_per_epoch = 1
  >>> mirrored_strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
  >>> dataset = tf.data.Dataset.from_tensors(([2.])).repeat(100).batch(global_batch_size)
  >>> dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
  >>> @tf.function(input_signature=[dist_dataset.element_spec])
  ... def train_step(per_replica_inputs):
  ...   def step_fn(inputs):
  ...     return tf.square(inputs)
  ...   return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
  >>> for _ in range(epochs):
  ...   iterator = iter(dist_dataset)
  ...   for _ in range(steps_per_epoch):
  ...     output = train_step(next(iterator))
  ...     print(output)
  PerReplica:{
    0: tf.Tensor(
  [[4.]
   [4.]], shape=(2, 1), dtype=float32),
    1: tf.Tensor(
  [[4.]
   [4.]], shape=(2, 1), dtype=float32)
  }


  Visit the [tutorial](https://www.tensorflow.org/tutorials/distribute/input)
  on distributed input for more examples and caveats.
  """

  def __iter__(self):
    """Creates an iterator for the `tf.distribute.DistributedDataset`.

    The returned iterator implements the Python Iterator protocol.

    Example usage:

    >>> global_batch_size = 4
    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> dataset = tf.data.Dataset.from_tensor_slices([1, 2, 3, 4]).repeat().batch(global_batch_size)
    >>> distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
    >>> print(next(distributed_iterator))
    PerReplica:{
      0: tf.Tensor([1 2], shape=(2,), dtype=int32),
      1: tf.Tensor([3 4], shape=(2,), dtype=int32)
    }

    Returns:
      A `tf.distribute.DistributedIterator` instance for the given
      `tf.distribute.DistributedDataset` object to enumerate over the
      distributed data.
    """
    raise NotImplementedError("Must be implemented in descendants")

  @property
  def element_spec(self):
    """The type specification of an element of this `tf.distribute.DistributedDataset`.

    Example usage:

    >>> global_batch_size = 16
    >>> strategy = tf.distribute.MirroredStrategy(["GPU:0", "GPU:1"])
    >>> dataset = tf.data.Dataset.from_tensors(([1.],[2])).repeat(100).batch(global_batch_size)
    >>> dist_dataset = strategy.experimental_distribute_dataset(dataset)
    >>> dist_dataset.element_spec
    (PerReplicaSpec(TensorSpec(shape=(None, 1), dtype=tf.float32, name=None),
                    TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)),
     PerReplicaSpec(TensorSpec(shape=(None, 1), dtype=tf.int32, name=None),
                    TensorSpec(shape=(None, 1), dtype=tf.int32, name=None)))

    Returns:
      A nested structure of `tf.TypeSpec` objects matching the structure of an
      element of this `tf.distribute.DistributedDataset`. This returned value is
      typically a `tf.distribute.DistributedValues` object and specifies the
      `tf.TensorSpec` of individual components.
    """
    raise NotImplementedError(
        "DistributedDataset.element_spec must be implemented in descendants.")

  @doc_controls.do_not_generate_docs
  def reduce(self, initial_state, reduce_func):
    raise NotImplementedError(
        "DistributedDataset.reduce must be implemented in descendants.")