Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/distribute/coordinator/remote_value.py: 85%

13 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-03 07:57 +0000

1# Copyright 2023 The TensorFlow Authors. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# ============================================================================== 

15"""RemoteValue interface class.""" 

16 

17import enum 

18 

19from tensorflow.python.util.tf_export import tf_export 

20 

21 

class RemoteValueStatus(enum.Enum):
  """The status of a `RemoteValue` object.

  A `RemoteValue` object can be in one of three states:
  1) not ready: no value, no non-retryable error and not aborted;
  2) aborted: i.e. the execution of the function was aborted because of task
     failure, but can be retried;
  3) ready: i.e. has a value or has a non-retryable error.

  The initial state of a `RemoteValue` is "not ready". When its corresponding
  closure has been executed at least once, it will become aborted or ready.
  The state transitions are:
    1) not ready -> 2) aborted:
      when the corresponding closure is aborted due to worker failure, and the
      worker failure is not immediately handled.
    1) not ready -> 3) ready:
      when the corresponding closure has been executed successfully.
    2) aborted -> 3) ready:
      when the `RemoteValue` is rebuilt by rerunning the corresponding closure
      and the closure has been executed successfully.
    3) ready -> 2) aborted:
      when the corresponding closure had been executed successfully but later
      the corresponding remote worker failed. This is currently only
      implemented for resource `RemoteValue`s like iterators.
  """
  # Each member's value is its own name as a string, so
  # `RemoteValueStatus("READY") is RemoteValueStatus.READY`.
  NOT_READY = "NOT_READY"
  ABORTED = "ABORTED"
  READY = "READY"

51 

52 

@tf_export("distribute.experimental.coordinator.RemoteValue",
           "distribute.coordinator.RemoteValue", v1=[])
class RemoteValue:
  """A value produced asynchronously by a scheduled function.

  Objects of this type are what
  `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule` hands
  back to the caller. The underlying value is not available right away; it
  becomes available later, once the scheduled function has been executed.

  Passing a `tf.distribute.experimental.coordinator.RemoteValue` as an input
  to another function scheduled through
  `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule` is not
  supported at the moment.

  Example:

  ```python
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver=...)
  coordinator = (
      tf.distribute.experimental.coordinator.ClusterCoordinator(strategy))

  with strategy.scope():
    v1 = tf.Variable(initial_value=0.0)
    v2 = tf.Variable(initial_value=1.0)

  @tf.function
  def worker_fn():
    v1.assign_add(0.1)
    v2.assign_sub(0.2)
    return v1.read_value() / v2.read_value()

  result = coordinator.schedule(worker_fn)
  # `fetch()` yields the concrete result rather than a `tf.Tensor`.
  assert result.fetch() == 0.125

  for _ in range(10):
    # Each call may run `worker_fn` on any available worker; the
    # corresponding `result` becomes available at a later time.
    result = coordinator.schedule(worker_fn)
  ```

  This base class only declares the interface: its `fetch` and `get` methods
  raise `NotImplementedError` and are meant to be overridden by subclasses.
  """

  def fetch(self):
    """Blocks until this `RemoteValue` is available and returns it as numpy.

    The remote value is copied to the local side, which makes it concrete.

    Returns:
      The output of the `tf.function` associated with this `RemoteValue`
      (i.e. the function given to the
      `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule`
      call that returned it), as a numpy array structure. Depending on that
      function's output, this is either a single value or a structure of
      values.

    Raises:
      tf.errors.CancelledError: If the function that produces this
        `RemoteValue` is aborted or cancelled due to failure.
      NotImplementedError: Always, in this base class; subclasses provide the
        actual implementation.
    """
    message = "Must be implemented in subclasses."
    raise NotImplementedError(message)

  def get(self):
    """Blocks until this `RemoteValue` is available and returns it as tensors.

    The remote tensor is copied to the local side, which makes it concrete.

    Returns:
      The output of the `tf.function` associated with this `RemoteValue`
      (i.e. the function given to the
      `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule`
      call that returned it), in the form of `tf.Tensor`s. Depending on that
      function's output, this is either a single Tensor or a structure of
      Tensors.

    Raises:
      tf.errors.CancelledError: If the function that produces this
        `RemoteValue` is aborted or cancelled due to failure.
      NotImplementedError: Always, in this base class; subclasses provide the
        actual implementation.
    """
    message = "Must be implemented in subclasses."
    raise NotImplementedError(message)