{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "wJcYs_ERTnnI" }, "source": [ "##### Copyright 2021 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "cellView": "form", "execution": { "iopub.execute_input": "2022-08-31T00:36:51.009445Z", "iopub.status.busy": "2022-08-31T00:36:51.008778Z", "iopub.status.idle": "2022-08-31T00:36:51.013085Z", "shell.execute_reply": "2022-08-31T00:36:51.012565Z" }, "id": "HMUDt0CiUJk9" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "77z2OchJTk0l" }, "source": [ "# 迁移多工作进程 CPU/GPU 训练\n", "\n", "\n", " \n", " \n", " \n", " \n", "
" ] }, { "cell_type": "markdown", "metadata": { "id": "meUTrR4I6m1C" }, "source": [ "本指南演示了如何将多工作进程分布式训练工作流从 TensorFlow 1 迁移到 TensorFlow 2。\n", "\n", "使用 CPU/GPU 执行多工作进程训练:\n", "\n", "- 在 TensorFlow 1 中,您通常会使用 `tf.estimator.train_and_evaluate` 和 `tf.estimator.Estimator` API。\n", "- 在 TensorFlow 2 中,使用 Keras API 编写模型、损失函数、优化器和指标。随后,利用 Keras `Model.fit` API 或自定义训练循环(使用 `tf.GradientTape`)并通过 `tf.distribute.experimental.ParameterServerStrategy` 或 `tf.distribute.MultiWorkerMirroredStrategy` 将训练分布到多个工作进程之间。有关详情,请参阅下列教程:\n", " - [使用 TensorFlow 进行分布式训练](../../guide/distributed_training.ipynb)\n", " - [使用 Keras Model.fit/自定义训练循环进行参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)\n", " - [在 Keras Model.fit 中使用 MultiWorkerMirroredStrategy](../../tutorials/distribute/multi_worker_with_keras.ipynb)\n", " - [在自定义训练循环中使用 MultiWorkerMirroredStrategy](../../tutorials/distribute/multi_worker_with_ctl.ipynb)。" ] }, { "cell_type": "markdown", "metadata": { "id": "YdZSoIXEbhg-" }, "source": [ "## 安装" ] }, { "cell_type": "markdown", "metadata": { "id": "28f46832b54d" }, "source": [ "从一些必要的导入和用于演示目的的简单数据集开始:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:51.017499Z", "iopub.status.busy": "2022-08-31T00:36:51.017052Z", "iopub.status.idle": "2022-08-31T00:36:54.423271Z", "shell.execute_reply": "2022-08-31T00:36:54.422557Z" }, "id": "iE0vSfMXumKI" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Collecting portpicker\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " Downloading portpicker-1.5.2-py3-none-any.whl (14 kB)\r\n", "Requirement already satisfied: psutil in /tmpfs/src/tf_docs_env/lib/python3.9/site-packages (from portpicker) (5.9.1)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Installing collected packages: portpicker\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Successfully installed portpicker-1.5.2\r\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2022-08-31 00:36:52.723176: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2022-08-31 00:36:53.448692: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory\n", "2022-08-31 00:36:53.448979: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory\n", "2022-08-31 00:36:53.448993: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. 
"source": [ "# The notebook uses a dataset instance for `Model.fit` with\n", "# `ParameterServerStrategy`, which depends on symbols in TF 2.7.\n", "# Install a utility needed for this demonstration\n", "!pip install portpicker\n", "\n", "import tensorflow as tf\n", "import tensorflow.compat.v1 as tf1" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:54.427498Z", "iopub.status.busy": "2022-08-31T00:36:54.427101Z", "iopub.status.idle": "2022-08-31T00:36:54.431273Z", "shell.execute_reply": "2022-08-31T00:36:54.430742Z" }, "id": "m7rnGxsXtDkV" }, "outputs": [], "source": [ "features = [[1., 1.5], [2., 2.5], [3., 3.5]]\n", "labels = [[0.3], [0.5], [0.7]]\n", "eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]\n", "eval_labels = [[0.8], [0.9], [1.]]" ] }, { "cell_type": "markdown", "metadata": { "id": "T2uaw9QaDM_X" }, "source": [ "You will need the `'TF_CONFIG'` configuration environment variable for training on multiple machines in TensorFlow. Use `'TF_CONFIG'` to specify the `'cluster'` and the `'task'`s' addresses. (Learn more in the [Distributed training with TensorFlow](../../guide/distributed_training.ipynb) guide.)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:54.434858Z", "iopub.status.busy": "2022-08-31T00:36:54.434342Z", "iopub.status.idle": "2022-08-31T00:36:54.438019Z", "shell.execute_reply": "2022-08-31T00:36:54.437447Z" }, "id": "4OUzwoQgXgkG" }, "outputs": [], "source": [ "import json\n", "import os\n", "\n", "tf_config = {\n", "    'cluster': {\n", "        'chief': ['localhost:11111'],\n", "        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],\n", "        'ps': ['localhost:12121', 'localhost:13131'],\n", "    },\n", "    'task': {'type': 'chief', 'index': 0}\n", "}\n", "\n", "os.environ['TF_CONFIG'] = json.dumps(tf_config)" ] }, { "cell_type": "markdown", "metadata": { "id": "PbeoSbbmDdc0" }, "source": [ "Note: Unfortunately, since multi-worker training with `tf.estimator` APIs in TensorFlow 1 requires multiple clients (which would be especially tricky to do here in this Colab notebook), you will make the notebook runnable without a `'TF_CONFIG'` environment variable, so it falls back to local training. (Learn more in the *Setting up the `'TF_CONFIG'` environment variable* section in the [Distributed training with TensorFlow](../../guide/distributed_training.ipynb) guide.)\n", "\n", "Use the `del` statement to remove the variable (but in real-world multi-worker training in TensorFlow 1, you won't have to do this):" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:54.441758Z", "iopub.status.busy": "2022-08-31T00:36:54.441227Z", "iopub.status.idle": "2022-08-31T00:36:54.444327Z", "shell.execute_reply": "2022-08-31T00:36:54.443753Z" }, "id": "AHuynAR5D8sU" }, "outputs": [], "source": [ "del os.environ['TF_CONFIG']" ] }, { "cell_type": "markdown", "metadata": { "id": "4uXff1BEssdE" }, "source": [ "## TensorFlow 1: Multi-worker distributed training with tf.estimator APIs" ] }, { "cell_type": "markdown", "metadata": { "id": "MpyINdiLEN3c" }, "source": [ "The following code snippet demonstrates the canonical workflow of multi-worker training in TF1: you will use a `tf.estimator.Estimator`, a `tf.estimator.TrainSpec`, a `tf.estimator.EvalSpec`, and the `tf.estimator.train_and_evaluate` API to distribute the training:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:54.448008Z", "iopub.status.busy": "2022-08-31T00:36:54.447553Z", "iopub.status.idle": "2022-08-31T00:36:58.910099Z", "shell.execute_reply": "2022-08-31T00:36:58.909328Z" }, "id": "lqe9obf7suIj" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Using default config.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "WARNING:tensorflow:Using 
temporary folder as model directory: /tmpfs/tmp/tmp3_hegyqd\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Using config: {'_model_dir': '/tmpfs/tmp/tmp3_hegyqd', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true\n", "graph_options {\n", " rewrite_options {\n", " meta_optimizer_iterations: ONE\n", " }\n", "}\n", ", '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Not using Distribute Coordinator.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Running training and evaluation locally (non-distributed).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow/python/training/training_util.py:396: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.\n", "Instructions for updating:\n", "Use Variable.read_value. 
Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Calling model_fn.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow/python/training/adagrad.py:138: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.\n", "Instructions for updating:\n", "Call initializer instance with the dtype argument instead of passing it to the constructor\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Done calling model_fn.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Create CheckpointSaverHook.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Graph was finalized.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Running local_init_op.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Done running local_init_op.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Saving checkpoints for 0 into /tmpfs/tmp/tmp3_hegyqd/model.ckpt.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:loss = 4.4880586, step = 0\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Saving checkpoints for 3 into /tmpfs/tmp/tmp3_hegyqd/model.ckpt.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Calling model_fn.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Done calling model_fn.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Starting evaluation at 2022-08-31T00:36:58\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Graph was finalized.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Restoring parameters from /tmpfs/tmp/tmp3_hegyqd/model.ckpt-3\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Running local_init_op.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Done running local_init_op.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Inference Time : 0.15916s\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Finished evaluation at 2022-08-31-00:36:58\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 43.62651\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmpfs/tmp/tmp3_hegyqd/model.ckpt-3\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Loss for final step: 20.31957.\n" ] }, { 
"data": { "text/plain": [ "({'loss': 43.62651, 'global_step': 3}, [])" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def _input_fn():\n", " return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)\n", "\n", "def _eval_input_fn():\n", " return tf1.data.Dataset.from_tensor_slices(\n", " (eval_features, eval_labels)).batch(1)\n", "\n", "def _model_fn(features, labels, mode):\n", " logits = tf1.layers.Dense(1)(features)\n", " loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)\n", " optimizer = tf1.train.AdagradOptimizer(0.05)\n", " train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())\n", " return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)\n", "\n", "estimator = tf1.estimator.Estimator(model_fn=_model_fn)\n", "train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)\n", "eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)\n", "tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)" ] }, { "cell_type": "markdown", "metadata": { "id": "KEmzBjfnsxwT" }, "source": [ "## TensorFlow 2:使用分布策略进行多工作进程训练" ] }, { "cell_type": "markdown", "metadata": { "id": "Syb66qsbEp1x" }, "source": [ "在 TensorFlow 2 中,使用 CPU、GPU 和 TPU 的多个工作进程之间的分布式训练是通过 `tf.distribute.Strategy` 完成的。\n", "\n", "下面的示例演示了如何使用两种这样的策略:`tf.distribute.experimental.ParameterServerStrategy` 和 `tf.distribute.MultiWorkerMirroredStrategy`,这两种策略都是为使用多个工作进程进行 CPU/GPU 训练而设计的。\n", "\n", "`ParameterServerStrategy` 使用了一个*协调器* (`'chief'`),这使其对此 Colab 笔记本中的环境更加友好。您将在此处使用一些效用函数来设置可运行体验所必需的支持元素:您将创建一个*进程内聚簇*,其中线程用于模拟参数服务器 (`'ps'`) 和工作进程 (`'worker'`)。有关参数服务器训练的更多信息,请参阅[使用 ParameterServerStrategy 进行参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)教程。\n", "\n", "在此示例中,首先使用 `tf.distribute.cluster_resolver.TFConfigClusterResolver` 定义 `'TF_CONFIG'` 环境变量以提供聚簇信息。如果您使用聚簇管理系统进行分布式训练,请检查它是否已经提供了 `'TF_CONFIG'`,如果已提供,则无需显式设置此环境变量。(有关详情,请参阅[使用 TensorFlow 进行分布式训练](../../guide/distributed_training.ipynb)指南中的设置 `'TF_CONFIG'` 环境变量部分。)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:58.914424Z", "iopub.status.busy": "2022-08-31T00:36:58.913764Z", "iopub.status.idle": "2022-08-31T00:36:58.920349Z", "shell.execute_reply": "2022-08-31T00:36:58.919693Z" }, "id": "rp-gFY0H5rF-" }, "outputs": [], "source": [ "# Find ports that are available for the `'chief'` (the coordinator),\n", "# `'worker'`s, and `'ps'` (parameter servers).\n", "import portpicker\n", "\n", "chief_port = portpicker.pick_unused_port()\n", "worker_ports = [portpicker.pick_unused_port() for _ in range(3)]\n", "ps_ports = [portpicker.pick_unused_port() for _ in range(2)]\n", "\n", "# Dump the cluster information to `'TF_CONFIG'`.\n", "tf_config = {\n", " 'cluster': {\n", " 'chief': [\"localhost:%s\" % chief_port],\n", " 'worker': [\"localhost:%s\" % port for port in worker_ports],\n", " 'ps': [\"localhost:%s\" % port for port in ps_ports],\n", " },\n", " 'task': {'type': 'chief', 'index': 0}\n", "}\n", "os.environ['TF_CONFIG'] = json.dumps(tf_config)\n", "\n", "# Use a cluster resolver to bridge the information to the strategy created below.\n", "cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()" ] }, { "cell_type": "markdown", "metadata": { "id": "o_8uVvJb6dqq" }, "source": [ "然后,为工程进程和参数服务器一一创建 `tf.distribute.Server`:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:58.924345Z", 
"iopub.status.busy": "2022-08-31T00:36:58.923873Z", "iopub.status.idle": "2022-08-31T00:36:59.208237Z", "shell.execute_reply": "2022-08-31T00:36:59.207383Z" }, "id": "ZJopinmG6b2z" }, "outputs": [], "source": [ "# Workers need some inter_ops threads to work properly.\n", "# This is only needed for this notebook to demo. Real servers\n", "# should not need this.\n", "worker_config = tf.compat.v1.ConfigProto()\n", "worker_config.inter_op_parallelism_threads = 4\n", "\n", "for i in range(3):\n", " tf.distribute.Server(\n", " cluster_resolver.cluster_spec(),\n", " job_name=\"worker\",\n", " task_index=i,\n", " config=worker_config)\n", "\n", "for i in range(2):\n", " tf.distribute.Server(\n", " cluster_resolver.cluster_spec(),\n", " job_name=\"ps\",\n", " task_index=i)" ] }, { "cell_type": "markdown", "metadata": { "id": "IpfCcF0g6Ao8" }, "source": [ "在现实世界的分布式训练中,您将使用多台机器而不是启动协调器上的所有 `tf.distribute.Server`,并且指定为`\"worker\"` 和 `\"ps\"`(参数服务器)的机器将各自运行一个 `tf.distribute.Server`。请参阅[参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)教程中的*现实世界中的聚簇*部分,了解更多详细信息。\n", "\n", "一切准备就绪后,创建 `ParameterServerStrategy` 对象:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:59.213010Z", "iopub.status.busy": "2022-08-31T00:36:59.212464Z", "iopub.status.idle": "2022-08-31T00:36:59.287746Z", "shell.execute_reply": "2022-08-31T00:36:59.286832Z" }, "id": "t45iQeBT7Us_" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:43271'], 'ps': ['localhost:46705', 'localhost:45915'], 'worker': ['localhost:39667', 'localhost:38207', 'localhost:35449']})\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:43271'], 'ps': ['localhost:46705', 'localhost:45915'], 'worker': ['localhost:39667', 'localhost:38207', 'localhost:35449']})\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0', '/job:chief/replica:0/task:0/device:GPU:1', '/job:chief/replica:0/task:0/device:GPU:2', '/job:chief/replica:0/task:0/device:GPU:3'], variable_device = '/device:CPU:0'\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Number of GPUs on workers: 4\n" ] } ], "source": [ "strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)" ] }, { "cell_type": "markdown", "metadata": { "id": "diNsps1MGRS6" }, "source": [ "创建策略对象后,定义模型、优化器和其他变量,然后在 `Strategy.scope` API 中调用 Keras `Model.compile` 以分布训练。(如需了解详情,请参阅 `Strategy.scope` API 文档。)\n", "\n", "如果您更喜欢通过定义前向和后向传递来自定义训练,请参阅[参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)教程中的*使用自定义训练循环进行训练*部分,了解更多详细信息。" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:36:59.291526Z", "iopub.status.busy": "2022-08-31T00:36:59.291233Z", "iopub.status.idle": "2022-08-31T00:37:05.265225Z", "shell.execute_reply": "2022-08-31T00:37:05.264078Z" }, "id": "atVciNgPs0fw" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Epoch 1/5\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ 
"/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow/python/data/ops/dataset_ops.py:461: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.\n", " warnings.warn(\"To make it possible to preserve tf.data options across \"\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2022-08-31 00:37:02.621338: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: \"TensorSliceDataset/_2\"\n", "op: \"TensorSliceDataset\"\n", "input: \"Placeholder/_0\"\n", "input: \"Placeholder/_1\"\n", "attr {\n", " key: \"Toutput_types\"\n", " value {\n", " list {\n", " type: DT_FLOAT\n", " type: DT_FLOAT\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"_cardinality\"\n", " value {\n", " i: 3\n", " }\n", "}\n", "attr {\n", " key: \"is_files\"\n", " value {\n", " b: false\n", " }\n", "}\n", "attr {\n", " key: \"metadata\"\n", " value {\n", " s: \"\\n\\024TensorSliceDataset:6\"\n", " }\n", "}\n", "attr {\n", " key: \"output_shapes\"\n", " value {\n", " list {\n", " shape {\n", " dim {\n", " size: 2\n", " }\n", " }\n", " shape {\n", " dim {\n", " size: 1\n", " }\n", " }\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"replicate_on_split\"\n", " value {\n", " b: false\n", " }\n", "}\n", "experimental_type {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_DATASET\n", " args {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " }\n", " }\n", "}\n", "\n", "2022-08-31 00:37:02.626112: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an 
unshardable source dataset: name: \"TensorSliceDataset/_2\"\n", "op: \"TensorSliceDataset\"\n", "input: \"Placeholder/_0\"\n", "input: \"Placeholder/_1\"\n", "attr {\n", " key: \"Toutput_types\"\n", " value {\n", " list {\n", " type: DT_FLOAT\n", " type: DT_FLOAT\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"_cardinality\"\n", " value {\n", " i: 3\n", " }\n", "}\n", "attr {\n", " key: \"is_files\"\n", " value {\n", " b: false\n", " }\n", "}\n", "attr {\n", " key: \"metadata\"\n", " value {\n", " s: \"\\n\\024TensorSliceDataset:6\"\n", " }\n", "}\n", "attr {\n", " key: \"output_shapes\"\n", " value {\n", " list {\n", " shape {\n", " dim {\n", " size: 2\n", " }\n", " }\n", " shape {\n", " dim {\n", " size: 1\n", " }\n", " }\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"replicate_on_split\"\n", " value {\n", " b: false\n", " }\n", "}\n", "experimental_type {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_DATASET\n", " args {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " }\n", " }\n", "}\n", "\n", "2022-08-31 00:37:02.628310: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: \"TensorSliceDataset/_2\"\n", "op: \"TensorSliceDataset\"\n", "input: \"Placeholder/_0\"\n", "input: \"Placeholder/_1\"\n", "attr {\n", " key: \"Toutput_types\"\n", " value {\n", " list {\n", " type: DT_FLOAT\n", " type: DT_FLOAT\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"_cardinality\"\n", " value {\n", " i: 3\n", " }\n", "}\n", "attr {\n", " key: \"is_files\"\n", " value {\n", " b: false\n", " }\n", "}\n", "attr {\n", " key: \"metadata\"\n", " value {\n", " s: \"\\n\\024TensorSliceDataset:6\"\n", " }\n", "}\n", "attr {\n", " key: \"output_shapes\"\n", " value {\n", " list {\n", " shape {\n", " dim {\n", " size: 2\n", " }\n", " }\n", " shape {\n", " dim {\n", " size: 1\n", " }\n", " }\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"replicate_on_split\"\n", " value {\n", " b: false\n", " }\n", "}\n", "experimental_type {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_DATASET\n", " args {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " }\n", " }\n", "}\n", "\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "10/10 - 5s - loss: 4.6559 - 5s/epoch - 535ms/step\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Epoch 2/5\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "10/10 - 0s - loss: 1.6491 - 66ms/epoch - 7ms/step\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Epoch 3/5\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "10/10 - 0s - loss: 0.7956 - 66ms/epoch - 7ms/step\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Epoch 4/5\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "10/10 - 0s - loss: 0.4192 - 63ms/epoch - 6ms/step\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Epoch 5/5\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "10/10 - 0s - loss: 0.2293 - 62ms/epoch - 6ms/step\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 10, 
"metadata": {}, "output_type": "execute_result" } ], "source": [ "dataset = tf.data.Dataset.from_tensor_slices(\n", " (features, labels)).shuffle(10).repeat().batch(64)\n", "\n", "eval_dataset = tf.data.Dataset.from_tensor_slices(\n", " (eval_features, eval_labels)).repeat().batch(1)\n", "\n", "with strategy.scope():\n", " model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])\n", " optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)\n", " model.compile(optimizer, \"mse\")\n", "\n", "model.fit(dataset, epochs=5, steps_per_epoch=10)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:37:05.269164Z", "iopub.status.busy": "2022-08-31T00:37:05.268587Z", "iopub.status.idle": "2022-08-31T00:37:07.475809Z", "shell.execute_reply": "2022-08-31T00:37:07.474783Z" }, "id": "akZ0aaaS1vA9" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2022-08-31 00:37:06.670870: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: \"TensorSliceDataset/_2\"\n", "op: \"TensorSliceDataset\"\n", "input: \"Placeholder/_0\"\n", "input: \"Placeholder/_1\"\n", "attr {\n", " key: \"Toutput_types\"\n", " value {\n", " list {\n", " type: DT_FLOAT\n", " type: DT_FLOAT\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"_cardinality\"\n", " value {\n", " i: 3\n", " }\n", "}\n", "attr {\n", " key: \"is_files\"\n", " value {\n", " b: false\n", " }\n", "}\n", "attr {\n", " key: \"metadata\"\n", " value {\n", " s: \"\\n\\025TensorSliceDataset:10\"\n", " }\n", "}\n", "attr {\n", " key: \"output_shapes\"\n", " value {\n", " list {\n", " shape {\n", " dim {\n", " size: 2\n", " }\n", " }\n", " shape {\n", " dim {\n", " size: 1\n", " }\n", " }\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"replicate_on_split\"\n", " value {\n", " b: false\n", " }\n", "}\n", "experimental_type {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_DATASET\n", " args {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " }\n", " }\n", "}\n", "\n", "2022-08-31 00:37:06.697882: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: \"TensorSliceDataset/_2\"\n", "op: \"TensorSliceDataset\"\n", "input: \"Placeholder/_0\"\n", "input: \"Placeholder/_1\"\n", "attr {\n", " key: \"Toutput_types\"\n", " value {\n", " list {\n", " type: DT_FLOAT\n", " type: DT_FLOAT\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"_cardinality\"\n", " value {\n", " i: 3\n", " }\n", "}\n", "attr {\n", " key: \"is_files\"\n", " value {\n", " b: false\n", " }\n", "}\n", "attr {\n", " key: \"metadata\"\n", " value {\n", " s: \"\\n\\025TensorSliceDataset:10\"\n", " }\n", "}\n", "attr {\n", " key: \"output_shapes\"\n", " value {\n", " list {\n", " shape {\n", " dim {\n", 
" size: 2\n", " }\n", " }\n", " shape {\n", " dim {\n", " size: 1\n", " }\n", " }\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"replicate_on_split\"\n", " value {\n", " b: false\n", " }\n", "}\n", "experimental_type {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_DATASET\n", " args {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " }\n", " }\n", "}\n", "\n", "2022-08-31 00:37:06.700059: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: \"TensorSliceDataset/_2\"\n", "op: \"TensorSliceDataset\"\n", "input: \"Placeholder/_0\"\n", "input: \"Placeholder/_1\"\n", "attr {\n", " key: \"Toutput_types\"\n", " value {\n", " list {\n", " type: DT_FLOAT\n", " type: DT_FLOAT\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"_cardinality\"\n", " value {\n", " i: 3\n", " }\n", "}\n", "attr {\n", " key: \"is_files\"\n", " value {\n", " b: false\n", " }\n", "}\n", "attr {\n", " key: \"metadata\"\n", " value {\n", " s: \"\\n\\025TensorSliceDataset:10\"\n", " }\n", "}\n", "attr {\n", " key: \"output_shapes\"\n", " value {\n", " list {\n", " shape {\n", " dim {\n", " size: 2\n", " }\n", " }\n", " shape {\n", " dim {\n", " size: 1\n", " }\n", " }\n", " }\n", " }\n", "}\n", "attr {\n", " key: \"replicate_on_split\"\n", " value {\n", " b: false\n", " }\n", "}\n", "experimental_type {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_DATASET\n", " args {\n", " type_id: TFT_PRODUCT\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " args {\n", " type_id: TFT_TENSOR\n", " args {\n", " type_id: TFT_FLOAT\n", " }\n", " }\n", " }\n", " }\n", "}\n", "\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "10/10 - 2s - loss: 1.2941 - 2s/epoch - 212ms/step\n" ] }, { "data": { "text/plain": [ "{'loss': 1.2941229343414307}" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "model.evaluate(eval_dataset, steps=10, return_dict=True)" ] }, { "cell_type": "markdown", "metadata": { "id": "pXbS71XmMSoO" }, "source": [ "> **分区器 (`tf.distribute.experimental.partitioners`)**\n", ">\n", "> TensorFlow 2 中的 `ParameterServerStrategy` 支持变量分区,并提供与 TensorFlow 1 相同,但名称不容易混淆的分区器:\n", ">\n", "> - `tf.compat.v1.variable_axis_size_partitioner` -> `tf.distribute.experimental.partitioners.MaxSizePartitioner`:将分片保持在最大大小以下的分区器)。\n", "> - `tf.compat.v1.min_max_variable_partitioner` -> `tf.distribute.experimental.partitioners.MinSizePartitioner`:为每个分片分配最小大小的分区器。\n", "> - `tf.compat.v1.fixed_size_partitioner` -> `tf.distribute.experimental.partitioners.FixedShardsPartitioner`:分配固定数量分片的分区器。" ] }, { "cell_type": "markdown", "metadata": { "id": "Ig0-uCUbGprd" }, "source": [ "或者,您也可以使用 `MultiWorkerMirroredStrategy` 对象:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "execution": { "iopub.execute_input": "2022-08-31T00:37:07.480808Z", "iopub.status.busy": "2022-08-31T00:37:07.480056Z", "iopub.status.idle": "2022-08-31T00:37:07.523254Z", "shell.execute_reply": "2022-08-31T00:37:07.522542Z" }, "id": "xHXP8bOBGtXL" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "WARNING:tensorflow:Collective ops is not configured at program startup. 
{ "cell_type": "markdown", "metadata": { "id": "917ef6135660" }, "source": [ "## Next steps" ] }, { "cell_type": "markdown", "metadata": { "id": "e76fd9d5c98c" }, "source": [ "To learn more about multi-worker distributed training with `tf.distribute.experimental.ParameterServerStrategy` and `tf.distribute.MultiWorkerMirroredStrategy` in TensorFlow 2, check out the following resources:\n", "\n", "- Tutorial: [Parameter server training with ParameterServerStrategy and Keras Model.fit/a custom training loop](../../tutorials/distribute/parameter_server_training.ipynb)\n", "- Tutorial: [Multi-worker training with MultiWorkerMirroredStrategy and Keras Model.fit](../../tutorials/distribute/multi_worker_with_keras.ipynb)\n", "- Tutorial: [Multi-worker training with MultiWorkerMirroredStrategy and a custom training loop](../../tutorials/distribute/multi_worker_with_ctl.ipynb)\n", "- Guide: [Distributed training with TensorFlow](../../guide/distributed_training.ipynb)\n", "- Guide: [Optimize TensorFlow GPU performance with the TensorFlow Profiler](../../guide/gpu_performance_analysis.ipynb)\n", "- Guide: [Use a GPU](../../guide/gpu.ipynb) (the *Using multiple GPUs* section)" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "multi_worker_cpu_gpu_training.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" } }, "nbformat": 4, "nbformat_minor": 0 }