# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""API for using the tf.data service. 

16 

17This module contains: 

18 

191. tf.data server implementations for running the tf.data service. 

202. APIs for registering datasets with the tf.data service and reading from 

21 the registered datasets. 

22 

23The tf.data service provides the following benefits: 

24 

25- Horizontal scaling of tf.data input pipeline processing to solve input 

26 bottlenecks. 

27- Data coordination for distributed training. Coordinated reads 

28 enable all replicas to train on similar-length examples across each global 

29 training step, improving step times in synchronous training. 

30- Dynamic balancing of data across training replicas. 

31 

32>>> dispatcher = tf.data.experimental.service.DispatchServer() 

33>>> dispatcher_address = dispatcher.target.split("://")[1] 

34>>> worker = tf.data.experimental.service.WorkerServer( 

35... tf.data.experimental.service.WorkerConfig( 

36... dispatcher_address=dispatcher_address)) 

37>>> dataset = tf.data.Dataset.range(10) 

38>>> dataset = dataset.apply(tf.data.experimental.service.distribute( 

39... processing_mode=tf.data.experimental.service.ShardingPolicy.OFF, 

40... service=dispatcher.target)) 

41>>> print(list(dataset.as_numpy_iterator())) 

42[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 

## Setup

This section goes over how to set up the tf.data service.

### Run tf.data servers

The tf.data service consists of one dispatch server and `n` worker servers.
tf.data servers should be brought up alongside your training jobs, then brought
down when the jobs are finished.
Use `tf.data.experimental.service.DispatchServer` to start a dispatch server,
and `tf.data.experimental.service.WorkerServer` to start worker servers. Servers
can be run in the same process for testing purposes, or scaled up on separate
machines.
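
For example, a standalone dispatcher and a worker might each run in their own
process, roughly as follows (a minimal sketch; the port and the
"dispatcher-host" address are illustrative):

```
# Run on the dispatcher machine.
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()  # Block until the server shuts down.

# Run on each worker machine, pointing at the dispatcher's host:port.
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address="dispatcher-host:5050"))
worker.join()  # Block until the server shuts down.
```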

See https://github.com/tensorflow/ecosystem/tree/master/data_service for an
example of using Google Kubernetes Engine (GKE) to manage the tf.data service.
Note that the server implementation in
[tf_std_data_server.py](https://github.com/tensorflow/ecosystem/blob/master/data_service/tf_std_data_server.py)
is not GKE-specific, and can be used to run the tf.data service in other
contexts.

### Custom ops

If your dataset uses custom ops, these ops need to be made available to tf.data
servers by calling
[load_op_library](https://www.tensorflow.org/api_docs/python/tf/load_op_library)
from the dispatcher and worker processes at startup.
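
A minimal sketch of such a startup hook (the shared-object path is
hypothetical):

```
# At dispatcher/worker startup, before serving any datasets, load the
# library that defines the custom ops used by your dataset.
tf.load_op_library("/path/to/my_custom_ops.so")
```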

## Usage

Users interact with tf.data service by programmatically registering their
datasets with tf.data service, then creating datasets that read from the
registered datasets. The
[register_dataset](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/register_dataset)
function registers a dataset, then the
[from_dataset_id](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/from_dataset_id)
function creates a new dataset which reads from the registered dataset.
The
[distribute](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/distribute)
function wraps `register_dataset` and `from_dataset_id` into a single convenient
transformation which registers its input dataset and then reads from it.
`distribute` enables tf.data service to be used with a one-line code change.
However, it assumes that the dataset is created and consumed by the same entity,
and this assumption might not always be valid or desirable. In particular, in
certain scenarios, such as distributed training, it might be desirable to
decouple the creation and consumption of the dataset (via `register_dataset`
and `from_dataset_id` respectively) to avoid having to create the dataset on
each of the training workers.

### Example

#### `distribute`

To use the `distribute` transformation, apply the transformation after the
prefix of your input pipeline that you would like to be executed using tf.data
service (typically at the end).

```
dataset = ...  # Define your dataset here.
# Move dataset processing from the local machine to the tf.data service.
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address,
        job_name="shared_job"))
# Any transformations added after `distribute` will be run on the local machine.
dataset = dataset.prefetch(1)
```

The above code will create a tf.data service "job", which iterates through the
dataset to generate data. To share the data from a job across multiple clients
(e.g. when using TPUStrategy or MultiWorkerMirroredStrategy), set a common
`job_name` across all clients.

#### `register_dataset` and `from_dataset_id`

`register_dataset` registers a dataset with the tf.data service, returning a
dataset id for the registered dataset. `from_dataset_id` creates a dataset that
reads from the registered dataset. These APIs can be used to reduce dataset
building time for distributed training. Instead of building the dataset on all
training workers, we can build the dataset just once and then register the
dataset using `register_dataset`. Then all workers can call `from_dataset_id`
without needing to build the dataset themselves.

```
dataset = ...  # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
# Use `from_dataset_id` to create per-worker datasets.
per_worker_datasets = {}
for worker in workers:
  per_worker_datasets[worker] = tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")
```

### Processing Modes

`processing_mode` specifies how to shard a dataset among tf.data service
workers. tf.data service supports the `OFF`, `DYNAMIC`, `FILE`, `DATA`,
`FILE_OR_DATA`, and `HINT` sharding policies.

OFF: No sharding will be performed. The entire input dataset will be processed
independently by each of the tf.data service workers. For this reason, it is
important to shuffle data (e.g. filenames) non-deterministically, so that each
worker will process the elements of the dataset in a different order. This mode
can be used to distribute datasets that aren't splittable.
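
For instance, a non-deterministic filename shuffle before `distribute` might
look like this (a sketch; `filenames` is assumed to be a Python list):

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
# No `seed` is passed, so each worker shuffles in a different order, and
# `reshuffle_each_iteration` (True by default) re-randomizes every epoch.
dataset = dataset.shuffle(buffer_size=len(filenames))
```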

If a worker is added or restarted during ShardingPolicy.OFF processing, the
worker will instantiate a new copy of the dataset and begin producing data from
the beginning.

#### Dynamic Sharding

DYNAMIC: In this mode, tf.data service divides the dataset into two components:
a source component that generates "splits" such as filenames, and a processing
component that takes splits and outputs dataset elements. The source component
is executed in a centralized fashion by the tf.data service dispatcher, which
generates different splits of input data. The processing component is executed
in a parallel fashion by the tf.data service workers, each operating on a
different set of input data splits.

For example, consider the following dataset:

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
dataset = dataset.interleave(TFRecordDataset)
dataset = dataset.map(preprocess_fn)
dataset = dataset.batch(batch_size)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.DYNAMIC,
        ...))
```

The `from_tensor_slices` will be run on the dispatcher, while the `interleave`,
`map`, and `batch` will be run on tf.data service workers. The workers will pull
filenames from the dispatcher for processing. To process a dataset with
dynamic sharding, the dataset must have a splittable source, and all of
its transformations must be compatible with splitting. While most sources and
transformations support splitting, there are exceptions, such as custom datasets
which may not implement the splitting API. Please file a GitHub issue if you
would like to use distributed epoch processing for a currently unsupported
dataset source or transformation.

If no workers are restarted during training, dynamic sharding mode will visit
every example exactly once. If workers are restarted during training, the splits
they were processing will not be fully visited. The dispatcher maintains a
cursor through the dataset's splits. Assuming fault tolerance is enabled (see
"Fault Tolerance" below), the dispatcher will store cursor state in write-ahead
logs so that the cursor can be restored in case the dispatcher is restarted
mid-training. This provides an at-most-once visitation guarantee in the presence
of server restarts.

#### Static Sharding

The following are static sharding policies. The semantics are similar to
`tf.data.experimental.AutoShardPolicy`. These policies require:

  * The tf.data service cluster is configured with a fixed list of workers
    in DispatcherConfig (see the sketch below).
  * Each client only reads from the local tf.data service worker.

If a worker is restarted while performing static sharding, the worker will
begin processing its shard again from the beginning.
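
A fixed worker list can be configured roughly as follows (a sketch; the port
and worker addresses are illustrative):

```
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        # Static sharding requires a fixed list of worker addresses.
        worker_addresses=["worker-0:5051", "worker-1:5051"]))
```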

FILE: Shards by input files (i.e. each worker will get a fixed set of files to
process). When this option is selected, make sure that there are at least as
many files as workers. If there are fewer input files than workers, a runtime
error will be raised.

DATA: Shards by elements produced by the dataset. Each worker will process the
whole dataset and discard the portion that is not for itself. Note that for
this mode to correctly partition the dataset elements, the dataset needs to
produce elements in a deterministic order.

FILE_OR_DATA: Attempts FILE-based sharding, falling back to DATA-based
sharding on failure.

HINT: Looks for the presence of `shard(SHARD_HINT, ...)`, which is treated as a
placeholder to replace with `shard(num_workers, worker_index)`.
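
For example (a sketch; it assumes the placeholder constant
`tf.data.experimental.SHARD_HINT` is passed for both `shard` arguments):

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
# Under ShardingPolicy.HINT, the service replaces this placeholder call
# with shard(num_workers, worker_index).
dataset = dataset.shard(tf.data.experimental.SHARD_HINT,
                        tf.data.experimental.SHARD_HINT)
```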

For backwards compatibility, `processing_mode` may also be set to the strings
`"parallel_epochs"` or `"distributed_epoch"`, which are respectively equivalent
to `ShardingPolicy.OFF` and `ShardingPolicy.DYNAMIC`.

### Coordinated Data Read

By default, when multiple consumers read from the same job, they receive data on
a first-come first-served basis. In some use cases, it is advantageous to
coordinate the consumers so that at each step all consumers read data from the
same worker.

For example, the tf.data service can be used to coordinate example sizes across
a cluster during synchronous training, so that during each step all replicas
train on similar-sized elements. To achieve this, define a dataset which
generates rounds of `num_consumers` consecutive similar-sized batches, then
enable coordinated reads by setting `consumer_index` and `num_consumers`.

NOTE: To keep consumers in sync, coordinated reads require that the dataset have
infinite cardinality. You can get this by adding `.repeat()` at the end of the
dataset definition.
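
A minimal sketch of enabling coordinated reads, assuming `replica_index` and
`num_replicas` come from your training setup (`consumer_index` and
`num_consumers` are parameters of `distribute`):

```
dataset = ...  # Produces rounds of `num_replicas` similar-sized batches.
dataset = dataset.repeat()  # Coordinated reads require infinite cardinality.
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    job_name="shared_job",
    consumer_index=replica_index,  # This consumer's index in [0, num_replicas).
    num_consumers=num_replicas))
```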

### Jobs

A tf.data service "job" refers to the process of reading from a dataset managed
by the tf.data service, using one or more data consumers. Jobs are created when
iterating over datasets that read from tf.data service. The data produced by a
job is determined by (1) the dataset associated with the job and (2) the job's
processing mode. For example, if a job is created for the dataset
`Dataset.range(5)` with processing mode `ShardingPolicy.OFF`, each tf.data
worker will produce the elements `{0, 1, 2, 3, 4}` for the job, so the job
produces `5 * num_workers` elements in total. If the processing mode is
`ShardingPolicy.DYNAMIC`, the job will produce only `5` elements.

One or more consumers can consume data from a job. By default, jobs are
"anonymous", meaning that only the consumer which created the job can read from
it. To share the output of a job across multiple consumers, you can set a common
`job_name`.

### Fault Tolerance

By default, the tf.data dispatch server stores its state in memory, making it a
single point of failure during training. To avoid this, pass
`fault_tolerant_mode=True` when creating your `DispatchServer`. Dispatcher
fault tolerance requires `work_dir` to be configured and accessible from the
dispatcher both before and after restart (e.g. a GCS path). With fault tolerant
mode enabled, the dispatcher will journal its state to the work directory so
that no state is lost when the dispatcher is restarted.
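
For example (a sketch; the GCS work directory path is illustrative):

```
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        # Must remain accessible across restarts, e.g. a GCS path.
        work_dir="gs://my-bucket/dispatcher-work-dir",
        fault_tolerant_mode=True))
```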

WorkerServers may be freely restarted, added, or removed during training. At
startup, workers will register with the dispatcher and begin processing all
outstanding jobs from the beginning.

### Usage with tf.distribute

tf.distribute is the TensorFlow API for distributed training. There are
several ways to use tf.data with tf.distribute:
`strategy.experimental_distribute_dataset`,
`strategy.distribute_datasets_from_function`, and (for PSStrategy)
`coordinator.create_per_worker_dataset`. The following sections give code
examples for each.

In general we recommend using
`tf.data.experimental.service.{register_dataset,from_dataset_id}` over
`tf.data.experimental.service.distribute` for two reasons:

- The dataset only needs to be constructed and optimized once, instead of once
  per worker. This can significantly reduce startup time, because the current
  `experimental_distribute_dataset` and `distribute_datasets_from_function`
  implementations create and optimize worker datasets sequentially.
- If a dataset depends on lookup tables or variables that are only present on
  one host, the dataset needs to be registered from that host. Typically this
  only happens when resources are placed on the chief or worker 0. Registering
  the dataset from the chief will avoid issues with depending on remote
  resources.

#### strategy.experimental_distribute_dataset

Nothing special is required when using
`strategy.experimental_distribute_dataset`; just apply `register_dataset` and
`from_dataset_id` as above, making sure to specify a `job_name` so that all
workers consume from the same tf.data service job.

```
dataset = ...  # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
dataset = tf.data.experimental.service.from_dataset_id(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    dataset_id=dataset_id,
    job_name="shared_job")

dataset = strategy.experimental_distribute_dataset(dataset)
```

#### strategy.distribute_datasets_from_function

First, make sure the dataset produced by the `dataset_fn` does not depend on the
`input_context` for the training worker on which it is run. Instead of each
worker building its own (sharded) dataset, one worker should register an
unsharded dataset, and the remaining workers should consume data from that
dataset.

```
dataset = dataset_fn()
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)

def new_dataset_fn(input_context):
  del input_context
  return tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

dataset = strategy.distribute_datasets_from_function(new_dataset_fn)
```

#### coordinator.create_per_worker_dataset

`create_per_worker_dataset` works the same as
`distribute_datasets_from_function`.

```
dataset = dataset_fn()
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)

def new_dataset_fn(input_context):
  del input_context
  return tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

dataset = coordinator.create_per_worker_dataset(new_dataset_fn)
```

### Sharing tf.data service with concurrent trainers

If you run multiple trainers concurrently using the same training data, it could
save resources to cache the data in one tf.data service cluster and share the
cluster with the trainers. For example, if you use Vizier to tune
hyperparameters, the Vizier jobs can run concurrently and share one tf.data
service cluster.

To enable this feature, each trainer needs to generate a unique trainer ID, and
you pass the trainer ID to `tf.data.experimental.service.distribute`. Once a job
has consumed data, the data remains in the cache and is re-used by jobs with
different `trainer_id`s. Requests with the same `trainer_id` do not re-use data.
For example:

```
dataset = expensive_computation()
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    job_name="job",
    cross_trainer_cache=data_service_ops.CrossTrainerCache(
        trainer_id=trainer_id())))
```

tf.data service uses a sliding-window cache to store shared data. When one
trainer consumes data, the data remains in the cache. When other trainers need
data, they can get data from the cache instead of repeating the expensive
computation. The cache has a bounded size, so some trainers may not read the
full dataset. To ensure all the trainers get sufficient training data, we
require the input dataset to be infinite. This can be achieved, for example, by
repeating the dataset and performing random augmentation on the training
instances, as in the sketch below.
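
A minimal sketch of making the input infinite (`random_augment` is a
hypothetical per-example augmentation function):

```
dataset = expensive_computation()
dataset = dataset.repeat()  # Infinite cardinality for the shared cache.
# `random_augment` is hypothetical; any nondeterministic augmentation keeps
# repeated epochs from being identical.
dataset = dataset.map(random_augment, num_parallel_calls=tf.data.AUTOTUNE)
```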

## Limitations

- Python-based data processing: Datasets which use Python-based data processing
  (e.g. `tf.py_function`, `tf.numpy_function`, or
  `tf.data.Dataset.from_generator`) are currently not supported.
- Non-serializable resources: Datasets may only depend on TF resources that
  support serialization. Serialization is currently supported for lookup
  tables and variables. If your dataset depends on a TF resource that cannot be
  serialized, please file a GitHub issue.
- Remote resources: If a dataset depends on a resource, the dataset must be
  registered from the same process that created the resource (e.g. the "chief"
  job of ParameterServerStrategy).
"""

from tensorflow.python.data.experimental.ops.data_service_ops import distribute
from tensorflow.python.data.experimental.ops.data_service_ops import from_dataset_id
from tensorflow.python.data.experimental.ops.data_service_ops import register_dataset
from tensorflow.python.data.experimental.ops.data_service_ops import ShardingPolicy
from tensorflow.python.data.experimental.service.server_lib import DispatcherConfig
from tensorflow.python.data.experimental.service.server_lib import DispatchServer
from tensorflow.python.data.experimental.service.server_lib import WorkerConfig
from tensorflow.python.data.experimental.service.server_lib import WorkerServer