Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/distribute/coordinator/remote_value.py: 85%
13 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
1# Copyright 2023 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""RemoteValue interface class."""
17import enum
19from tensorflow.python.util.tf_export import tf_export
class RemoteValueStatus(enum.Enum):
  """The status of a `RemoteValue` object.

  A `RemoteValue` object can be in one of three states:
    1) not ready: no value, no non-retryable error and not aborted;
    2) aborted: i.e. the execution of the function was aborted because of a
       task failure, but can be retried;
    3) ready: i.e. has a value or has a non-retryable error.

  The initial state of a `RemoteValue` is "not ready". Once its corresponding
  closure has been executed at least once, it becomes aborted or ready. The
  state transitions are:
    1) not ready -> 2) aborted:
      when the corresponding closure is aborted due to worker failure, and the
      worker failure is not immediately handled.
    1) not ready -> 3) ready:
      when the corresponding closure has been executed successfully.
    2) aborted -> 3) ready:
      when the `RemoteValue` is rebuilt by rerunning the corresponding closure
      and the closure has been executed successfully.
    3) ready -> 2) aborted:
      when the corresponding closure had been executed successfully but later
      the corresponding remote worker failed. This is currently only
      implemented for resource `RemoteValue`s such as iterators.
  """
  # Each member's value is its own name as a string.
  NOT_READY = "NOT_READY"
  ABORTED = "ABORTED"
  READY = "READY"
@tf_export("distribute.experimental.coordinator.RemoteValue",
           "distribute.coordinator.RemoteValue", v1=[])
class RemoteValue:
  """Handle to a value that a scheduled function produces asynchronously.

  Objects of this type are returned by
  `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule`. The
  wrapped value only becomes available later, after the scheduled function
  has been executed.

  Feeding a `tf.distribute.experimental.coordinator.RemoteValue` as an input
  into another function scheduled through
  `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule` is
  currently not supported.

  This base class only declares the interface: `fetch` and `get` both raise
  `NotImplementedError` here and must be provided by subclasses.

  Example:

  ```python
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver=...)
  coordinator = (
      tf.distribute.experimental.coordinator.ClusterCoordinator(strategy))

  with strategy.scope():
    v1 = tf.Variable(initial_value=0.0)
    v2 = tf.Variable(initial_value=1.0)

  @tf.function
  def worker_fn():
    v1.assign_add(0.1)
    v2.assign_sub(0.2)
    return v1.read_value() / v2.read_value()

  result = coordinator.schedule(worker_fn)
  # `fetch()` yields the concrete result rather than a `tf.Tensor`.
  assert result.fetch() == 0.125

  for _ in range(10):
    # Each call may run `worker_fn` on any available worker; its `result`
    # becomes available at some later point.
    result = coordinator.schedule(worker_fn)
  ```
  """

  def fetch(self):
    """Blocks until the `RemoteValue` is available, then returns it as numpy.

    The remote value is copied to the local host, which makes it concrete.

    Returns:
      The numpy array structure of the actual output of the `tf.function`
      that was passed to the
      `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule`
      call that produced this `RemoteValue`. Depending on what that
      `tf.function` returns, this is either a single value or a structure of
      values.

    Raises:
      tf.errors.CancelledError: If the function producing this `RemoteValue`
        was aborted or cancelled because of a failure.
    """
    message = "Must be implemented in subclasses."
    raise NotImplementedError(message)

  def get(self):
    """Blocks until the `RemoteValue` is available, then returns it as tensors.

    The remote tensor is copied to the local host, which makes it concrete.

    Returns:
      The actual output, as `tf.Tensor`s, of the `tf.function` that was
      passed to the
      `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule`
      call that produced this `RemoteValue`. Depending on what that
      `tf.function` returns, this is either a single Tensor or a structure of
      Tensors.

    Raises:
      tf.errors.CancelledError: If the function producing this `RemoteValue`
        was aborted or cancelled because of a failure.
    """
    message = "Must be implemented in subclasses."
    raise NotImplementedError(message)