Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/tpu/device_assignment.py: 16%
214 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ======================================
15"""Library of TPU helper functions."""
17import enum
18import math
19from typing import List, Optional, Text, Tuple
21import numpy as np
23from tensorflow.python.platform import tf_logging as logging
24from tensorflow.python.tpu.topology import Topology
25from tensorflow.python.util.tf_export import tf_export
28SINGLE_CORE_ASSIGNMENT = [[[0, 0, 0, 0]]]
31def _compute_task_and_cores_to_replicas(core_assignment, topology):
32 """Computes a nested dict which maps task and logical core to replicas."""
33 task_and_cores_to_replicas = {}
34 for replica in range(core_assignment.shape[0]):
35 for logical_core in range(core_assignment.shape[1]):
36 coordinates = core_assignment[replica, logical_core, :]
37 task_id = topology.task_ordinal_at_coordinates(coordinates)
38 if task_id not in task_and_cores_to_replicas:
39 task_and_cores_to_replicas[task_id] = {}
40 if logical_core not in task_and_cores_to_replicas[task_id]:
41 task_and_cores_to_replicas[task_id][logical_core] = set()
43 task_and_cores_to_replicas[task_id][logical_core].add(replica)
45 task_to_sorted_replica_id = {}
47 for task, core_to_replicas in task_and_cores_to_replicas.items():
48 core_to_sorted_replicas = {}
49 for core, replicas in core_to_replicas.items():
50 core_to_sorted_replicas[core] = sorted(replicas)
52 task_to_sorted_replica_id[task] = core_to_sorted_replicas
53 return task_to_sorted_replica_id
@tf_export("tpu.experimental.DeviceAssignment")
class DeviceAssignment(object):
  """Mapping from logical cores in a computation to the physical TPU topology.

  Prefer to use the `DeviceAssignment.build()` helper to construct a
  `DeviceAssignment`; it is easier if less flexible than constructing a
  `DeviceAssignment` directly.
  """

  def __init__(self, topology: Topology, core_assignment: np.ndarray):
    """Constructs a `DeviceAssignment` object.

    Args:
      topology: A `Topology` object that describes the physical TPU topology.
      core_assignment: A logical to physical core mapping, represented as a
        rank 3 numpy array. See the description of the `core_assignment`
        property for more details. The value is converted with
        `np.asarray(..., dtype=np.int32)`, so nested lists are accepted too.

    Raises:
      ValueError: If `topology` is not `Topology` object.
      ValueError: If `core_assignment` is not a rank 3 numpy array.
      ValueError: If the last dimension of `core_assignment` does not have
        size `topology.mesh_rank`.
    """
    if not isinstance(topology, Topology):
      raise ValueError("topology must be a Topology object, got {}".format(
          type(topology)))
    core_assignment = np.asarray(core_assignment, dtype=np.int32)

    self._topology = topology

    if core_assignment.ndim != 3:
      raise ValueError("core_assignment must be a rank 3 numpy array, "
                       f"got shape {core_assignment.shape}")

    self._num_replicas = core_assignment.shape[0]
    self._num_cores_per_replica = core_assignment.shape[1]

    if core_assignment.shape[-1] != topology.mesh_rank:
      raise ValueError(
          "core_assignment.shape[-1] must have size equal to topology "
          f"rank ({topology.mesh_rank}), got "
          f"core_assignment.shape={core_assignment.shape}")

    self._core_assignment = core_assignment
    # Precomputed reverse index used by `lookup_replicas`.
    self._task_and_cores_to_replicas = _compute_task_and_cores_to_replicas(
        self._core_assignment, topology)

  @property
  def topology(self) -> Topology:
    """A `Topology` that describes the TPU topology."""
    return self._topology

  @property
  def num_cores_per_replica(self) -> int:
    """The number of cores per replica."""
    return self._num_cores_per_replica

  @property
  def num_replicas(self) -> int:
    """The number of replicas of the computation."""
    return self._num_replicas

  @property
  def core_assignment(self) -> np.ndarray:
    """The logical to physical core mapping.

    Returns:
      An integer numpy array of rank 3, with shape
      `[num_replicas, num_cores_per_replica, topology_rank]`. Maps
      (replica, logical core) pairs to physical topology coordinates.
    """
    return self._core_assignment

  def coordinates(self, replica: int, logical_core: int) -> Tuple:  # pylint:disable=g-bare-generic
    """Returns the physical topology coordinates of a logical core."""
    return tuple(self.core_assignment[replica, logical_core, :])

  def lookup_replicas(self, task_id: int, logical_core: int) -> List[int]:
    """Lookup replica ids by task number and logical core.

    Args:
      task_id: TensorFlow task number.
      logical_core: An integer, identifying a logical core.

    Returns:
      A sorted list of the replicas that are attached to that task and
      logical_core.

    Raises:
      ValueError: If no replica exists in the task which contains the logical
        core.
    """
    try:
      return self._task_and_cores_to_replicas[task_id][logical_core]
    except KeyError as e:
      # Chain explicitly so the failing key stays visible as `__cause__`.
      raise ValueError(
          "Can not find any replica in task: {} contains logical_core: {} ".
          format(task_id, logical_core)) from e

  def tpu_ordinal(self, replica: int = 0, logical_core: int = 0) -> int:
    """Returns the ordinal of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_ordinal_at_coordinates(coordinates)

  def host_device(self,
                  replica: int = 0,
                  logical_core: int = 0,
                  job: Optional[Text] = None) -> Text:
    """Returns the CPU device attached to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.cpu_device_name_at_coordinates(coordinates, job=job)

  def tpu_device(self,
                 replica: int = 0,
                 logical_core: int = 0,
                 job: Optional[Text] = None) -> Text:
    """Returns the name of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_name_at_coordinates(coordinates, job=job)

  @staticmethod
  def build(topology: Topology,
            computation_shape: Optional[np.ndarray] = None,
            computation_stride: Optional[np.ndarray] = None,
            num_replicas: int = 1) -> "DeviceAssignment":
    """Builds a `DeviceAssignment` by delegating to `device_assignment`.

    All arguments are forwarded positionally and unchanged; the
    `device_order_mode` argument of `device_assignment` is left at its default.

    Args:
      topology: Passed through as the `topology` argument.
      computation_shape: Passed through as the `computation_shape` argument.
      computation_stride: Passed through as the `computation_stride` argument.
      num_replicas: Passed through as the `num_replicas` argument.

    Returns:
      The `DeviceAssignment` returned by `device_assignment`.
    """
    return device_assignment(topology, computation_shape, computation_stride,
                             num_replicas)
182def _open_ring_2d(x_size: int, y_size: int,
183 z_coord: int) -> List[Tuple[int, int, int]]:
184 """Ring-order of a X by Y mesh, with a fixed Z coordinate.
186 For example, in a 4x4 mesh, this returns the following order.
187 0 -- 1 -- 2 -- 3
188 | | | |
189 15-- 6 -- 5 -- 4
190 | | | |
191 14-- 7 -- 8 -- 9
192 | | | |
193 13-- 12-- 11-- 10
195 Note that chip 0 is not included in the output.
197 Args:
198 x_size: An integer represents the mesh size in the x-dimension. Must be
199 larger than 1.
200 y_size: An integer represents the mesh size in the y-dimension. Must be
201 larger than 1.
202 z_coord: An integer represents the z-coordinate to use for the chips in the
203 ring.
205 Returns:
206 A list of (x,y,z) triples in ring order.
207 """
208 ret = []
209 for i in range(y_size // 2):
210 for j in range(1, x_size):
211 ret.append((j, 2 * i, z_coord))
212 for j in range(x_size - 1, 0, -1):
213 ret.append((j, 2 * i + 1, z_coord))
214 for i in range(y_size - 1, 0, -1):
215 ret.append((0, i, z_coord))
216 return ret
219def _ring_3d(x_size: int, y_size: int,
220 z_size: int) -> List[Tuple[int, int, int]]:
221 """Ring-order of a X by Y by Z mesh.
223 Constructs the 3d ring from 2d rings that are stacked in the Z dimension and
224 joined in one corner.
226 z == 0:
227 0 -- 1 -- 2 -- 3
228 | | | |
229 15 - 6 -- 5 -- 4
230 | | | |
231 14 - 7 -- 8 -- 9
232 | | | |
233 13 - 12 - 11 - 10
234 z == 1:
235 63 - 30 - 29 - 28
236 | | | |
237 16 - 25 - 26 - 27
238 | | | |
239 17 - 24 - 23 - 22
240 | | | |
241 18 - 19 - 20 - 21
242 z == 2:
243 62 - 31 - 32 - 33
244 | | | |
245 45 - 36 - 35 - 34
246 | | | |
247 44 - 37 - 38 - 39
248 | | | |
249 43 - 42 - 41 - 40
250 z == 3:
251 61 - 60 - 59 - 58
252 | | | |
253 46 - 55 - 56 - 57
254 | | | |
255 47 - 54 - 53 - 52
256 | | | |
257 48 - 49 - 50 - 51
259 Args:
260 x_size: An integer represents the mesh size in the x-dimension. Must be
261 larger than 1.
262 y_size: An integer represents the mesh size in the y-dimension. Must be
263 larger than 1.
264 z_size: An integer represents the mesh size in the z-dimension. Must be
265 larger than 1. For example, in a 4x4x4 mesh, this returns the following
266 order.
268 Returns:
269 A list of (x,y,z) triples in ring order.
270 """
272 # Handle the case where 2 dimensions are size 1.
273 if x_size == 1 and y_size == 1:
274 return [(0, 0, i) for i in range(z_size)]
275 if x_size == 1 and z_size == 1:
276 return [(0, i, 0) for i in range(y_size)]
277 if y_size == 1 and z_size == 1:
278 return [(i, 0, 0) for i in range(x_size)]
280 # Handle odd mesh dimensions. This never happens in practice, so we don't
281 # bother to try building something optimal.
282 if (x_size > 1 and x_size % 2 != 0) or (y_size > 1 and
283 y_size % 2 != 0) or (z_size > 1 and
284 z_size % 2 != 0):
285 logging.warning("Odd dimension")
286 ret = []
287 for z in range(z_size):
288 for y in range(y_size):
289 ret.extend((x, y, z) for x in range(x_size))
290 return ret
292 # Always start with chip 0.
293 ret = [(0, 0, 0)]
294 # Handle the case where one dimension is size 1. We just build a flat, 2d
295 # ring.
296 if z_size == 1:
297 ret.extend(_open_ring_2d(x_size, y_size, 0))
298 return ret
299 if y_size == 1:
300 ret = [(0, 0, 0)]
301 ret.extend((x, y, z) for (x, z, y) in _open_ring_2d(x_size, z_size, 0))
302 return ret
303 if x_size == 1:
304 ret = [(0, 0, 0)]
305 ret.extend((x, y, z) for (y, z, x) in _open_ring_2d(y_size, z_size, 0))
306 return ret
308 # Handle the case where all dimensions have size > 1 and even.
309 ret = [(0, 0, 0)]
310 for i in range(0, z_size):
311 r = _open_ring_2d(x_size, y_size, i)
312 if i % 2 == 0:
313 ret.extend(r)
314 else:
315 ret.extend(reversed(r))
316 for i in range(z_size - 1, 0, -1):
317 ret.append((0, 0, i))
318 return ret
class DeviceOrderMode(enum.IntEnum):
  """Strategy for ordering devices when a device assignment is computed.

  Members:
    AUTO: The default mode. The library chooses to form rings when that is
      possible.
    RING: Form rings for replicas and model-parallel cores.
    MESH: Form meshes for replicas and/or model-parallel cores.
  """
  AUTO = 0
  RING = 1
  MESH = 2
def device_assignment(
    topology: Topology,
    computation_shape: Optional[np.ndarray] = None,
    computation_stride: Optional[np.ndarray] = None,
    num_replicas: int = 1,
    device_order_mode: DeviceOrderMode = DeviceOrderMode.AUTO
) -> DeviceAssignment:
  """Computes a device_assignment of a computation across a TPU topology.

  Attempts to choose a compact grid of cores for locality.

  Returns a `DeviceAssignment` that describes the cores in the topology assigned
  to each core of each replica.

  `computation_shape` and `computation_stride` values should be powers of 2 for
  optimal packing.

  Args:
    topology: A `Topology` object that describes the TPU cluster topology. To
      obtain a TPU topology, evaluate the `Tensor` returned by
      `initialize_system` using `Session.run`. Either a serialized
      `TopologyProto` (as `bytes`) or a `Topology` object may be passed. Note:
      you must evaluate the `Tensor` first; you cannot pass an unevaluated
      `Tensor` here.
    computation_shape: A rank 1 int32 numpy array with size equal to the
      topology rank, describing the shape of the computation's block of cores.
      If None, the `computation_shape` is `[1] * topology_rank`. Other values
      are converted with `np.asarray(..., dtype=np.int32)`.
    computation_stride: A rank 1 int32 numpy array of size `topology_rank`,
      describing the inter-core spacing of the `computation_shape` cores in the
      TPU topology. If None, the `computation_stride` is `[1] * topology_rank`.
      Other values are converted with `np.asarray(..., dtype=np.int32)`.
    num_replicas: The number of computation replicas to run. The replicas will
      be packed into the free spaces of the topology.
    device_order_mode: An enum of `DeviceOrderMode` class which indicates
      whether to assign devices to form rings or meshes, or let the library to
      choose. It is only consulted when the topology reports no missing
      devices.

  Returns:
    A DeviceAssignment object, which describes the mapping between the logical
    cores in each computation replica and the physical cores in the TPU
    topology.

  Raises:
    ValueError: If `topology` is neither `bytes` nor a `Topology` object.
    ValueError: If `computation_shape` or `computation_stride` does not have
      shape `[topology_rank]`, or contains a value smaller than 1.
    ValueError: If the computation footprint (`computation_shape *
      computation_stride`) does not fit into the TPU topology.
    ValueError: If computation's replicas cannot fit into the TPU topology.
    ValueError: If `device_order_mode` is `DeviceOrderMode.RING` but the
      conditions for 3D tiling are not met.
    AssertionError: If the topology has missing devices and either
      `computation_stride` is not all ones or there are not enough devices for
      the requested replicas. These checks are `assert` statements, so they are
      skipped when Python runs with assertions disabled.
  """
  # Deserialize the Topology proto, if it is a string.
  if isinstance(topology, bytes):
    topology = Topology(serialized=topology)

  if not isinstance(topology, Topology):
    raise ValueError(
        f"`topology` is not a Topology object; got {type(topology)}")

  topology_rank = len(topology.mesh_shape)
  mesh_shape = topology.mesh_shape
  # Default both shape and stride to all ones; otherwise coerce to int32 arrays.
  if computation_shape is None:
    computation_shape = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_shape = np.asarray(computation_shape, dtype=np.int32)

  if computation_stride is None:
    computation_stride = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_stride = np.asarray(computation_stride, dtype=np.int32)

  # Both arrays must be rank 1 with one entry per topology dimension.
  if computation_shape.shape != (topology_rank,):
    raise ValueError(
        f"computation_shape must have shape [{topology_rank}]; "
        f"got {computation_shape.shape}"
    )
  if computation_stride.shape != (topology_rank,):
    raise ValueError(
        f"computation_stride must have shape [{topology_rank}]; "
        f"got {computation_stride.shape}"
    )

  if any(computation_shape < 1):
    raise ValueError(
        "computation_shape must be positive; got computation_shape={}".format(
            computation_shape))
  if any(computation_stride < 1):
    raise ValueError(
        "computation_stride must be positive; got computation_stride={}".format(
            computation_stride))

  # Computes the physical size of one computation instance.
  computation_footprint = computation_shape * computation_stride
  if any(computation_footprint > mesh_shape):
    raise ValueError(
        "computation footprint {} does not fit in TPU topology shape {}".format(
            computation_footprint, mesh_shape))

  # Computes how many copies of the computation footprint fit in the mesh.
  block_counts = mesh_shape // computation_footprint

  # Along each axis a footprint block holds `computation_stride` interleaved
  # replicas, so the per-axis replica capacity is blocks times stride.
  replica_counts = block_counts * computation_stride
  max_replicas = np.prod(replica_counts)
  if num_replicas > max_replicas:
    raise ValueError(
        "requested {} replicas but only {} replicas with shape {} and "
        "computation_stride {} fit in a TPU mesh of shape {}".format(
            num_replicas, max_replicas, computation_shape, computation_stride,
            mesh_shape))

  # Integer ceiling division.
  def ceil_of_ratio(n, m):
    return (n + m - 1) // m

  if topology.missing_devices.size == 0:
    # replica_shape[i] is the number of replicas placed along axis i.
    replica_shape = [0] * topology_rank
    if num_replicas > 0:
      remaining_replicas = num_replicas
      remaining_dims = topology_rank

      # Choose dimensions as close to an equal cube as possible,
      # in order of increasing dimension size. By visiting dimensions
      # in increasing size, we assign the most constrained dimension
      # first, so we won't make infeasible choices.
      #
      # As a secondary sort order, visit the last dimension (core index) first,
      # then the other dimensions in increasing order. This means we try to use
      # both cores on the same chip in preference to two cores on different
      # chips.  We visit the x dimension first, and the z dimension last, so
      # that we prefer to arrange adjacent replicas on the same machine when
      # possible.
      #
      # For example, if num_replicas == 4, we prefer to use a replica_shape of
      # (2,1,1,2) over (1,1,2,2).

      # The sort key is (capacity, rotated index); the rotation maps the last
      # axis to 0 so it wins ties. `ni` is rotated back into `i` below.
      for x, ni in sorted(((x, ((i + 1) % topology_rank))
                           for (i, x) in enumerate(replica_counts))):
        i = (ni + topology_rank - 1) % topology_rank
        # Aim for the k-th root of what is left, capped by the axis capacity.
        target_size = int(math.ceil(remaining_replicas**(1.0 / remaining_dims)))
        replica_shape[i] = min(target_size, x)
        remaining_replicas = ceil_of_ratio(remaining_replicas, replica_shape[i])
        remaining_dims -= 1

      assert remaining_replicas == 1 and remaining_dims == 0

    # Assigns an offset to each replica such that no two replicas overlap.
    replica_offsets = np.full([num_replicas, topology_rank], -1, dtype=np.int32)

    enable_3d_tiling = (
        topology_rank == 4 and
        computation_shape[-1] == mesh_shape[-1]  # Only handle 3D case.
        and np.prod(computation_stride) == 1  # Ensure no stride.
        and num_replicas == max_replicas)  # Full replication.

    # An explicit mode overrides the automatic choice: RING requires the
    # tiling preconditions above and MESH always disables 3D tiling.
    if device_order_mode != DeviceOrderMode.AUTO:
      if device_order_mode == DeviceOrderMode.RING and not enable_3d_tiling:
        raise ValueError(
            "device_order_mode=DeviceOrderMode.RING is not compatible with the "
            "3D tiling current topology. Try setting "
            "device_order_mode=DeviceOrderMode.AUTO"
        )
      enable_3d_tiling = device_order_mode == DeviceOrderMode.RING

    if enable_3d_tiling:
      assignment = []
      # inner_ring orders positions inside one replica's block; outer_ring
      # orders the replica blocks themselves.
      inner_ring = _ring_3d(computation_shape[0], computation_shape[1],
                            computation_shape[2])
      outer_ring = _ring_3d(replica_shape[0], replica_shape[1],
                            replica_shape[2])

      for replica in range(num_replicas):
        outer_x, outer_y, outer_z = outer_ring[replica]
        per_replica_assignment = []
        for index in range(np.prod(computation_shape)):
          # Consecutive indices share an inner-ring position and differ only
          # in the last coordinate (`pi`), which cycles over mesh_shape[-1].
          inner_x, inner_y, inner_z = inner_ring[index // mesh_shape[-1]]
          # Physical coordinate = block origin + position inside the block.
          px = outer_x * computation_shape[0] + inner_x
          py = outer_y * computation_shape[1] + inner_y
          pz = outer_z * computation_shape[2] + inner_z
          pi = index % mesh_shape[-1]
          per_replica_assignment.append([px, py, pz, pi])
        assignment.append(per_replica_assignment)
    else:
      for replica in range(num_replicas):
        # Chooses a replica number in each axis.
        t = replica
        pos = []
        # Visit the core number first.
        for dim in np.concatenate([[replica_shape[-1]], replica_shape[:-1]]):
          pos.append(t % dim)
          t //= dim
        # Undo the rotation so the core number is the last coordinate again.
        replica_pos = np.concatenate([pos[1:], [pos[0]]])

        # Determines where that replica starts in each axis.
        outer = replica_pos // computation_stride
        inner = replica_pos % computation_stride
        replica_offsets[replica, :] = outer * computation_footprint + inner

      # Computes a logical core -> physical core mapping for each replica.
      # Per-axis offsets of the logical cores: 0, stride, 2 * stride, ...
      indices = [
          np.arange(0, computation_shape[i] * computation_stride[i],
                    computation_stride[i]) for i in range(topology_rank)
      ]
      # Cartesian product of the per-axis offsets, flattened to one row of
      # `topology_rank` coordinates per logical core.
      indices = np.concatenate(
          [i[..., np.newaxis] for i in np.meshgrid(*indices, indexing="ij")],
          axis=-1)
      indices = indices.reshape((-1, topology_rank))
      # Broadcast to [num_replicas, cores_per_replica, topology_rank].
      assignment = indices + replica_offsets[:, np.newaxis, :]
  else:
    # We have a slice with missing chips. We define a simple assignment by
    # ignoring computation stride. This assignment should enable a consistent
    # and correct device assignment on degraded slices. It is optimal when
    # weights are not sharded. But this device assignment may be sub-optimal for
    # other model parallelism scenarios.
    # NOTE(review): the two checks below are `assert` statements, so they do
    # not run when assertions are disabled (e.g. `python -O`).
    assert np.prod(computation_stride) == 1
    # Next, we check if we have sufficient devices.
    assert num_replicas * np.prod(
        computation_shape) <= topology.num_tasks * topology.num_tpus_per_task
    # Map replicas to physical devices in task order.
    device_coordinates = topology.device_coordinates
    assignment = []
    devices_per_replica = np.prod(computation_shape)
    for rindex in range(num_replicas):
      replica_assignment = []
      for index in range(devices_per_replica):
        # Global position of this logical core across all replicas.
        logical_id = rindex * devices_per_replica + index
        # Pick logical cores in task order
        task = logical_id // topology.num_tpus_per_task
        device = logical_id % topology.num_tpus_per_task
        # Append physical cores to the replica assignment
        replica_assignment.append(device_coordinates[task, device, :])
      assignment.append(replica_assignment)

  return DeviceAssignment(topology, core_assignment=assignment)