Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/tpu/device_assignment.py: 16%

214 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-03 07:57 +0000

1# Copyright 2017 The TensorFlow Authors. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# ====================================== 

15"""Library of TPU helper functions.""" 

16 

17import enum 

18import math 

19from typing import List, Optional, Text, Tuple 

20 

21import numpy as np 

22 

23from tensorflow.python.platform import tf_logging as logging 

24from tensorflow.python.tpu.topology import Topology 

25from tensorflow.python.util.tf_export import tf_export 

26 

27 

28SINGLE_CORE_ASSIGNMENT = [[[0, 0, 0, 0]]] 

29 

30 

def _compute_task_and_cores_to_replicas(core_assignment, topology):
  """Computes a nested dict which maps task and logical core to replicas.

  Args:
    core_assignment: An array indexed as `[replica, logical_core, :]`. Its
      first two dimensions give the number of replicas and the number of
      logical cores per replica; each `[replica, logical_core, :]` slice is
      handed to `topology.task_ordinal_at_coordinates`.
    topology: An object providing `task_ordinal_at_coordinates(coordinates)`,
      which returns the task id owning the given coordinates.

  Returns:
    A dict of the form `{task_id: {logical_core: [replica, ...]}}` where each
    innermost list holds the replica ids in ascending order.
  """
  num_replicas = core_assignment.shape[0]
  num_logical_cores = core_assignment.shape[1]

  task_and_cores_to_replicas = {}
  for replica in range(num_replicas):
    for logical_core in range(num_logical_cores):
      coordinates = core_assignment[replica, logical_core, :]
      task_id = topology.task_ordinal_at_coordinates(coordinates)
      # `setdefault` creates the per-task dict and the per-core set the first
      # time a (task, core) pair is seen, then hands back the existing one.
      task_and_cores_to_replicas.setdefault(task_id, {}).setdefault(
          logical_core, set()).add(replica)

  # Replace every replica set with a sorted list so callers get a
  # deterministic ordering.
  return {
      task: {
          core: sorted(replicas) for core, replicas in core_to_replicas.items()
      } for task, core_to_replicas in task_and_cores_to_replicas.items()
  }

54 

55 

