Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/_api/v2/data/experimental/service/__init__.py: 100%

11 statements  

coverage.py v7.4.0, created at 2024-01-03 07:57 +0000

# This file is MACHINE GENERATED! Do not edit.
# Generated by: tensorflow/python/tools/api/generator/create_python_api.py script.
"""API for using the tf.data service.

This module contains:

1. tf.data server implementations for running the tf.data service.
2. APIs for registering datasets with the tf.data service and reading from
   the registered datasets.

The tf.data service provides the following benefits:

- Horizontal scaling of tf.data input pipeline processing to solve input
  bottlenecks.
- Data coordination for distributed training. Coordinated reads
  enable all replicas to train on similar-length examples across each global
  training step, improving step times in synchronous training.
- Dynamic balancing of data across training replicas.

>>> dispatcher = tf.data.experimental.service.DispatchServer()
>>> dispatcher_address = dispatcher.target.split("://")[1]
>>> worker = tf.data.experimental.service.WorkerServer(
...     tf.data.experimental.service.WorkerConfig(
...         dispatcher_address=dispatcher_address))
>>> dataset = tf.data.Dataset.range(10)
>>> dataset = dataset.apply(tf.data.experimental.service.distribute(
...     processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
...     service=dispatcher.target))
>>> print(list(dataset.as_numpy_iterator()))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

## Setup

This section goes over how to set up the tf.data service.

### Run tf.data servers

The tf.data service consists of one dispatch server and `n` worker servers.
tf.data servers should be brought up alongside your training jobs, then brought
down when the jobs are finished.
Use `tf.data.experimental.service.DispatchServer` to start a dispatch server,
and `tf.data.experimental.service.WorkerServer` to start worker servers. Servers
can be run in the same process for testing purposes, or scaled up on separate
machines.

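As an illustrative sketch of running the servers on separate machines (the host
names and port numbers below are placeholders, not required values), each server
can run as its own blocking process:

```
# Process 1: run the dispatcher and block until it shuts down.
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()

# Processes 2 .. n+1: run a worker pointed at the dispatcher and block.
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address="dispatcher-host:5050", port=5051))
worker.join()
```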

See https://github.com/tensorflow/ecosystem/tree/master/data_service for an
example of using Google Kubernetes Engine (GKE) to manage the tf.data service.
Note that the server implementation in
[tf_std_data_server.py](https://github.com/tensorflow/ecosystem/blob/master/data_service/tf_std_data_server.py)
is not GKE-specific, and can be used to run the tf.data service in other
contexts.

### Custom ops

If your dataset uses custom ops, these ops need to be made available to tf.data
servers by calling
[load_op_library](https://www.tensorflow.org/api_docs/python/tf/load_op_library)
from the dispatcher and worker processes at startup.

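For example, a server startup script might load the custom op library before
creating the server; the shared-object path below is a placeholder for wherever
your op library lives:

```
# Load the custom op library first, so the dispatcher/worker can handle
# datasets that use those ops.
tf.load_op_library("/path/to/custom_ops.so")  # placeholder path
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()
```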

## Usage

Users interact with tf.data service by programmatically registering their
datasets with tf.data service, then creating datasets that read from the
registered datasets. The
[register_dataset](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/register_dataset)
function registers a dataset, then the
[from_dataset_id](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/from_dataset_id)
function creates a new dataset which reads from the registered dataset.
The
[distribute](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/distribute)
function wraps `register_dataset` and `from_dataset_id` into a single convenient
transformation which registers its input dataset and then reads from it.
`distribute` enables tf.data service to be used with a one-line code change.
However, it assumes that the dataset is created and consumed by the same entity,
and this assumption might not always be valid or desirable. In particular, in
certain scenarios, such as distributed training, it might be desirable to
decouple the creation and consumption of the dataset (via `register_dataset`
and `from_dataset_id` respectively) to avoid having to create the dataset on
each of the training workers.

### Example

#### `distribute`

To use the `distribute` transformation, apply the transformation after the
prefix of your input pipeline that you would like to be executed using tf.data
service (typically at the end).

```
dataset = ... # Define your dataset here.
# Move dataset processing from the local machine to the tf.data service
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address,
        job_name="shared_job"))
# Any transformations added after `distribute` will be run on the local machine.
dataset = dataset.prefetch(1)
```

The above code will create a tf.data service "job", which iterates through the
dataset to generate data. To share the data from a job across multiple clients
(e.g. when using TPUStrategy or MultiWorkerMirroredStrategy), set a common
`job_name` across all clients.

#### `register_dataset` and `from_dataset_id`

`register_dataset` registers a dataset with the tf.data service, returning a
dataset id for the registered dataset. `from_dataset_id` creates a dataset that
reads from the registered dataset. These APIs can be used to reduce dataset
building time for distributed training. Instead of building the dataset on all
training workers, we can build the dataset just once and then register the
dataset using `register_dataset`. Then all workers can call `from_dataset_id`
without needing to build the dataset themselves.

```
dataset = ... # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
# Use `from_dataset_id` to create per-worker datasets.
per_worker_datasets = {}
for worker in workers:
  per_worker_datasets[worker] = tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")
```

### Processing Modes

`processing_mode` specifies how to shard a dataset among tf.data service
workers. tf.data service supports `OFF`, `DYNAMIC`, `FILE`, `DATA`,
`FILE_OR_DATA`, `HINT` sharding policies.

OFF: No sharding will be performed. The entire input dataset will be processed
independently by each of the tf.data service workers. For this reason, it is
important to shuffle data (e.g. filenames) non-deterministically, so that each
worker will process the elements of the dataset in a different order. This mode
can be used to distribute datasets that aren't splittable.

If a worker is added or restarted during ShardingPolicy.OFF processing, the
worker will instantiate a new copy of the dataset and begin producing data from
the beginning.

#### Dynamic Sharding

DYNAMIC: In this mode, tf.data service divides the dataset into two components:
a source component that generates "splits" such as filenames, and a processing
component that takes splits and outputs dataset elements. The source component
is executed in a centralized fashion by the tf.data service dispatcher, which
generates different splits of input data. The processing component is executed
in a parallel fashion by the tf.data service workers, each operating on a
different set of input data splits.

For example, consider the following dataset:

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
dataset = dataset.interleave(TFRecordDataset)
dataset = dataset.map(preprocess_fn)
dataset = dataset.batch(batch_size)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.DYNAMIC,
        ...))
```

The `from_tensor_slices` will be run on the dispatcher, while the `interleave`,
`map`, and `batch` will be run on tf.data service workers. The workers will pull
filenames from the dispatcher for processing. To process a dataset with
dynamic sharding, the dataset must have a splittable source, and all of
its transformations must be compatible with splitting. While most sources and
transformations support splitting, there are exceptions, such as custom datasets
which may not implement the splitting API. Please file a Github issue if you
would like to use distributed epoch processing for a currently unsupported
dataset source or transformation.

If no workers are restarted during training, dynamic sharding mode will visit
every example exactly once. If workers are restarted during training, the splits
they were processing will not be fully visited. The dispatcher maintains a
cursor through the dataset's splits. Assuming fault tolerance is enabled (See
"Fault Tolerance" below), the dispatcher will store cursor state in write-ahead
logs so that the cursor can be restored in case the dispatcher is restarted
mid-training. This provides an at-most-once visitation guarantee in the presence
of server restarts.

#### Static Sharding

The following are static sharding policies. The semantics are similar to
`tf.data.experimental.AutoShardPolicy`. These policies require:

  * The tf.data service cluster is configured with a fixed list of workers
    in DispatcherConfig.
  * Each client only reads from the local tf.data service worker.

If a worker is restarted while performing static sharding, the worker will
begin processing its shard again from the beginning.

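As a rough sketch of satisfying these requirements (the worker addresses, ports,
and `target_workers` value below are illustrative placeholders, assuming the
worker hosts are known up front):

```
# Dispatcher configured with a fixed list of workers, as static sharding
# requires. Addresses are placeholders.
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        worker_addresses=["worker0:5051", "worker1:5051"]))

# Each client reads only from its co-located tf.data service worker.
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode=tf.data.experimental.service.ShardingPolicy.FILE_OR_DATA,
    service=FLAGS.tf_data_service_address,
    target_workers="LOCAL"))
```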

FILE: Shards by input files (i.e. each worker will get a fixed set of files to
process). When this option is selected, make sure that there are at least as
many files as workers. If there are fewer input files than workers, a runtime
error will be raised.

DATA: Shards by elements produced by the dataset. Each worker will process the
whole dataset and discard the portion that is not for itself. Note that for
this mode to correctly partition the dataset elements, the dataset needs to
produce elements in a deterministic order.

FILE_OR_DATA: Attempts FILE-based sharding, falling back to DATA-based
sharding on failure.

HINT: Looks for the presence of `shard(SHARD_HINT, ...)` which is treated as a
placeholder to replace with `shard(num_workers, worker_index)`.

For backwards compatibility, `processing_mode` may also be set to the strings
`"parallel_epochs"` or `"distributed_epoch"`, which are respectively equivalent
to `ShardingPolicy.OFF` and `ShardingPolicy.DYNAMIC`.

### Coordinated Data Read

By default, when multiple consumers read from the same job, they receive data on
a first-come first-served basis. In some use cases, it is advantageous to
coordinate the consumers so that at each step they all read data from the same
worker.

For example, the tf.data service can be used to coordinate example sizes across
a cluster during synchronous training, so that during each step all replicas
train on similar-sized elements. To achieve this, define a dataset which
generates rounds of `num_consumers` consecutive similar-sized batches, then
enable coordinated reads by setting `consumer_index` and `num_consumers`.

NOTE: To keep consumers in sync, coordinated reads require that the dataset have
infinite cardinality. You can get this by adding `.repeat()` at the end of the
dataset definition.

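For example, a minimal sketch of enabling coordinated reads through `distribute`;
here `make_similar_sized_batches`, `NUM_CONSUMERS`, and `THIS_CONSUMER_INDEX` are
placeholders for values your setup provides:

```
# A dataset producing rounds of `NUM_CONSUMERS` consecutive similar-sized
# batches, repeated so that it has infinite cardinality.
dataset = make_similar_sized_batches(num_consumers=NUM_CONSUMERS).repeat()
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    job_name="coordinated_job",
    consumer_index=THIS_CONSUMER_INDEX,  # in [0, NUM_CONSUMERS)
    num_consumers=NUM_CONSUMERS))
```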

### Jobs

A tf.data service "job" refers to the process of reading from a dataset managed
by the tf.data service, using one or more data consumers. Jobs are created when
iterating over datasets that read from tf.data service. The data produced by a
job is determined by (1) the dataset associated with the job and (2) the job's
processing mode. For example, if a job is created for the dataset
`Dataset.range(5)`, and the processing mode is `ShardingPolicy.OFF`, each
tf.data worker will produce the elements `{0, 1, 2, 3, 4}` for the job,
resulting in the job producing `5 * num_workers` elements. If the processing
mode is `ShardingPolicy.DYNAMIC`, the job will only produce `5` elements.

One or more consumers can consume data from a job. By default, jobs are
"anonymous", meaning that only the consumer which created the job can read from
it. To share the output of a job across multiple consumers, you can set a common
`job_name`.

### Fault Tolerance

By default, the tf.data dispatch server stores its state in-memory, making it a
single point of failure during training. To avoid this, pass
`fault_tolerant_mode=True` when creating your `DispatchServer`. Dispatcher
fault tolerance requires `work_dir` to be configured and accessible from the
dispatcher both before and after restart (e.g. a GCS path). With fault tolerant
mode enabled, the dispatcher will journal its state to the work directory so
that no state is lost when the dispatcher is restarted.

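For example, a sketch of creating a fault-tolerant dispatcher; the port and the
work directory path below are placeholders:

```
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        work_dir="gs://my-bucket/dispatcher_work_dir",  # placeholder path
        fault_tolerant_mode=True))
```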

WorkerServers may be freely restarted, added, or removed during training. At
startup, workers will register with the dispatcher and begin processing all
outstanding jobs from the beginning.

### Usage with tf.distribute

tf.distribute is the TensorFlow API for distributed training. There are
several ways to use tf.data with tf.distribute:
`strategy.experimental_distribute_dataset`,
`strategy.distribute_datasets_from_function`, and (for PSStrategy)
`coordinator.create_per_worker_dataset`. The following sections give code
examples for each.

In general we recommend using
`tf.data.experimental.service.{register_dataset,from_dataset_id}` over
`tf.data.experimental.service.distribute` for two reasons:

- The dataset only needs to be constructed and optimized once, instead of once
  per worker. This can significantly reduce startup time, because the current
  `experimental_distribute_dataset` and `distribute_datasets_from_function`
  implementations create and optimize worker datasets sequentially.
- If a dataset depends on lookup tables or variables that are only present on
  one host, the dataset needs to be registered from that host. Typically this
  only happens when resources are placed on the chief or worker 0. Registering
  the dataset from the chief will avoid issues with depending on remote
  resources.

#### strategy.experimental_distribute_dataset

Nothing special is required when using
`strategy.experimental_distribute_dataset`, just apply `register_dataset` and
`from_dataset_id` as above, making sure to specify a `job_name` so that all
workers consume from the same tf.data service job.

```
dataset = ... # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
dataset = tf.data.experimental.service.from_dataset_id(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    dataset_id=dataset_id,
    job_name="shared_job")

dataset = strategy.experimental_distribute_dataset(dataset)
```

#### strategy.distribute_datasets_from_function

First, make sure the dataset produced by the `dataset_fn` does not depend on the
`input_context` for the training worker on which it is run. Instead of each
worker building its own (sharded) dataset, one worker should register an
unsharded dataset, and the remaining workers should consume data from that
dataset.

```
dataset = dataset_fn()
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)

def new_dataset_fn(input_context):
  del input_context
  return tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

dataset = strategy.distribute_datasets_from_function(new_dataset_fn)
```

#### coordinator.create_per_worker_dataset

`create_per_worker_dataset` works the same as
`distribute_datasets_from_function`.

```
dataset = dataset_fn()
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)

def new_dataset_fn(input_context):
  del input_context
  return tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

dataset = coordinator.create_per_worker_dataset(new_dataset_fn)
```

### Sharing tf.data service with concurrent trainers

If you run multiple trainers concurrently using the same training data, it could
save resources to cache the data in one tf.data service cluster and share the
cluster with the trainers. For example, if you use Vizier to tune
hyperparameters, the Vizier jobs can run concurrently and share one tf.data
service cluster.

To enable this feature, each trainer needs to generate a unique trainer ID, and
you pass the trainer ID to `tf.data.experimental.service.distribute`. Once a job
has consumed data, the data remains in the cache and is re-used by jobs with
different `trainer_id`s. Requests with the same `trainer_id` do not re-use data.
For example:

```
dataset = expensive_computation()
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    job_name="job",
    cross_trainer_cache=tf.data.experimental.service.CrossTrainerCache(
        trainer_id=trainer_id())))
```

tf.data service uses a sliding-window cache to store shared data. When one
trainer consumes data, the data remains in the cache. When other trainers need
data, they can get data from the cache instead of repeating the expensive
computation. The cache has a bounded size, so some workers may not read the full
dataset. To ensure all the trainers get sufficient training data, we require the
input dataset to be infinite. This can be achieved, for example, by repeating
the dataset and performing random augmentation on the training instances.

## Limitations

- Python-based data processing: Datasets which use Python-based data processing
  (e.g. `tf.py_function`, `tf.numpy_function`, or
  `tf.data.Dataset.from_generator`) are currently not supported.
- Non-Serializable Resources: Datasets may only depend on TF resources that
  support serialization. Serialization is currently supported for lookup
  tables and variables. If your dataset depends on a TF resource that cannot be
  serialized, please file a Github issue.
- Remote Resources: If a dataset depends on a resource, the dataset must be
  registered from the same process that created the resource (e.g. the "chief"
  job of ParameterServerStrategy).

"""

import sys as _sys

from tensorflow.python.data.experimental.ops.data_service_ops import CrossTrainerCache
from tensorflow.python.data.experimental.ops.data_service_ops import ShardingPolicy
from tensorflow.python.data.experimental.ops.data_service_ops import distribute
from tensorflow.python.data.experimental.ops.data_service_ops import from_dataset_id
from tensorflow.python.data.experimental.ops.data_service_ops import register_dataset
from tensorflow.python.data.experimental.service.server_lib import DispatchServer
from tensorflow.python.data.experimental.service.server_lib import DispatcherConfig
from tensorflow.python.data.experimental.service.server_lib import WorkerConfig
from tensorflow.python.data.experimental.service.server_lib import WorkerServer