Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py: 38% of 182 statements


# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Cluster Resolvers are used for dynamic cluster IP/hostname resolution."""

import abc

import collections

import six

from tensorflow.python.client import session
from tensorflow.python.eager import context
from tensorflow.python.framework import config
from tensorflow.python.framework import ops
from tensorflow.python.training.server_lib import ClusterSpec
from tensorflow.python.util.tf_export import tf_export


def format_master_url(master, rpc_layer=None):
  if rpc_layer:
    return '%s://%s' % (rpc_layer, master)
  else:
    return master
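# Illustrative sketch (not part of the original module): how `format_master_url`
# composes a session-master URL. Assuming a gRPC-based cluster:
#
#   format_master_url('worker0.example.com:2222', rpc_layer='grpc')
#   # -> 'grpc://worker0.example.com:2222'
#   format_master_url('worker0.example.com:2222')
#   # -> 'worker0.example.com:2222'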



def get_accelerator_devices(master, config_proto):
  """Returns accelerator devices given a master and a configuration."""
  if context.executing_eagerly():
    logical_devices = config.list_logical_devices()
    devices = []
    for d in logical_devices:
      if d.device_type == 'CPU' or d.device_type == 'XLA_CPU':  # Filter CPUs
        continue
      devices.append(session._DeviceAttributes(d.name, d.device_type, 0, 0))  # pylint: disable=protected-access
    return devices
  else:
    with ops.Graph().as_default():
      with session.Session(master, config=config_proto) as s:
        devices = s.list_devices()
    return devices
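# Illustrative sketch (not part of the original module): in eager mode the
# helper above drops CPU/XLA_CPU entries from the locally visible logical
# devices (memory limit and incarnation are reported as 0), so a host with one
# visible GPU yields something like (device name is hypothetical):
#
#   get_accelerator_devices(master='', config_proto=None)
#   # -> [_DeviceAttributes('/device:GPU:0', 'GPU', 0, 0)]
#
# In graph mode it instead lists the devices reported by a session connected
# to `master`, without filtering.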



@tf_export('distribute.cluster_resolver.ClusterResolver')
@six.add_metaclass(abc.ABCMeta)
class ClusterResolver(object):
  """Abstract class for all implementations of ClusterResolvers.

  This defines the skeleton for all implementations of ClusterResolvers.
  ClusterResolvers are a way for TensorFlow to communicate with various cluster
  management systems (e.g. GCE, AWS, etc.) and give TensorFlow the necessary
  information to set up distributed training.

  By letting TensorFlow communicate with these systems, we will be able to
  automatically discover and resolve IP addresses for various TensorFlow
  workers. This will eventually allow us to automatically recover from
  underlying machine failures and scale TensorFlow worker clusters up and down.

  Note to implementors of `tf.distribute.cluster_resolver.ClusterResolver`
  subclasses: in addition to these abstract methods, when the task_type,
  task_id, and rpc_layer attributes are applicable, you should also implement
  them either as properties with getters or setters, or directly set the
  attributes `self._task_type`, `self._task_id`, or `self._rpc_layer` so the
  base class' getters and setters are used. See
  `tf.distribute.cluster_resolver.SimpleClusterResolver.__init__` for an
  example.

  In general, multi-client tf.distribute strategies such as
  `tf.distribute.experimental.MultiWorkerMirroredStrategy` require the
  task_type and task_id properties to be available in the `ClusterResolver`
  they are using. On the other hand, these concepts are not applicable in
  single-client strategies, such as `tf.distribute.experimental.TPUStrategy`,
  because the program is only expected to be run on one task, so there should
  not be a need to have code branches according to task type and task id.

  - task_type is the name of the server's current named job (e.g. 'worker' or
    'ps' in a distributed parameter server training job).
  - task_id is the ordinal index of the server within the task type.
  - rpc_layer is the protocol used by TensorFlow to communicate with other
    TensorFlow servers in a distributed environment.
  """


  @abc.abstractmethod
  def cluster_spec(self):
    """Retrieve the current state of the cluster and return a `tf.train.ClusterSpec`.

    Returns:
      A `tf.train.ClusterSpec` representing the state of the cluster at the
      moment this function is called.

    Implementors of this function must take care to ensure that the
    ClusterSpec returned is up-to-date at the time of calling this function.
    This usually means retrieving the information from the underlying cluster
    management system every time this function is invoked and reconstructing
    a cluster_spec, rather than attempting to cache anything.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Retrieves the name or URL of the session master.

    Note: this is only useful for TensorFlow 1.x.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the master.
      task_id: (Optional) The index of the TensorFlow task of the master.
      rpc_layer: (Optional) The RPC protocol for the given cluster.

    Returns:
      The name or URL of the session master.

    Implementors of this function must take care to ensure that the master
    returned is up-to-date at the time of calling this function. This usually
    means retrieving the master every time this function is invoked.
    """
    raise NotImplementedError()

  def num_accelerators(self,
                       task_type=None,
                       task_id=None,
                       config_proto=None):
    """Returns the number of accelerator cores per worker.

    This returns the number of accelerator cores (such as GPUs and TPUs)
    available per worker.

    Optionally, callers may specify task_type and task_id to target a specific
    TensorFlow task when querying the number of accelerators. This supports
    heterogeneous environments, where the number of accelerator cores per host
    differs.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the machine we
        want to query.
      task_id: (Optional) The index of the TensorFlow task of the machine we
        want to query.
      config_proto: (Optional) Configuration for starting a new session to
        query how many accelerator cores it has.

    Returns:
      A map of accelerator types to number of cores.
    """
    master = self.master(task_type, task_id)
    # TODO(b/126786766): in eager mode, we should check whether
    # `tf.config.experimental_connect_to_cluster` is called or not.
    devices = get_accelerator_devices(master, config_proto)
    mapping = collections.defaultdict(int)
    for device in devices:
      if task_type is not None and task_id is not None:
        job_path = '/job:%s' % task_type
        task_path = '/task:%s' % task_id
        if job_path not in device.name or task_path not in device.name:
          continue
      mapping[device.device_type] += 1
    return mapping
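  # Illustrative sketch (not part of the original module): assuming a resolver
  # whose 'worker' task 0 exposes four GPUs, `num_accelerators` returns a plain
  # mapping of device type to count (a `collections.defaultdict` under the
  # hood), e.g.
  #
  #   resolver.num_accelerators(task_type='worker', task_id=0)
  #   # -> {'GPU': 4}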


  @property
  def environment(self):
    """Returns the current environment which TensorFlow is running in.

    There are two possible return values, "google" (when TensorFlow is running
    in a Google-internal environment) or an empty string (when TensorFlow is
    running elsewhere).

    If you are implementing a ClusterResolver that works in both the Google
    environment and the open-source world (for instance, a TPU ClusterResolver
    or similar), you will have to return the appropriate string depending on
    the environment, which you will have to detect.

    Otherwise, if you are implementing a ClusterResolver that will only work
    in open-source TensorFlow, you do not need to implement this property.
    """
    return ''

  @property
  def task_type(self):
    """Returns the task type this `ClusterResolver` indicates.

    In a TensorFlow distributed environment, each job may have an applicable
    task type. Valid task types in TensorFlow include
    'chief': a worker that is designated with more responsibility,
    'worker': a regular worker for training/evaluation,
    'ps': a parameter server, or
    'evaluator': an evaluator that evaluates the checkpoints for metrics.

    See [Multi-worker configuration](
    https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration)
    for more information about the 'chief' and 'worker' task types, which are
    most commonly used.

    Having access to such information is useful when the user needs to run
    specific code according to task types. For example,

    ```python
    cluster_spec = tf.train.ClusterSpec({
        "ps": ["localhost:2222", "localhost:2223"],
        "worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
    })

    # SimpleClusterResolver is used here for illustration; other cluster
    # resolvers may be used for other sources of task type/id.
    simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
                                            task_id=1)

    ...

    if cluster_resolver.task_type == 'worker':
      # Perform something that's only applicable on workers. This block
      # will run on this particular instance since we've specified this task to
      # be a worker in the above cluster resolver.
    elif cluster_resolver.task_type == 'ps':
      # Perform something that's only applicable on parameter servers. This
      # block will not run on this particular instance.
    ```

    Returns `None` if such information is not available or is not applicable
    in the current distributed environment, such as training with
    `tf.distribute.experimental.TPUStrategy`.

    For more information, please see
    `tf.distribute.cluster_resolver.ClusterResolver`'s class doc.
    """
    return getattr(self, '_task_type', None)

  @property
  def task_id(self):
    """Returns the task id this `ClusterResolver` indicates.

    In a TensorFlow distributed environment, each job may have an applicable
    task id, which is the index of the instance within its task type. This is
    useful when the user needs to run specific code according to task index.
    For example,

    ```python
    cluster_spec = tf.train.ClusterSpec({
        "ps": ["localhost:2222", "localhost:2223"],
        "worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
    })

    # SimpleClusterResolver is used here for illustration; other cluster
    # resolvers may be used for other sources of task type/id.
    simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
                                            task_id=0)

    ...

    if cluster_resolver.task_type == 'worker' and cluster_resolver.task_id == 0:
      # Perform something that's only applicable on 'worker' type, id 0. This
      # block will run on this particular instance since we've specified this
      # task to be a 'worker', id 0 in the above cluster resolver.
    else:
      # Perform something that's only applicable on other ids. This block will
      # not run on this particular instance.
    ```

    Returns `None` if such information is not available or is not applicable
    in the current distributed environment, such as training with
    `tf.distribute.cluster_resolver.TPUClusterResolver`.

    For more information, please see
    `tf.distribute.cluster_resolver.ClusterResolver`'s class docstring.
    """
    return getattr(self, '_task_id', None)

  @task_type.setter
  def task_type(self, task_type):
    """Setter of `task_type` property. See `task_type` property doc."""
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    """Setter of `task_id` property. See `task_id` property doc."""
    self._task_id = task_id
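# Illustrative sketch (not part of the original module): a minimal
# `ClusterResolver` subclass following the "note to implementors" in the class
# docstring above. It sets `self._task_type`/`self._task_id` directly so the
# base class' getters and setters are reused; the class name is hypothetical.
#
#   class FixedClusterResolver(ClusterResolver):
#
#     def __init__(self, cluster_spec, task_type=None, task_id=None):
#       self._cluster_spec = cluster_spec
#       self._task_type = task_type
#       self._task_id = task_id
#
#     def cluster_spec(self):
#       return self._cluster_spec
#
#     def master(self, task_type=None, task_id=None, rpc_layer=None):
#       if task_type is not None and task_id is not None:
#         return self._cluster_spec.task_address(task_type, task_id)
#       return ''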



@tf_export('distribute.cluster_resolver.SimpleClusterResolver')
class SimpleClusterResolver(ClusterResolver):
  """Simple implementation of ClusterResolver that accepts all attributes.

  Please see the base class for documentation of the arguments of its
  constructor.

  It is useful if you want to specify some or all attributes.

  Usage example with `tf.distribute.Strategy`:

    ```Python
    cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
                                               "worker1.example.com:2222"]})

    # On worker 0
    cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
                                             task_id=0,
                                             num_accelerators={"GPU": 8},
                                             rpc_layer="grpc")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        cluster_resolver=cluster_resolver)

    # On worker 1
    cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
                                             task_id=1,
                                             num_accelerators={"GPU": 8},
                                             rpc_layer="grpc")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        cluster_resolver=cluster_resolver)
    ```
  """

  def __init__(self, cluster_spec, master='', task_type=None, task_id=None,
               environment='', num_accelerators=None,
               rpc_layer=None):
    """Creates a SimpleClusterResolver from a ClusterSpec."""
    super(SimpleClusterResolver, self).__init__()

    self._task_type = task_type
    self._task_id = task_id
    self._environment = environment

    self._num_accelerators = num_accelerators
    self._rpc_layer = rpc_layer

    if not isinstance(cluster_spec, ClusterSpec):
      raise TypeError('cluster_spec must be a `tf.train.ClusterSpec`.')
    self._cluster_spec = cluster_spec

    if not isinstance(master, str):
      raise TypeError('master must be a string.')
    self._master = master

  def cluster_spec(self):
    """Returns the ClusterSpec passed into the constructor."""
    return self._cluster_spec

  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Returns the master address to use when creating a session.

    Note: this is only useful for TensorFlow 1.x.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the master.
      task_id: (Optional) The index of the TensorFlow task of the master.
      rpc_layer: (Optional) The RPC protocol used by distributed TensorFlow.

    Returns:
      The name or URL of the session master.

    If a task_type and task_id are given, this will override the `master`
    string passed into the initialization function.
    """
    if task_type is not None and task_id is not None:
      master = self.cluster_spec().task_address(task_type, task_id)
    else:
      master = self._master

    return format_master_url(master, rpc_layer=rpc_layer or self._rpc_layer)
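  # Illustrative sketch (not part of the original module): with the resolver
  # from the class docstring above, `master()` formats the `master` string
  # given to the constructor, while passing a task explicitly resolves that
  # task's address from the cluster spec:
  #
  #   cluster_resolver.master()
  #   # -> 'grpc://'  (the default empty `master` string, prefixed with 'grpc')
  #   cluster_resolver.master(task_type='worker', task_id=1)
  #   # -> 'grpc://worker1.example.com:2222'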


  @property
  def task_type(self):
    return self._task_type

  @property
  def task_id(self):
    return self._task_id

  @task_type.setter
  def task_type(self, task_type):
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    self._task_id = task_id

  @property
  def environment(self):
    return self._environment

  def num_accelerators(self,
                       task_type=None,
                       task_id=None,
                       config_proto=None):
    """Returns the number of accelerator cores per worker.

    The SimpleClusterResolver does not do automatic detection of accelerators,
    so all arguments are unused and we simply return the value provided in the
    constructor.

    Args:
      task_type: Unused.
      task_id: Unused.
      config_proto: Unused.
    """
    # Unused
    del task_type, task_id, config_proto
    if self._num_accelerators is None:
      return {}
    return self._num_accelerators

  @property
  def rpc_layer(self):
    return self._rpc_layer

  @rpc_layer.setter
  def rpc_layer(self, rpc_layer):
    self._rpc_layer = rpc_layer


@tf_export('distribute.cluster_resolver.UnionResolver')
class UnionClusterResolver(ClusterResolver):
  """Performs a union on underlying ClusterResolvers.

  This class performs a union given two or more existing ClusterResolvers. It
  merges the underlying ClusterResolvers and returns one unified ClusterSpec
  when cluster_spec is called. The details of the merge are documented in the
  cluster_spec method.

  For additional ClusterResolver properties such as task type, task index,
  rpc layer, environment, etc., we return the value from the first
  ClusterResolver in the union.

  An example to combine two cluster resolvers:

    ```Python
    cluster_0 = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
                                                 "worker1.example.com:2222"]})
    cluster_resolver_0 = SimpleClusterResolver(cluster_0, task_type="worker",
                                               task_id=0,
                                               rpc_layer="grpc")

    cluster_1 = tf.train.ClusterSpec({"ps": ["ps0.example.com:2222",
                                             "ps1.example.com:2222"]})
    cluster_resolver_1 = SimpleClusterResolver(cluster_1, task_type="ps",
                                               task_id=0,
                                               rpc_layer="grpc")

    # Its task type would be "worker".
    cluster_resolver = UnionClusterResolver(cluster_resolver_0,
                                            cluster_resolver_1)
    ```

  An example to override the number of GPUs in a TFConfigClusterResolver
  instance:

    ```Python
    tf_config = TFConfigClusterResolver()
    gpu_override = SimpleClusterResolver(tf_config.cluster_spec(),
                                         num_accelerators={"GPU": 1})
    cluster_resolver = UnionClusterResolver(gpu_override, tf_config)
    ```
  """

  def __init__(self, *args, **kwargs):
    """Initializes a UnionClusterResolver with other ClusterResolvers.

    Args:
      *args: `ClusterResolver` objects to be unionized.
      **kwargs:
        rpc_layer - (Optional) Override value for the RPC layer used by
          TensorFlow.
        task_type - (Optional) Override value for the current task type.
        task_id - (Optional) Override value for the current task index.

    Raises:
      TypeError: If any argument is not an instance of `ClusterResolver`.
      ValueError: If no arguments are passed.
    """
    super(UnionClusterResolver, self).__init__()

    self._rpc_layer = kwargs.pop('rpc_layer', None)
    self._task_type = kwargs.pop('task_type', None)
    self._task_id = kwargs.pop('task_id', None)

    if kwargs:
      raise ValueError('Unexpected kwargs provided {!r}'.format(kwargs))

    if not args:
      raise ValueError('At least one ClusterResolver is required.')

    for cluster_resolver in args:
      if not isinstance(cluster_resolver, ClusterResolver):
        raise TypeError('All arguments must be a sub-class of '
                        '`ClusterResolver`.')
    self._cluster_resolvers = args
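  # Illustrative sketch (not part of the original module): the keyword
  # arguments above override values otherwise taken from the first resolver.
  # For example (resolver names are hypothetical):
  #
  #   union = UnionClusterResolver(worker_resolver, ps_resolver,
  #                                rpc_layer='grpc', task_type='ps', task_id=0)
  #   union.task_type  # -> 'ps', the override, not worker_resolver.task_type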


  def cluster_spec(self):
    """Returns a union of all the ClusterSpecs from the ClusterResolvers.

    Returns:
      A ClusterSpec containing host information merged from all the underlying
      ClusterResolvers.

    Raises:
      KeyError: If conflicting keys are detected when merging two or more
        dictionaries.

    Note: If there are multiple ClusterResolvers exposing ClusterSpecs with the
    same job name, we will merge the list/dict of workers.

    If *all* underlying ClusterSpecs expose the set of workers as lists, we
    will concatenate the lists of workers, starting with the list of workers
    from the first ClusterResolver passed into the constructor.

    If *any* of the ClusterSpecs expose the set of workers as a dict, we will
    treat all the sets of workers as dicts (even if they are returned as lists)
    and will only merge them into a dict if there are no conflicting keys. If
    there is a conflicting key, we will raise a `KeyError`.
    """

    merged_cluster = {}

    # We figure out whether it is all lists for a particular job, or whether
    # there are dicts inside.
    for cluster_resolver in self._cluster_resolvers:
      cluster_spec = cluster_resolver.cluster_spec()
      cluster_dict = cluster_spec.as_dict()

      for job_name, tasks in cluster_dict.items():
        if job_name in merged_cluster:
          # If we see a dict, then we write a dict out regardless.
          if isinstance(tasks, dict):
            merged_cluster[job_name] = {}
        else:
          # We take whichever type is present.
          if isinstance(tasks, list):
            merged_cluster[job_name] = []
          else:
            merged_cluster[job_name] = {}

    # We then do the merge as appropriate in merged_cluster[job].
    for cluster_resolver in self._cluster_resolvers:
      cluster_spec = cluster_resolver.cluster_spec()
      cluster_dict = cluster_spec.as_dict()

      for job_name, tasks in cluster_dict.items():
        if isinstance(merged_cluster[job_name], list):
          # We all have lists, we can just concatenate and be done.
          merged_cluster[job_name].extend(tasks)
        else:
          if isinstance(tasks, list):
            # We convert to a dictionary if the type is a list.
            task_dict = dict(zip(range(0, len(tasks)), tasks))
          else:
            # We can simply make a copy (for update) and be done.
            task_dict = tasks.copy()

          # We detect if there are duplicates, and raise an error if so.
          task_keys = set(task_dict)
          merged_keys = set(merged_cluster[job_name].keys())
          intersected_keys = task_keys.intersection(merged_keys)
          if intersected_keys:
            raise KeyError('Duplicate keys detected when merging two '
                           'ClusterSpecs: %s' % repr(intersected_keys))

          # We do the merge after all the processing.
          merged_cluster[job_name].update(task_dict)

    return ClusterSpec(merged_cluster)
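  # Illustrative sketch (not part of the original module): merge behaviour of
  # `cluster_spec` for two resolvers built from hypothetical specs. Jobs with
  # the same name and list-form tasks are concatenated in resolver order;
  # overlapping dict keys raise `KeyError`:
  #
  #   r0 -> ClusterSpec({'worker': ['w0:2222'], 'ps': ['ps0:2222']})
  #   r1 -> ClusterSpec({'worker': ['w1:2222']})
  #   UnionClusterResolver(r0, r1).cluster_spec().as_dict()
  #   # -> {'worker': ['w0:2222', 'w1:2222'], 'ps': ['ps0:2222']}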


  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Returns the master address to use when creating a session.

    This usually returns the master from the first ClusterResolver passed in,
    but you can override this by specifying the task_type and task_id.

    Note: this is only useful for TensorFlow 1.x.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the master.
      task_id: (Optional) The index of the TensorFlow task of the master.
      rpc_layer: (Optional) The RPC protocol for the given cluster.

    Returns:
      The name or URL of the session master.
    """
    if task_type is not None and task_id is not None:
      master = self.cluster_spec().task_address(task_type, task_id)
      return format_master_url(master, rpc_layer or self._rpc_layer)

    return self._cluster_resolvers[0].master(rpc_layer=rpc_layer)

  @property
  def task_type(self):
    return self._task_type or self._cluster_resolvers[0].task_type

  @property
  def task_id(self):
    return self._task_id or self._cluster_resolvers[0].task_id

  @task_type.setter
  def task_type(self, task_type):
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    self._task_id = task_id

  @property
  def environment(self):
    return self._cluster_resolvers[0].environment

  def num_accelerators(self,
                       task_type=None,
                       task_id=None,
                       config_proto=None):
    return self._cluster_resolvers[0].num_accelerators(
        task_type, task_id, config_proto)

  @property
  def rpc_layer(self):
    return self._rpc_layer or self._cluster_resolvers[0].rpc_layer

  @rpc_layer.setter
  def rpc_layer(self, rpc_layer):
    self._rpc_layer = rpc_layer