@tf_export("tpu.experimental.DeviceAssignment")
class DeviceAssignment(object):
  """Mapping from logical cores in a computation to the physical TPU topology.

  Prefer to use the `DeviceAssignment.build()` helper to construct a
  `DeviceAssignment`; it is easier if less flexible than constructing a
  `DeviceAssignment` directly.
  """

  def __init__(self, topology: Topology, core_assignment: np.ndarray):
    """Constructs a `DeviceAssignment` object.

    Args:
      topology: A `Topology` object that describes the physical TPU topology.
      core_assignment: A logical to physical core mapping, represented as a
        rank 3 numpy array. See the description of the `core_assignment`
        property for more details. The value is converted with
        `np.asarray(..., dtype=np.int32)` before it is validated.

    Raises:
      ValueError: If `topology` is not `Topology` object.
      ValueError: If `core_assignment` is not a rank 3 numpy array.
      ValueError: If the last dimension of `core_assignment` does not match
        `topology.mesh_rank`.
    """
    if not isinstance(topology, Topology):
      raise ValueError(
          f"topology must be a Topology object, got {type(topology)}")
    core_assignment = np.asarray(core_assignment, dtype=np.int32)

    self._topology = topology

    if core_assignment.ndim != 3:
      raise ValueError("core_assignment must be a rank 3 numpy array, "
                       f"got shape {core_assignment.shape}")

    self._num_replicas = core_assignment.shape[0]
    self._num_cores_per_replica = core_assignment.shape[1]

    if core_assignment.shape[-1] != topology.mesh_rank:
      raise ValueError(
          "core_assignment.shape[-1] must have size equal to topology "
          f"rank ({topology.mesh_rank}), got "
          f"core_assignment.shape={core_assignment.shape}")

    self._core_assignment = core_assignment
    # Precomputed reverse index used by `lookup_replicas`.
    self._task_and_cores_to_replicas = _compute_task_and_cores_to_replicas(
        self._core_assignment, topology)

  @property
  def topology(self) -> Topology:
    """A `Topology` that describes the TPU topology."""
    return self._topology

  @property
  def num_cores_per_replica(self) -> int:
    """The number of cores per replica."""
    return self._num_cores_per_replica

  @property
  def num_replicas(self) -> int:
    """The number of replicas of the computation."""
    return self._num_replicas

  @property
  def core_assignment(self) -> np.ndarray:
    """The logical to physical core mapping.

    Returns:
      An integer numpy array of rank 3, with shape
      `[num_replicas, num_cores_per_replica, topology_rank]`. Maps
      (replica, logical core) pairs to physical topology coordinates.
    """
    return self._core_assignment

  def coordinates(self, replica: int, logical_core: int) -> Tuple:  # pylint:disable=g-bare-generic
    """Returns the physical topology coordinates of a logical core."""
    return tuple(self.core_assignment[replica, logical_core, :])

  def lookup_replicas(self, task_id: int, logical_core: int) -> List[int]:
    """Lookup replica ids by task number and logical core.

    Args:
      task_id: TensorFlow task number.
      logical_core: An integer, identifying a logical core.

    Returns:
      A sorted list of the replicas that are attached to that task and
      logical_core.

    Raises:
      ValueError: If no replica exists in the task which contains the logical
        core.
    """
    try:
      return self._task_and_cores_to_replicas[task_id][logical_core]
    except KeyError as err:
      # Chain explicitly so the traceback shows the failed lookup as the
      # cause instead of "another exception occurred during handling".
      raise ValueError(
          f"Can not find any replica in task: {task_id} contains "
          f"logical_core: {logical_core} ") from err

  def tpu_ordinal(self, replica: int = 0, logical_core: int = 0) -> int:
    """Returns the ordinal of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_ordinal_at_coordinates(coordinates)

  def host_device(self,
                  replica: int = 0,
                  logical_core: int = 0,
                  job: Optional[Text] = None) -> Text:
    """Returns the CPU device attached to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.cpu_device_name_at_coordinates(coordinates, job=job)

  def tpu_device(self,
                 replica: int = 0,
                 logical_core: int = 0,
                 job: Optional[Text] = None) -> Text:
    """Returns the name of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_name_at_coordinates(coordinates, job=job)

  @staticmethod
  def build(topology: Topology,
            computation_shape: Optional[np.ndarray] = None,
            computation_stride: Optional[np.ndarray] = None,
            num_replicas: int = 1) -> "DeviceAssignment":
    """Builds a `DeviceAssignment` by delegating to `device_assignment`.

    The arguments are forwarded positionally to the module-level
    `device_assignment` function; `device_order_mode` is not forwarded, so
    that function's default is used.

    Args:
      topology: Forwarded as the `topology` argument.
      computation_shape: Forwarded as the `computation_shape` argument.
      computation_stride: Forwarded as the `computation_stride` argument.
      num_replicas: Forwarded as the `num_replicas` argument.

    Returns:
      The `DeviceAssignment` returned by `device_assignment`.
    """
    return device_assignment(topology, computation_shape, computation_stride,
                             num_replicas)

180 

181 

def _open_ring_2d(x_size: int, y_size: int,
                  z_coord: int) -> List[Tuple[int, int, int]]:
  """Ring-order of a X by Y mesh, with a fixed Z coordinate.

  For example, in a 4x4 mesh, this returns the following order (x grows to the
  right, y grows downwards).
    0 -- 1 -- 2 -- 3
    |    |    |    |
    15-- 6 -- 5 -- 4
    |    |    |    |
    14-- 7 -- 8 -- 9
    |    |    |    |
    13-- 12-- 11-- 10

  Note that chip 0 is not included in the output.

  Rows are consumed in pairs (`y_size // 2` pairs), so with an odd `y_size`
  the chips with `x > 0` in the last row are not part of the result.

  Args:
    x_size: An integer represents the mesh size in the x-dimension. Must be
      larger than 1.
    y_size: An integer represents the mesh size in the y-dimension. Must be
      larger than 1.
    z_coord: An integer represents the z-coordinate to use for the chips in the
      ring.

  Returns:
    A list of (x,y,z) triples in ring order.
  """
  ret = []
  for i in range(y_size // 2):
    # Sweep x = 1 .. x_size - 1 along the even row, then come back along the
    # odd row below it, leaving column x == 0 free for the return path.
    ret.extend((j, 2 * i, z_coord) for j in range(1, x_size))
    ret.extend((j, 2 * i + 1, z_coord) for j in range(x_size - 1, 0, -1))
  # Walk back along column x == 0 towards (but not including) chip 0.
  ret.extend((0, i, z_coord) for i in range(y_size - 1, 0, -1))
  return ret

217 

218 

def _ring_3d(x_size: int, y_size: int,
             z_size: int) -> List[Tuple[int, int, int]]:
  """Ring-order of a X by Y by Z mesh.

  Constructs the 3d ring from 2d rings that are stacked in the Z dimension and
  joined in one corner.

  For example, in a 4x4x4 mesh, this returns the following order.

  z == 0:
    0 -- 1 -- 2 -- 3
    |    |    |    |
    15 - 6 -- 5 -- 4
    |    |    |    |
    14 - 7 -- 8 -- 9
    |    |    |    |
    13 - 12 - 11 - 10
  z == 1:
    63 - 30 - 29 - 28
    |    |    |    |
    16 - 25 - 26 - 27
    |    |    |    |
    17 - 24 - 23 - 22
    |    |    |    |
    18 - 19 - 20 - 21
  z == 2:
    62 - 31 - 32 - 33
    |    |    |    |
    45 - 36 - 35 - 34
    |    |    |    |
    44 - 37 - 38 - 39
    |    |    |    |
    43 - 42 - 41 - 40
  z == 3:
    61 - 60 - 59 - 58
    |    |    |    |
    46 - 55 - 56 - 57
    |    |    |    |
    47 - 54 - 53 - 52
    |    |    |    |
    48 - 49 - 50 - 51

  Special cases handled before the general construction:
    * Two dimensions of size 1: the chips are returned in a straight line.
    * Any dimension that is odd and larger than 1: a warning is logged and the
      chips are returned in plain x-fastest, z-slowest order (not a ring).
    * Exactly one dimension of size 1: a flat 2d ring is built.

  Args:
    x_size: An integer represents the mesh size in the x-dimension.
    y_size: An integer represents the mesh size in the y-dimension.
    z_size: An integer represents the mesh size in the z-dimension.

  Returns:
    A list of (x,y,z) triples in ring order.
  """

  # Handle the case where 2 dimensions are size 1.
  if x_size == 1 and y_size == 1:
    return [(0, 0, i) for i in range(z_size)]
  if x_size == 1 and z_size == 1:
    return [(0, i, 0) for i in range(y_size)]
  if y_size == 1 and z_size == 1:
    return [(i, 0, 0) for i in range(x_size)]

  # Handle odd mesh dimensions.  This never happens in practice, so we don't
  # bother to try building something optimal.
  if any(size > 1 and size % 2 != 0 for size in (x_size, y_size, z_size)):
    logging.warning("Odd dimension")
    return [(x, y, z)
            for z in range(z_size)
            for y in range(y_size)
            for x in range(x_size)]

  # Always start with chip 0.
  ret = [(0, 0, 0)]
  # Handle the case where one dimension is size 1.  We just build a flat, 2d
  # ring. The 2d helper yields (first axis, second axis, 0); the unpacking
  # below maps those onto the two non-degenerate axes of the 3d mesh.
  if z_size == 1:
    ret.extend(_open_ring_2d(x_size, y_size, 0))
    return ret
  if y_size == 1:
    ret.extend((x, y, z) for (x, z, y) in _open_ring_2d(x_size, z_size, 0))
    return ret
  if x_size == 1:
    ret.extend((x, y, z) for (y, z, x) in _open_ring_2d(y_size, z_size, 0))
    return ret

  # Handle the case where all dimensions have size > 1 and even.
  for i in range(z_size):
    r = _open_ring_2d(x_size, y_size, i)
    # Alternate the traversal direction per layer so consecutive layers join
    # at neighbouring chips.
    ret.extend(r if i % 2 == 0 else reversed(r))
  # Close the ring by descending the (0, 0, z) column back towards chip 0.
  ret.extend((0, 0, i) for i in range(z_size - 1, 0, -1))
  return ret

319 

320 

class DeviceOrderMode(enum.IntEnum):
  """The way of determining device orders when computing device assignment.

  Attributes:
    AUTO: The default mode; the library will choose to form rings when that is
      possible.
    RING: Form rings for replicas and model-parallel cores.
    MESH: Form meshes for replicas and/or model-parallel cores.
  """
  AUTO = 0
  RING = 1
  MESH = 2

330 

331 

def device_assignment(
    topology: Topology,
    computation_shape: Optional[np.ndarray] = None,
    computation_stride: Optional[np.ndarray] = None,
    num_replicas: int = 1,
    device_order_mode: DeviceOrderMode = DeviceOrderMode.AUTO
) -> DeviceAssignment:
  """Computes a device_assignment of a computation across a TPU topology.

  Attempts to choose a compact grid of cores for locality.

  Returns a `DeviceAssignment` that describes the cores in the topology assigned
  to each core of each replica.

  `computation_shape` and `computation_stride` values should be powers of 2 for
  optimal packing.

  Args:
    topology: A `Topology` object that describes the TPU cluster topology. To
      obtain a TPU topology, evaluate the `Tensor` returned by
      `initialize_system` using `Session.run`. Either a serialized
      `TopologyProto` or a `Topology` object may be passed. Note: you must
      evaluate the `Tensor` first; you cannot pass an unevaluated `Tensor`
      here.
    computation_shape: A rank 1 int32 numpy array with size equal to the
      topology rank, describing the shape of the computation's block of cores.
      If None, the `computation_shape` is `[1] * topology_rank`.
    computation_stride: A rank 1 int32 numpy array of size `topology_rank`,
      describing the inter-core spacing of the `computation_shape` cores in the
      TPU topology. If None, the `computation_stride` is `[1] * topology_rank`.
    num_replicas: The number of computation replicas to run. The replicas will
      be packed into the free spaces of the topology.
    device_order_mode: An enum of `DeviceOrderMode` class which indicates
      whether to assign devices to form rings or meshes, or let the library
      choose.

  Returns:
    A DeviceAssignment object, which describes the mapping between the logical
    cores in each computation replica and the physical cores in the TPU
    topology.

  Raises:
    ValueError: If `topology` is not a valid `Topology` object.
    ValueError: If `computation_shape` or `computation_stride` are not 1D int32
      numpy arrays with shape `[topology_rank]` where all values are positive.
    ValueError: If computation's replicas cannot fit into the TPU topology.
    ValueError: If `device_order_mode` is `DeviceOrderMode.RING` but the
      conditions for 3D tiling are not met.
    ValueError: If the topology has missing devices and `computation_stride`
      is not all ones.
  """
  # Deserialize the Topology proto, if it is a string.
  if isinstance(topology, bytes):
    topology = Topology(serialized=topology)

  if not isinstance(topology, Topology):
    raise ValueError(
        f"`topology` is not a Topology object; got {type(topology)}")

  topology_rank = len(topology.mesh_shape)
  mesh_shape = topology.mesh_shape
  if computation_shape is None:
    computation_shape = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_shape = np.asarray(computation_shape, dtype=np.int32)

  if computation_stride is None:
    computation_stride = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_stride = np.asarray(computation_stride, dtype=np.int32)

  if computation_shape.shape != (topology_rank,):
    raise ValueError(
        f"computation_shape must have shape [{topology_rank}]; "
        f"got {computation_shape.shape}"
    )
  if computation_stride.shape != (topology_rank,):
    raise ValueError(
        f"computation_stride must have shape [{topology_rank}]; "
        f"got {computation_stride.shape}"
    )

  if np.any(computation_shape < 1):
    raise ValueError(
        "computation_shape must be positive; got computation_shape={}".format(
            computation_shape))
  if np.any(computation_stride < 1):
    raise ValueError(
        "computation_stride must be positive; got computation_stride={}".format(
            computation_stride))

  # Computes the physical size of one computation instance.
  computation_footprint = computation_shape * computation_stride
  if np.any(computation_footprint > mesh_shape):
    raise ValueError(
        "computation footprint {} does not fit in TPU topology shape {}".format(
            computation_footprint, mesh_shape))

  # Computes how many copies of the computation footprint fit in the mesh.
  block_counts = mesh_shape // computation_footprint

  replica_counts = block_counts * computation_stride
  max_replicas = np.prod(replica_counts)
  if num_replicas > max_replicas:
    raise ValueError(
        "requested {} replicas but only {} replicas with shape {} and "
        "computation_stride {} fit in a TPU mesh of shape {}".format(
            num_replicas, max_replicas, computation_shape, computation_stride,
            mesh_shape))

  def ceil_of_ratio(n, m):
    # Integer ceiling of n / m.
    return (n + m - 1) // m

  if topology.missing_devices.size == 0:
    replica_shape = [0] * topology_rank
    if num_replicas > 0:
      remaining_replicas = num_replicas
      remaining_dims = topology_rank

      # Choose dimensions as close to an equal cube as possible,
      # in order of increasing dimension size. By visiting dimensions
      # in increasing size, we assign the most constrained dimension
      # first, so we won't make infeasible choices.
      #
      # As a secondary sort order, visit the last dimension (core index) first,
      # then the other dimensions in increasing order. This means we try to use
      # both cores on the same chip in preference to two cores on different
      # chips.  We visit the x dimension first, and the z dimension last, so
      # that we prefer to arrange adjacent replicas on the same machine when
      # possible.
      #
      # For example, if num_replicas == 4, we prefer to use a replica_shape of
      # (2,1,1,2) over (1,1,2,2).

      for x, ni in sorted(((x, ((i + 1) % topology_rank))
                           for (i, x) in enumerate(replica_counts))):
        # Undo the rotation applied to the sort key to recover the axis index.
        i = (ni + topology_rank - 1) % topology_rank
        target_size = int(math.ceil(remaining_replicas**(1.0 / remaining_dims)))
        replica_shape[i] = min(target_size, x)
        remaining_replicas = ceil_of_ratio(remaining_replicas, replica_shape[i])
        remaining_dims -= 1

      # Internal invariant of the loop above, not input validation.
      assert remaining_replicas == 1 and remaining_dims == 0

    # Assigns an offset to each replica such that no two replicas overlap.
    replica_offsets = np.full([num_replicas, topology_rank], -1, dtype=np.int32)

    enable_3d_tiling = (
        topology_rank == 4 and
        computation_shape[-1] == mesh_shape[-1]  # Only handle 3D case.
        and np.prod(computation_stride) == 1  # Ensure no stride.
        and num_replicas == max_replicas)  # Full replication.

    if device_order_mode != DeviceOrderMode.AUTO:
      if device_order_mode == DeviceOrderMode.RING and not enable_3d_tiling:
        raise ValueError(
            "device_order_mode=DeviceOrderMode.RING is not compatible with the "
            "3D tiling current topology. Try setting "
            "device_order_mode=DeviceOrderMode.AUTO"
        )
      # An explicit mode overrides the automatic decision.
      enable_3d_tiling = device_order_mode == DeviceOrderMode.RING

    if enable_3d_tiling:
      assignment = []
      inner_ring = _ring_3d(computation_shape[0], computation_shape[1],
                            computation_shape[2])
      outer_ring = _ring_3d(replica_shape[0], replica_shape[1],
                            replica_shape[2])

      for replica in range(num_replicas):
        outer_x, outer_y, outer_z = outer_ring[replica]
        per_replica_assignment = []
        for index in range(np.prod(computation_shape)):
          # The last axis varies fastest: consecutive indices share a ring
          # position and differ only in the last coordinate.
          inner_x, inner_y, inner_z = inner_ring[index // mesh_shape[-1]]
          px = outer_x * computation_shape[0] + inner_x
          py = outer_y * computation_shape[1] + inner_y
          pz = outer_z * computation_shape[2] + inner_z
          pi = index % mesh_shape[-1]
          per_replica_assignment.append([px, py, pz, pi])
        assignment.append(per_replica_assignment)
    else:
      for replica in range(num_replicas):
        # Chooses a replica number in each axis.
        t = replica
        pos = []
        # Visit the core number first.
        for dim in np.concatenate([[replica_shape[-1]], replica_shape[:-1]]):
          pos.append(t % dim)
          t //= dim
        replica_pos = np.concatenate([pos[1:], [pos[0]]])

        # Determines where that replica starts in each axis.
        outer = replica_pos // computation_stride
        inner = replica_pos % computation_stride
        replica_offsets[replica, :] = outer * computation_footprint + inner

      # Computes a logical core -> physical core mapping for each replica.
      indices = [
          np.arange(0, computation_shape[i] * computation_stride[i],
                    computation_stride[i]) for i in range(topology_rank)
      ]
      indices = np.concatenate(
          [i[..., np.newaxis] for i in np.meshgrid(*indices, indexing="ij")],
          axis=-1)
      indices = indices.reshape((-1, topology_rank))
      assignment = indices + replica_offsets[:, np.newaxis, :]
  else:
    # We have a slice with missing chips. We define a simple assignment by
    # ignoring computation stride. This assignment should enable a consistent
    # and correct device assignment on degraded slices. It is optimal when
    # weights are not sharded. But this device assignment may be sub-optimal for
    # other model parallelism scenarios.
    #
    # These are checks on caller-supplied arguments, so they raise instead of
    # using `assert`, which is removed when Python runs with -O.
    if np.prod(computation_stride) != 1:
      raise ValueError(
          "computation_stride must be all ones when the topology has missing "
          f"devices; got computation_stride={computation_stride}")
    # Next, we check if we have sufficient devices.
    devices_per_replica = np.prod(computation_shape)
    num_devices = topology.num_tasks * topology.num_tpus_per_task
    if num_replicas * devices_per_replica > num_devices:
      raise ValueError(
          f"requested {num_replicas} replicas of {devices_per_replica} "
          f"devices each, but only {num_devices} devices are available")
    # Map replicas to physical devices in task order.
    device_coordinates = topology.device_coordinates
    assignment = []
    for rindex in range(num_replicas):
      replica_assignment = []
      for index in range(devices_per_replica):
        logical_id = rindex * devices_per_replica + index
        # Pick logical cores in task order
        task = logical_id // topology.num_tpus_per_task
        device = logical_id % topology.num_tpus_per_task
        # Append physical cores to the replica assignment
        replica_assignment.append(device_coordinates[task, device, :])
      assignment.append(replica_assignment)

  return DeviceAssignment(topology, core_assignment=assignment)