# This file is MACHINE GENERATED! Do not edit.
# Generated by: tensorflow/python/tools/api/generator/create_python_api.py script.
3"""API for using the tf.data service.
5This module contains:
71. tf.data server implementations for running the tf.data service.
82. APIs for registering datasets with the tf.data service and reading from
9 the registered datasets.

The tf.data service provides the following benefits:

- Horizontal scaling of tf.data input pipeline processing to solve input
  bottlenecks.
- Data coordination for distributed training. Coordinated reads
  enable all replicas to train on similar-length examples across each global
  training step, improving step times in synchronous training.
- Dynamic balancing of data across training replicas.

>>> dispatcher = tf.data.experimental.service.DispatchServer()
>>> dispatcher_address = dispatcher.target.split("://")[1]
>>> worker = tf.data.experimental.service.WorkerServer(
...     tf.data.experimental.service.WorkerConfig(
...         dispatcher_address=dispatcher_address))
>>> dataset = tf.data.Dataset.range(10)
>>> dataset = dataset.apply(tf.data.experimental.service.distribute(
...     processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
...     service=dispatcher.target))
>>> print(list(dataset.as_numpy_iterator()))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

## Setup

This section goes over how to set up the tf.data service.

### Run tf.data servers

The tf.data service consists of one dispatch server and `n` worker servers.
tf.data servers should be brought up alongside your training jobs, then brought
down when the jobs are finished.
Use `tf.data.experimental.service.DispatchServer` to start a dispatch server,
and `tf.data.experimental.service.WorkerServer` to start worker servers. Servers
can be run in the same process for testing purposes, or scaled up on separate
machines.
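
The in-process pattern from the example at the top of this module works for
testing. For a multi-machine setup, a sketch along the following lines binds
the servers to fixed ports so other hosts can reach them (the ports and
hostname here are placeholders):

```
import tensorflow as tf

dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address="localhost:5050", port=5051))
# Block so the servers stay alive; in a real deployment each server would
# run in its own process.
dispatcher.join()
```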

See https://github.com/tensorflow/ecosystem/tree/master/data_service for an
example of using Google Kubernetes Engine (GKE) to manage the tf.data service.
Note that the server implementation in
[tf_std_data_server.py](https://github.com/tensorflow/ecosystem/blob/master/data_service/tf_std_data_server.py)
is not GKE-specific, and can be used to run the tf.data service in other
contexts.

### Custom ops

If your dataset uses custom ops, these ops need to be made available to tf.data
servers by calling
[load_op_library](https://www.tensorflow.org/api_docs/python/tf/load_op_library)
from the dispatcher and worker processes at startup.
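
As a minimal sketch, assuming your custom ops are compiled into a shared
library at the (hypothetical) path below, each server process would load the
library before starting:

```
import tensorflow as tf

# Make the custom ops available to this process before the server starts.
tf.load_op_library("/path/to/custom_ops.so")
dispatcher = tf.data.experimental.service.DispatchServer()
```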

## Usage

Users interact with tf.data service by programmatically registering their
datasets with tf.data service, then creating datasets that read from the
registered datasets. The
[register_dataset](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/register_dataset)
function registers a dataset, then the
[from_dataset_id](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/from_dataset_id)
function creates a new dataset which reads from the registered dataset.
The
[distribute](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/distribute)
function wraps `register_dataset` and `from_dataset_id` into a single convenient
transformation which registers its input dataset and then reads from it.
`distribute` enables tf.data service to be used with a one-line code change.
However, it assumes that the dataset is created and consumed by the same entity,
and this assumption might not always be valid or desirable. In particular, in
certain scenarios, such as distributed training, it might be desirable to
decouple the creation and consumption of the dataset (via `register_dataset`
and `from_dataset_id`, respectively) to avoid having to create the dataset on
each of the training workers.

### Example

#### `distribute`

To use the `distribute` transformation, apply the transformation after the
prefix of your input pipeline that you would like to be executed using tf.data
service (typically at the end).

```
dataset = ...  # Define your dataset here.
# Move dataset processing from the local machine to the tf.data service
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address,
        job_name="shared_job"))
# Any transformations added after `distribute` will be run on the local machine.
dataset = dataset.prefetch(1)
```

The above code will create a tf.data service "job", which iterates through the
dataset to generate data. To share the data from a job across multiple clients
(e.g. when using TPUStrategy or MultiWorkerMirroredStrategy), set a common
`job_name` across all clients.

#### `register_dataset` and `from_dataset_id`

`register_dataset` registers a dataset with the tf.data service, returning a
dataset id for the registered dataset. `from_dataset_id` creates a dataset that
reads from the registered dataset. These APIs can be used to reduce dataset
building time for distributed training. Instead of building the dataset on all
training workers, we can build the dataset just once and then register the
dataset using `register_dataset`. Then all workers can call `from_dataset_id`
without needing to build the dataset themselves.

```
dataset = ...  # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
# Use `from_dataset_id` to create per-worker datasets.
per_worker_datasets = {}
for worker in workers:
  per_worker_datasets[worker] = tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")
```

### Processing Modes

`processing_mode` specifies how to shard a dataset among tf.data service
workers. tf.data service supports the `OFF`, `DYNAMIC`, `FILE`, `DATA`,
`FILE_OR_DATA`, and `HINT` sharding policies.

OFF: No sharding will be performed. The entire input dataset will be processed
independently by each of the tf.data service workers. For this reason, it is
important to shuffle data (e.g. filenames) non-deterministically, so that each
worker will process the elements of the dataset in a different order. This mode
can be used to distribute datasets that aren't splittable.

If a worker is added or restarted during ShardingPolicy.OFF processing, the
worker will instantiate a new copy of the dataset and begin producing data from
the beginning.
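
A minimal sketch of the non-deterministic shuffling described above, assuming
`filenames` lists your input files (no fixed seed is passed to `shuffle`, so
each worker visits the files in a different order):

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
# Reshuffled independently on each tf.data service worker.
dataset = dataset.shuffle(len(filenames))
dataset = dataset.interleave(tf.data.TFRecordDataset)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address))
```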

#### Dynamic Sharding

DYNAMIC: In this mode, tf.data service divides the dataset into two components:
a source component that generates "splits" such as filenames, and a processing
component that takes splits and outputs dataset elements. The source component
is executed in a centralized fashion by the tf.data service dispatcher, which
generates different splits of input data. The processing component is executed
in a parallel fashion by the tf.data service workers, each operating on a
different set of input data splits.

For example, consider the following dataset:

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
dataset = dataset.interleave(TFRecordDataset)
dataset = dataset.map(preprocess_fn)
dataset = dataset.batch(batch_size)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.DYNAMIC,
        ...))
```

The `from_tensor_slices` will be run on the dispatcher, while the `interleave`,
`map`, and `batch` will be run on tf.data service workers. The workers will pull
filenames from the dispatcher for processing. To process a dataset with
dynamic sharding, the dataset must have a splittable source, and all of
its transformations must be compatible with splitting. While most sources and
transformations support splitting, there are exceptions, such as custom datasets
which may not implement the splitting API. Please file a GitHub issue if you
would like to use distributed epoch processing for a currently unsupported
dataset source or transformation.

If no workers are restarted during training, dynamic sharding mode will visit
every example exactly once. If workers are restarted during training, the splits
they were processing will not be fully visited. The dispatcher maintains a
cursor through the dataset's splits. Assuming fault tolerance is enabled (see
"Fault Tolerance" below), the dispatcher will store cursor state in write-ahead
logs so that the cursor can be restored in case the dispatcher is restarted
mid-training. This provides an at-most-once visitation guarantee in the presence
of server restarts.

#### Static Sharding

The following are static sharding policies. The semantics are similar to
`tf.data.experimental.AutoShardPolicy`. These policies require:

  * The tf.data service cluster is configured with a fixed list of workers
    in DispatcherConfig.
  * Each client only reads from the local tf.data service worker.

If a worker is restarted while performing static sharding, the worker will
begin processing its shard again from the beginning.
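
A minimal sketch of the fixed-worker configuration that static sharding
requires (the addresses and port are placeholders):

```
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        # Static sharding requires a fixed list of worker addresses.
        worker_addresses=["host1:5051", "host2:5051"]))
```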

FILE: Shards by input files (i.e. each worker will get a fixed set of files to
process). When this option is selected, make sure that there are at least as
many files as workers. If there are fewer input files than workers, a runtime
error will be raised.

DATA: Shards by elements produced by the dataset. Each worker will process the
whole dataset and discard the portion that is not for itself. Note that for
this mode to correctly partition the dataset elements, the dataset needs to
produce elements in a deterministic order.

FILE_OR_DATA: Attempts FILE-based sharding, falling back to DATA-based
sharding on failure.

HINT: Looks for the presence of `shard(SHARD_HINT, ...)`, which is treated as a
placeholder to be replaced with `shard(num_workers, worker_index)`.
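
A minimal sketch of HINT sharding, assuming `filenames` lists your input
files; `tf.data.experimental.SHARD_HINT` marks where tf.data service should
substitute the worker count and worker index:

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
# Replaced with shard(num_workers, worker_index) by the tf.data service.
dataset = dataset.shard(tf.data.experimental.SHARD_HINT,
                        tf.data.experimental.SHARD_HINT)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.HINT,
        service=FLAGS.tf_data_service_address))
```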

For backwards compatibility, `processing_mode` may also be set to the strings
`"parallel_epochs"` or `"distributed_epoch"`, which are respectively equivalent
to `ShardingPolicy.OFF` and `ShardingPolicy.DYNAMIC`.

### Coordinated Data Read

By default, when multiple consumers read from the same job, they receive data on
a first-come first-served basis. In some use cases, it is advantageous to
coordinate the consumers so that at each step they all read data from the same
worker.

For example, the tf.data service can be used to coordinate example sizes across
a cluster during synchronous training, so that during each step all replicas
train on similar-sized elements. To achieve this, define a dataset which
generates rounds of `num_consumers` consecutive similar-sized batches, then
enable coordinated reads by setting `consumer_index` and `num_consumers`.

NOTE: To keep consumers in sync, coordinated reads require that the dataset have
infinite cardinality. You can get this by adding `.repeat()` at the end of the
dataset definition.
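
A minimal sketch of enabling coordinated reads, assuming each consumer knows
its own index and the total consumer count (`my_consumer_index` and
`total_num_consumers` are hypothetical values supplied by your training
setup), with `.repeat()` making the dataset infinite as required:

```
dataset = ...  # Define a dataset producing rounds of similar-sized batches.
dataset = dataset.repeat()
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address,
        job_name="shared_job",
        consumer_index=my_consumer_index,
        num_consumers=total_num_consumers))
```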

### Jobs

A tf.data service "job" refers to the process of reading from a dataset managed
by the tf.data service, using one or more data consumers. Jobs are created when
iterating over datasets that read from tf.data service. The data produced by a
job is determined by (1) the dataset associated with the job and (2) the job's
processing mode. For example, if a job is created for the dataset
`Dataset.range(5)`, and the processing mode is `ShardingPolicy.OFF`, each
tf.data worker will produce the elements `{0, 1, 2, 3, 4}` for the job,
resulting in the job producing `5 * num_workers` elements. If the processing
mode is `ShardingPolicy.DYNAMIC`, the job will instead produce exactly `5`
elements.

One or more consumers can consume data from a job. By default, jobs are
"anonymous", meaning that only the consumer which created the job can read from
it. To share the output of a job across multiple consumers, you can set a common
`job_name`.

### Fault Tolerance

By default, the tf.data dispatch server stores its state in-memory, making it a
single point of failure during training. To avoid this, pass
`fault_tolerant_mode=True` when creating your `DispatchServer`. Dispatcher
fault tolerance requires `work_dir` to be configured and accessible from the
dispatcher both before and after restart (e.g. a GCS path). With fault tolerant
mode enabled, the dispatcher will journal its state to the work directory so
that no state is lost when the dispatcher is restarted.
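
A minimal sketch of a fault-tolerant dispatcher (the GCS path is a placeholder
for any directory that survives dispatcher restarts):

```
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        work_dir="gs://my-bucket/dispatcher_work_dir",
        fault_tolerant_mode=True))
```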

WorkerServers may be freely restarted, added, or removed during training. At
startup, workers will register with the dispatcher and begin processing all
outstanding jobs from the beginning.

### Usage with tf.distribute

tf.distribute is the TensorFlow API for distributed training. There are
several ways to use tf.data with tf.distribute:
`strategy.experimental_distribute_dataset`,
`strategy.distribute_datasets_from_function`, and (for PSStrategy)
`coordinator.create_per_worker_dataset`. The following sections give code
examples for each.

In general we recommend using
`tf.data.experimental.service.{register_dataset,from_dataset_id}` over
`tf.data.experimental.service.distribute` for two reasons:

- The dataset only needs to be constructed and optimized once, instead of once
  per worker. This can significantly reduce startup time, because the current
  `experimental_distribute_dataset` and `distribute_datasets_from_function`
  implementations create and optimize worker datasets sequentially.
- If a dataset depends on lookup tables or variables that are only present on
  one host, the dataset needs to be registered from that host. Typically this
  only happens when resources are placed on the chief or worker 0. Registering
  the dataset from the chief will avoid issues with depending on remote
  resources.

#### strategy.experimental_distribute_dataset

Nothing special is required when using
`strategy.experimental_distribute_dataset`: just apply `register_dataset` and
`from_dataset_id` as above, making sure to specify a `job_name` so that all
workers consume from the same tf.data service job.

```
dataset = ...  # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
dataset = tf.data.experimental.service.from_dataset_id(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    dataset_id=dataset_id,
    job_name="shared_job")

dataset = strategy.experimental_distribute_dataset(dataset)
```

#### strategy.distribute_datasets_from_function

First, make sure the dataset produced by the `dataset_fn` does not depend on the
`input_context` for the training worker on which it is run. Instead of each
worker building its own (sharded) dataset, one worker should register an
unsharded dataset, and the remaining workers should consume data from that
dataset.

```
dataset = dataset_fn()
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)

def new_dataset_fn(input_context):
  del input_context
  return tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

dataset = strategy.distribute_datasets_from_function(new_dataset_fn)
```

#### coordinator.create_per_worker_dataset

`create_per_worker_dataset` works the same as
`distribute_datasets_from_function`.

```
dataset = dataset_fn()
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)

def new_dataset_fn(input_context):
  del input_context
  return tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

dataset = coordinator.create_per_worker_dataset(new_dataset_fn)
```

### Sharing tf.data service with concurrent trainers

If you run multiple trainers concurrently using the same training data, it could
save resources to cache the data in one tf.data service cluster and share the
cluster with the trainers. For example, if you use Vizier to tune
hyperparameters, the Vizier jobs can run concurrently and share one tf.data
service cluster.

To enable this feature, each trainer needs to generate a unique trainer ID and
pass it to `tf.data.experimental.service.distribute`. Once a job has consumed
data, the data remains in the cache and is re-used by jobs with different
`trainer_id`s. Requests with the same `trainer_id` do not re-use data.
For example:

```
dataset = expensive_computation()
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    job_name="job",
    cross_trainer_cache=tf.data.experimental.service.CrossTrainerCache(
        trainer_id=trainer_id())))
```

tf.data service uses a sliding-window cache to store shared data. When one
trainer consumes data, the data remains in the cache. When other trainers need
data, they can get data from the cache instead of repeating the expensive
computation. The cache has a bounded size, so some workers may not read the full
dataset. To ensure all the trainers get sufficient training data, we require the
input dataset to be infinite. This can be achieved, for example, by repeating
the dataset and performing random augmentation on the training instances.
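
A minimal sketch of making the input infinite in the way described above,
assuming `augment_fn` (hypothetical) applies random augmentation so repeated
passes still produce distinct training instances:

```
dataset = dataset.repeat()
dataset = dataset.map(augment_fn, num_parallel_calls=tf.data.AUTOTUNE)
```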

## Limitations

- Python-based data processing: Datasets which use Python-based data processing
  (e.g. `tf.py_function`, `tf.numpy_function`, or
  `tf.data.Dataset.from_generator`) are currently not supported.
- Non-Serializable Resources: Datasets may only depend on TF resources that
  support serialization. Serialization is currently supported for lookup
  tables and variables. If your dataset depends on a TF resource that cannot be
  serialized, please file a GitHub issue.
- Remote Resources: If a dataset depends on a resource, the dataset must be
  registered from the same process that created the resource (e.g. the "chief"
  job of ParameterServerStrategy).
406"""

import sys as _sys

from tensorflow.python.data.experimental.ops.data_service_ops import CrossTrainerCache
from tensorflow.python.data.experimental.ops.data_service_ops import ShardingPolicy
from tensorflow.python.data.experimental.ops.data_service_ops import distribute
from tensorflow.python.data.experimental.ops.data_service_ops import from_dataset_id
from tensorflow.python.data.experimental.ops.data_service_ops import register_dataset
from tensorflow.python.data.experimental.service.server_lib import DispatchServer
from tensorflow.python.data.experimental.service.server_lib import DispatcherConfig
from tensorflow.python.data.experimental.service.server_lib import WorkerConfig
from tensorflow.python.data.experimental.service.server_lib import WorkerServer