# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""API for using the tf.data service.

This module contains:

1. tf.data server implementations for running the tf.data service.
2. APIs for registering datasets with the tf.data service and reading from
   the registered datasets.

The tf.data service provides the following benefits:

- Horizontal scaling of tf.data input pipeline processing to solve input
  bottlenecks.
- Data coordination for distributed training. Coordinated reads
  enable all replicas to train on similar-length examples across each global
  training step, improving step times in synchronous training.
- Dynamic balancing of data across training replicas.

>>> dispatcher = tf.data.experimental.service.DispatchServer()
>>> dispatcher_address = dispatcher.target.split("://")[1]
>>> worker = tf.data.experimental.service.WorkerServer(
...     tf.data.experimental.service.WorkerConfig(
...         dispatcher_address=dispatcher_address))
>>> dataset = tf.data.Dataset.range(10)
>>> dataset = dataset.apply(tf.data.experimental.service.distribute(
...     processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
...     service=dispatcher.target))
>>> print(list(dataset.as_numpy_iterator()))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

## Setup

This section goes over how to set up the tf.data service.

### Run tf.data servers

The tf.data service consists of one dispatch server and `n` worker servers.
tf.data servers should be brought up alongside your training jobs, then brought
down when the jobs are finished.
Use `tf.data.experimental.service.DispatchServer` to start a dispatch server,
and `tf.data.experimental.service.WorkerServer` to start worker servers. Servers
can be run in the same process for testing purposes, or scaled up on separate
machines.
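
For testing, both kinds of server can be started inside a single process; the
minimal sketch below mirrors the example at the top of this module (in a real
deployment each server would typically run on its own machine, with a fixed
port configured through `DispatcherConfig`/`WorkerConfig`):

```
import tensorflow as tf

# Start a dispatcher on an ephemeral port, then a worker that registers with
# it. `dispatcher.target` has the form "grpc://host:port".
dispatcher = tf.data.experimental.service.DispatchServer()
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher.target.split("://")[1]))

# `dispatcher.target` is what you later pass as the `service` argument.
print("tf.data service dispatcher listening at", dispatcher.target)
```
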
See https://github.com/tensorflow/ecosystem/tree/master/data_service for an
example of using Google Kubernetes Engine (GKE) to manage the tf.data service.
Note that the server implementation in
[tf_std_data_server.py](https://github.com/tensorflow/ecosystem/blob/master/data_service/tf_std_data_server.py)
is not GKE-specific, and can be used to run the tf.data service in other
contexts.

### Custom ops

If your dataset uses custom ops, these ops need to be made available to tf.data
servers by calling
[load_op_library](https://www.tensorflow.org/api_docs/python/tf/load_op_library)
from the dispatcher and worker processes at startup.
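
As an illustration, a server startup script might look roughly like the
following sketch (the shared-object path is a placeholder for your own custom
op library):

```
import tensorflow as tf

# Make the custom ops available in this server process so that registered
# datasets which use them can be executed here.
tf.load_op_library("/path/to/my_custom_ops.so")

# Then bring up the server as usual (a worker would use WorkerServer instead).
dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher.join()  # Block, keeping the server alive.
```
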
## Usage

Users interact with tf.data service by programmatically registering their
datasets with tf.data service, then creating datasets that read from the
registered datasets. The
[register_dataset](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/register_dataset)
function registers a dataset, then the
[from_dataset_id](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/from_dataset_id)
function creates a new dataset which reads from the registered dataset.
The
[distribute](https://www.tensorflow.org/api_docs/python/tf/data/experimental/service/distribute)
function wraps `register_dataset` and `from_dataset_id` into a single convenient
transformation which registers its input dataset and then reads from it.
`distribute` enables tf.data service to be used with a one-line code change.
However, it assumes that the dataset is created and consumed by the same entity
and this assumption might not always be valid or desirable. In particular, in
certain scenarios, such as distributed training, it might be desirable to
decouple the creation and consumption of the dataset (via `register_dataset`
and `from_dataset_id` respectively) to avoid having to create the dataset on
each of the training workers.

### Example

#### `distribute`

To use the `distribute` transformation, apply the transformation after the
prefix of your input pipeline that you would like to be executed using tf.data
service (typically at the end).

```
dataset = ...  # Define your dataset here.
# Move dataset processing from the local machine to the tf.data service
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address,
        job_name="shared_job"))
# Any transformations added after `distribute` will be run on the local machine.
dataset = dataset.prefetch(1)
```

The above code will create a tf.data service "job", which iterates through the
dataset to generate data. To share the data from a job across multiple clients
(e.g. when using TPUStrategy or MultiWorkerMirroredStrategy), set a common
`job_name` across all clients.

#### `register_dataset` and `from_dataset_id`

`register_dataset` registers a dataset with the tf.data service, returning a
dataset id for the registered dataset. `from_dataset_id` creates a dataset that
reads from the registered dataset. These APIs can be used to reduce dataset
building time for distributed training. Instead of building the dataset on all
training workers, we can build the dataset just once and then register the
dataset using `register_dataset`. Then all workers can call `from_dataset_id`
without needing to build the dataset themselves.

```
dataset = ...  # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
# Use `from_dataset_id` to create per-worker datasets.
per_worker_datasets = {}
for worker in workers:
  per_worker_datasets[worker] = tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")
```

### Processing Modes

`processing_mode` specifies how to shard a dataset among tf.data service
workers. tf.data service supports `OFF`, `DYNAMIC`, `FILE`, `DATA`,
`FILE_OR_DATA`, `HINT` sharding policies.

OFF: No sharding will be performed. The entire input dataset will be processed
independently by each of the tf.data service workers. For this reason, it is
important to shuffle data (e.g. filenames) non-deterministically, so that each
worker will process the elements of the dataset in a different order. This mode
can be used to distribute datasets that aren't splittable.
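
For example, a pipeline distributed with `ShardingPolicy.OFF` might shuffle its
filenames without a fixed seed, so that each worker visits files in its own
random order (a sketch; the file pattern and service address are placeholders):

```
filenames = tf.data.Dataset.list_files("/data/train-*.tfrecord", shuffle=False)
# Reshuffle on every epoch; with no seed set, each tf.data service worker's
# copy of the dataset produces a different order.
filenames = filenames.shuffle(buffer_size=1000, reshuffle_each_iteration=True)
dataset = filenames.interleave(tf.data.TFRecordDataset)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address))
```
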
If a worker is added or restarted during ShardingPolicy.OFF processing, the
worker will instantiate a new copy of the dataset and begin producing data from
the beginning.

#### Dynamic Sharding

DYNAMIC: In this mode, tf.data service divides the dataset into two components:
a source component that generates "splits" such as filenames, and a processing
component that takes splits and outputs dataset elements. The source component
is executed in a centralized fashion by the tf.data service dispatcher, which
generates different splits of input data. The processing component is executed
in a parallel fashion by the tf.data service workers, each operating on a
different set of input data splits.

For example, consider the following dataset:

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
dataset = dataset.interleave(TFRecordDataset)
dataset = dataset.map(preprocess_fn)
dataset = dataset.batch(batch_size)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.DYNAMIC,
        ...))
```

The `from_tensor_slices` will be run on the dispatcher, while the `interleave`,
`map`, and `batch` will be run on tf.data service workers. The workers will pull
filenames from the dispatcher for processing. To process a dataset with
dynamic sharding, the dataset must have a splittable source, and all of
its transformations must be compatible with splitting. While most sources and
transformations support splitting, there are exceptions, such as custom datasets
which may not implement the splitting API. Please file a GitHub issue if you
would like to use distributed epoch processing for a currently unsupported
dataset source or transformation.

If no workers are restarted during training, dynamic sharding mode will visit
every example exactly once. If workers are restarted during training, the splits
they were processing will not be fully visited. The dispatcher maintains a
cursor through the dataset's splits. Assuming fault tolerance is enabled (See
"Fault Tolerance" below), the dispatcher will store cursor state in write-ahead
logs so that the cursor can be restored in case the dispatcher is restarted
mid-training. This provides an at-most-once visitation guarantee in the presence
of server restarts.

#### Static Sharding

The following are static sharding policies. The semantics are similar to
`tf.data.experimental.AutoShardPolicy`. These policies require:

  * The tf.data service cluster is configured with a fixed list of workers
    in DispatcherConfig (see the sketch after this list).
  * Each client only reads from the local tf.data service worker.
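
For instance, a dispatcher for static sharding might be configured with a fixed
worker list along these lines (a sketch; the addresses and port are
placeholders, and the exact fields are described in the `DispatcherConfig`
documentation):

```
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        worker_addresses=["worker0.example.com:5051",
                          "worker1.example.com:5051"]))
```
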
If a worker is restarted while performing static sharding, the worker will
begin processing its shard again from the beginning.

FILE: Shards by input files (i.e. each worker will get a fixed set of files to
process). When this option is selected, make sure that there are at least as
many files as workers. If there are fewer input files than workers, a runtime
error will be raised.

DATA: Shards by elements produced by the dataset. Each worker will process the
whole dataset and discard the portion that is not for itself. Note that for
this mode to correctly partition the dataset elements, the dataset needs to
produce elements in a deterministic order.

FILE_OR_DATA: Attempts FILE-based sharding, falling back to DATA-based
sharding on failure.

HINT: Looks for the presence of `shard(SHARD_HINT, ...)` which is treated as a
placeholder to replace with `shard(num_workers, worker_index)`.
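
For illustration, a `HINT`-sharded pipeline might look like the sketch below,
assuming `tf.data.experimental.SHARD_HINT` is used as the placeholder value:

```
dataset = tf.data.Dataset.from_tensor_slices(filenames)
# Placeholder shard; tf.data service substitutes the actual worker count and
# worker index at run time.
dataset = dataset.shard(tf.data.experimental.SHARD_HINT,
                        tf.data.experimental.SHARD_HINT)
dataset = dataset.interleave(tf.data.TFRecordDataset)
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.HINT,
        service=FLAGS.tf_data_service_address))
```
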
For backwards compatibility, `processing_mode` may also be set to the strings
`"parallel_epochs"` or `"distributed_epoch"`, which are respectively equivalent
to `ShardingPolicy.OFF` and `ShardingPolicy.DYNAMIC`.

### Coordinated Data Read

By default, when multiple consumers read from the same job, they receive data on
a first-come first-served basis. In some use cases, it is advantageous to
coordinate the consumers so that, at each step, they all read data from the same
worker.

For example, the tf.data service can be used to coordinate example sizes across
a cluster during synchronous training, so that during each step all replicas
train on similar-sized elements. To achieve this, define a dataset which
generates rounds of `num_consumers` consecutive similar-sized batches, then
enable coordinated reads by setting `consumer_index` and `num_consumers`.

NOTE: To keep consumers in sync, coordinated reads require that the dataset have
infinite cardinality. You can get this by adding `.repeat()` at the end of the
dataset definition.
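
Putting this together, a coordinated read might be set up roughly as in the
sketch below, where each consumer passes its own `consumer_index` (the dataset
definition and the index variables are placeholders):

```
dataset = ...  # Produces rounds of `num_consumers` similar-sized batches.
dataset = dataset.repeat()  # Coordinated reads need infinite cardinality.
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
        service=FLAGS.tf_data_service_address,
        job_name="shared_job",
        consumer_index=consumer_index,  # This consumer's index in [0, num_consumers).
        num_consumers=num_consumers))
```
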
### Jobs

A tf.data service "job" refers to the process of reading from a dataset managed
by the tf.data service, using one or more data consumers. Jobs are created when
iterating over datasets that read from tf.data service. The data produced by a
job is determined by (1) the dataset associated with the job and (2) the job's
processing mode. For example, if a job is created for the dataset
`Dataset.range(5)`, and the processing mode is `ShardingPolicy.OFF`, each
tf.data worker will produce the elements `{0, 1, 2, 3, 4}` for the job,
resulting in the job producing `5 * num_workers` elements. If the processing
mode is `ShardingPolicy.DYNAMIC`, the job will only produce `5` elements.

One or more consumers can consume data from a job. By default, jobs are
"anonymous", meaning that only the consumer which created the job can read from
it. To share the output of a job across multiple consumers, you can set a common
`job_name`.

### Fault Tolerance

By default, the tf.data dispatch server stores its state in-memory, making it a
single point of failure during training. To avoid this, pass
`fault_tolerant_mode=True` when creating your `DispatchServer`. Dispatcher
fault tolerance requires `work_dir` to be configured and accessible from the
dispatcher both before and after restart (e.g. a GCS path). With fault tolerant
mode enabled, the dispatcher will journal its state to the work directory so
that no state is lost when the dispatcher is restarted.
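
A fault-tolerant dispatcher might therefore be started along these lines (a
sketch; the GCS path is a placeholder for any storage that survives restarts):

```
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        work_dir="gs://my-bucket/tf_data_service/work_dir",
        fault_tolerant_mode=True))
```
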
WorkerServers may be freely restarted, added, or removed during training. At
startup, workers will register with the dispatcher and begin processing all
outstanding jobs from the beginning.

### Usage with tf.distribute

tf.distribute is the TensorFlow API for distributed training. There are
several ways to use tf.data with tf.distribute:
`strategy.experimental_distribute_dataset`,
`strategy.distribute_datasets_from_function`, and (for PSStrategy)
`coordinator.create_per_worker_dataset`. The following sections give code
examples for each.

In general we recommend using
`tf.data.experimental.service.{register_dataset,from_dataset_id}` over
`tf.data.experimental.service.distribute` for two reasons:

- The dataset only needs to be constructed and optimized once, instead of once
  per worker. This can significantly reduce startup time, because the current
  `experimental_distribute_dataset` and `distribute_datasets_from_function`
  implementations create and optimize worker datasets sequentially.
- If a dataset depends on lookup tables or variables that are only present on
  one host, the dataset needs to be registered from that host. Typically this
  only happens when resources are placed on the chief or worker 0. Registering
  the dataset from the chief will avoid issues with depending on remote
  resources.

#### strategy.experimental_distribute_dataset

Nothing special is required when using
`strategy.experimental_distribute_dataset`, just apply `register_dataset` and
`from_dataset_id` as above, making sure to specify a `job_name` so that all
workers consume from the same tf.data service job.

```
dataset = ...  # Define your dataset here.
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)
dataset = tf.data.experimental.service.from_dataset_id(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    dataset_id=dataset_id,
    job_name="shared_job")

dataset = strategy.experimental_distribute_dataset(dataset)
```

#### strategy.distribute_datasets_from_function

First, make sure the dataset produced by the `dataset_fn` does not depend on the
`input_context` for the training worker on which it is run. Instead of each
worker building its own (sharded) dataset, one worker should register an
unsharded dataset, and the remaining workers should consume data from that
dataset.

```
dataset = dataset_fn()
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)

def new_dataset_fn(input_context):
  del input_context
  return tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

dataset = strategy.distribute_datasets_from_function(new_dataset_fn)
```

#### coordinator.create_per_worker_dataset

`create_per_worker_dataset` works the same as
`distribute_datasets_from_function`.

```
dataset = dataset_fn()
dataset_id = tf.data.experimental.service.register_dataset(
    service=FLAGS.tf_data_service_address,
    dataset=dataset)

def new_dataset_fn(input_context):
  del input_context
  return tf.data.experimental.service.from_dataset_id(
      processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
      service=FLAGS.tf_data_service_address,
      dataset_id=dataset_id,
      job_name="shared_job")

dataset = coordinator.create_per_worker_dataset(new_dataset_fn)
```

### Sharing tf.data service with concurrent trainers

If you run multiple trainers concurrently using the same training data, it could
save resources to cache the data in one tf.data service cluster and share the
cluster with the trainers. For example, if you use Vizier to tune
hyperparameters, the Vizier jobs can run concurrently and share one tf.data
service cluster.

To enable this feature, each trainer needs to generate a unique trainer ID, and
you pass the trainer ID to `tf.data.experimental.service.distribute`. Once a job
has consumed data, the data remains in the cache and is re-used by jobs with
different `trainer_id`s. Requests with the same `trainer_id` do not re-use data.
For example:

```
dataset = expensive_computation()
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode=tf.data.experimental.service.ShardingPolicy.OFF,
    service=FLAGS.tf_data_service_address,
    job_name="job",
    cross_trainer_cache=tf.data.experimental.service.CrossTrainerCache(
        trainer_id=trainer_id())))
```

tf.data service uses a sliding-window cache to store shared data. When one
trainer consumes data, the data remains in the cache. When other trainers need
data, they can get data from the cache instead of repeating the expensive
computation. The cache has a bounded size, so some workers may not read the full
dataset. To ensure all the trainers get sufficient training data, we require the
input dataset to be infinite. This can be achieved, for example, by repeating
the dataset and performing random augmentation on the training instances.
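
For example, the dataset from the snippet above could be made infinite along
these lines (a sketch; `random_augment` stands in for your own augmentation
function):

```
dataset = expensive_computation()
# Random augmentation keeps repeated epochs from being identical, and
# `repeat()` gives the dataset the infinite cardinality the cache requires.
dataset = dataset.map(random_augment, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.repeat()
```
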
## Limitations

- Python-based data processing: Datasets which use Python-based data processing
  (e.g. `tf.py_function`, `tf.numpy_function`, or
  `tf.data.Dataset.from_generator`) are currently not supported.
- Non-Serializable Resources: Datasets may only depend on TF resources that
  support serialization. Serialization is currently supported for lookup
  tables and variables. If your dataset depends on a TF resource that cannot be
  serialized, please file a GitHub issue.
- Remote Resources: If a dataset depends on a resource, the dataset must be
  registered from the same process that created the resource (e.g. the "chief"
  job of ParameterServerStrategy).
"""

from tensorflow.python.data.experimental.ops.data_service_ops import distribute
from tensorflow.python.data.experimental.ops.data_service_ops import from_dataset_id
from tensorflow.python.data.experimental.ops.data_service_ops import register_dataset
from tensorflow.python.data.experimental.ops.data_service_ops import ShardingPolicy
from tensorflow.python.data.experimental.service.server_lib import DispatcherConfig
from tensorflow.python.data.experimental.service.server_lib import DispatchServer
from tensorflow.python.data.experimental.service.server_lib import WorkerConfig
from tensorflow.python.data.experimental.service.server_lib import WorkerServer