Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/distribute/cluster_resolver/slurm_cluster_resolver.py: 16%

158 statements  

coverage.py v7.4.0, created at 2024-01-03 07:57 +0000

# Copyright 2018-2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Implementation of Cluster Resolvers for Slurm workload manager."""

import os
import re
import subprocess

from tensorflow.python.distribute.cluster_resolver.cluster_resolver import ClusterResolver
from tensorflow.python.distribute.cluster_resolver.cluster_resolver import format_master_url
from tensorflow.python.training.server_lib import ClusterSpec
from tensorflow.python.util.tf_export import tf_export


def expand_hostlist(hostlist):
  """Create a list of hosts out of a SLURM hostlist.

  The order of nodes is preserved and no deduplication is done.
  Input: 'n[1-2],m5,o[3-4,6,7-9]'
  Output: ['n1', 'n2', 'm5', 'o3', 'o4', 'o6', 'o7', 'o8', 'o9']
  """

  def split_hostlist(hostlist):
    """Split hostlist at commas outside of range expressions ('[3-5]')."""
    in_brackets = False
    cur_host = ''
    for c in hostlist:
      if in_brackets:
        assert c != '['
        if c == ']':
          in_brackets = False
      elif c == '[':
        in_brackets = True
      elif c == ',':
        assert cur_host != ''
        yield cur_host
        cur_host = ''
        continue
      cur_host += c
    if cur_host:
      yield cur_host

  def expand_range_expression(range_exp):
    """Expand a range expression like '3-5' to values 3,4,5."""
    for part in range_exp.split(','):
      sub_range = part.split('-')
      if len(sub_range) == 1:
        sub_range = sub_range * 2
      else:
        assert len(sub_range) == 2
      num_digits = len(sub_range[0])
      for i in range(int(sub_range[0]), int(sub_range[1]) + 1):
        yield str(i).zfill(num_digits)

  hosts = []
  try:
    for part in split_hostlist(hostlist):
      # Match prefix (anything but a range expression) and range expression
      # Both are optional
      m = re.match(r'([^,[\]]*)(\[([^\]]+)\])?$', part)
      if m is None:
        raise ValueError('Invalid part: %s' % part)
      prefix = m.group(1) or ''
      if m.group(3) is None:
        hosts.append(prefix)
      else:
        hosts.extend(prefix + i for i in expand_range_expression(m.group(3)))
  except Exception as e:
    raise ValueError('Invalid hostlist format "%s": %s' % (hostlist, e))
  return hosts
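
# Example usage (illustrative sketch, not part of the original module; the
# node names are made up):
#
#   >>> expand_hostlist('n[1-2],m5,o[3-4,6]')
#   ['n1', 'n2', 'm5', 'o3', 'o4', 'o6']
#   >>> expand_hostlist('node[08-10]')  # leading zeros are preserved
#   ['node08', 'node09', 'node10']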


def expand_tasks_per_node(tasks_per_node):
  """Expands the tasks per node expression from SLURM.

  The order is preserved so it can be matched to the hostlist.
  Input: '3(x2),2,1'
  Output: [3, 3, 2, 1]
  """

  result = []
  try:
    for part in tasks_per_node.split(','):
      m = re.match(r'(\d+)(\(x(\d+)\))?$', part)
      assert m is not None
      num_tasks = int(m.group(1))
      num_repetitions = int(m.group(3) or 1)
      result.extend([num_tasks] * num_repetitions)
  except Exception as e:
    raise ValueError('Invalid tasks-per-node list format "%s": %s' %
                     (tasks_per_node, e))
  return result
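
# Example usage (illustrative sketch, not part of the original module):
# '3(x2)' is Slurm shorthand for "3 tasks on each of the next 2 nodes", so the
# expanded list lines up element-wise with the expanded hostlist.
#
#   >>> expand_tasks_per_node('3(x2),2,1')
#   [3, 3, 2, 1]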


def _get_slurm_var(name):
  """Gets the SLURM variable from the environment.

  Args:
    name: Name of the step variable

  Returns:
    SLURM_<name> from os.environ
  Raises:
    RuntimeError if variable is not found
  """
  name = 'SLURM_' + name
  try:
    return os.environ[name]
  except KeyError:
    raise RuntimeError('%s not found in environment. '
                       'Not running inside a SLURM step?' % name)
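
# Example (illustrative, values made up): the helper only prefixes the given
# name with 'SLURM_', so inside a job step
#
#   >>> _get_slurm_var('PROCID')         # reads SLURM_PROCID
#   '0'
#   >>> _get_slurm_var('STEP_NODELIST')  # reads SLURM_STEP_NODELIST
#   'n[1-2]'
#
# and a missing variable raises RuntimeError (e.g. when run outside `srun`).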


def _get_num_slurm_tasks():
  """Returns the number of SLURM tasks of the current job step.

  Returns:
    The number of tasks as an int
  """
  return int(_get_slurm_var('STEP_NUM_TASKS'))


def _get_num_nvidia_gpus():
  """Gets the number of NVIDIA GPUs by using CUDA_VISIBLE_DEVICES and nvidia-smi.

  Returns:
    Number of GPUs available on the node
  Raises:
    RuntimeError if executing nvidia-smi failed
  """
  try:
    return len(os.environ['CUDA_VISIBLE_DEVICES'].split(','))
  except KeyError:
    pass  # Ignore and fallback to using nvidia-smi
  try:
    output = subprocess.check_output(['nvidia-smi', '--list-gpus'],
                                     encoding='utf-8')
    return sum(l.startswith('GPU ') for l in output.strip().split('\n'))
  except subprocess.CalledProcessError as e:
    raise RuntimeError('Could not get number of GPUs from nvidia-smi. '
                       'Maybe it is missing?\nOutput: %s' % e.output)
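
# Example (illustrative, values made up): CUDA_VISIBLE_DEVICES takes
# precedence because it reflects the devices actually granted to the step.
#
#   >>> os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'
#   >>> _get_num_nvidia_gpus()
#   4
#
# Without the variable, the fallback counts the 'GPU <i>: ...' lines printed
# by `nvidia-smi --list-gpus`.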


def get_num_gpus():
  """Returns the number of GPUs visible on the current node.

  Currently only implemented for NVIDIA GPUs.
  """
  return _get_num_nvidia_gpus()


@tf_export('distribute.cluster_resolver.SlurmClusterResolver')
class SlurmClusterResolver(ClusterResolver):
  """ClusterResolver for systems with the Slurm workload manager.

  This is an implementation of ClusterResolver for Slurm clusters. This allows
  the specification of jobs and task counts, number of tasks per node, number
  of GPUs on each node and number of GPUs for each task. It retrieves system
  attributes from Slurm environment variables, resolves allocated computing
  node names, constructs a cluster and returns a ClusterResolver object which
  can be used for distributed TensorFlow.
  """

  def __init__(self,
               jobs=None,
               port_base=8888,
               gpus_per_node=None,
               gpus_per_task=None,
               tasks_per_node=None,
               auto_set_gpu=True,
               rpc_layer='grpc'):
    """Creates a new SlurmClusterResolver object.

    For any parameter not set it will query the environment for the value.
    It uses those parameters to determine which nodes the processes reside on
    and resolves their hostnames.
    With the number of tasks per node it offsets the port number for each
    process.
    With the number of GPUs per node and per task it allocates GPUs to tasks
    by setting environment variables.
    Using the resolver works best (and is easier) with homogeneous tasks, but
    heterogeneous tasks (number of tasks varying per node) are also possible
    as long as the number of GPUs per task stays constant.

    Used environment variables:
    - SLURM_PROCID
    - (opt) SLURM_STEP_NUM_TASKS
    - (opt) SLURM_STEP_NODELIST
    - (opt) SLURM_STEP_TASKS_PER_NODE

    Args:
      jobs: Dictionary with job names as key and number of tasks in the job as
        value. Defaults to as many 'worker's as there are (Slurm) tasks.
      port_base: The first port number to start with for processes on a node.
      gpus_per_node: Number of GPUs available on each node. Defaults to the
        number of GPUs reported by nvidia-smi.
      gpus_per_task: Number of GPUs to be used for each task. Default is to
        evenly distribute the gpus_per_node to tasks_per_node.
      tasks_per_node: Number of tasks running on each node. Can be an integer
        if the number of tasks per node is constant or a dictionary mapping
        hostnames to number of tasks on that node. If not set the Slurm
        environment is queried for the correct mapping.
      auto_set_gpu: Set the visible CUDA devices automatically while resolving
        the cluster by setting the CUDA_VISIBLE_DEVICES environment variable.
        Defaults to True.
      rpc_layer: The protocol TensorFlow uses to communicate between nodes.
        Defaults to 'grpc'.

    Returns:
      A ClusterResolver object which can be used with distributed TensorFlow.

    Raises:
      RuntimeError: If more GPUs per node were requested than are available,
        if more tasks were requested than have been assigned, or if resolving
        missing values from the environment failed.
    """

    self._rank = self._resolve_own_rank()

    if jobs is None:
      jobs = {'worker': self._resolve_num_tasks()}

    self._jobs = jobs
    self._port_base = port_base

    if tasks_per_node is None:
      self._task_configuration = self._resolve_task_configuration()
    elif isinstance(tasks_per_node, dict):
      # User can pass in an explicit configuration as a dict
      self._task_configuration = tasks_per_node
    else:
      # User can pass a fixed number of tasks per node
      hostlist = self._resolve_hostlist()
      self._task_configuration = {
          host: int(tasks_per_node) for host in hostlist
      }

    max_tasks_per_node = max(self._task_configuration.values())
    num_tasks = sum(self._task_configuration.values())

    if gpus_per_node is None:
      gpus_per_node = get_num_gpus()
    if gpus_per_task is None:
      gpus_per_task = gpus_per_node // max_tasks_per_node
    self._gpus_per_node = gpus_per_node
    self._gpus_per_task = gpus_per_task

    self._auto_set_gpu = auto_set_gpu
    self.task_type = None
    self.task_id = None
    self.rpc_layer = rpc_layer

    self._gpu_allocation = []
    self._cluster_allocation = {}

    if max_tasks_per_node * self._gpus_per_task > self._gpus_per_node:
      raise RuntimeError('Requested more GPUs per node than available.')

    if sum(self._jobs.values()) != num_tasks:
      raise RuntimeError('Requested {} tasks but only {} were assigned.'.format(
          sum(self._jobs.values()), num_tasks))
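
  # Typical use (illustrative sketch, not part of the original module): inside
  # a Slurm step launched e.g. with `srun --ntasks=8 --ntasks-per-node=4`,
  # every task builds the resolver from its environment and passes it to a
  # multi-worker strategy. The argument values here are assumptions for the
  # example only.
  #
  #   import tensorflow as tf
  #
  #   resolver = tf.distribute.cluster_resolver.SlurmClusterResolver(
  #       port_base=8888, auto_set_gpu=True)
  #   strategy = tf.distribute.MultiWorkerMirroredStrategy(
  #       cluster_resolver=resolver)
  #   task_type, task_id = resolver.get_task_info()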

  def _resolve_own_rank(self):
    """Returns the rank of the current task in range [0, num_tasks)."""
    return int(_get_slurm_var('PROCID'))

  def _resolve_num_tasks(self):
    """Returns the number of tasks for the current job step."""
    return _get_num_slurm_tasks()

  def _resolve_hostlist(self):
    """Returns a list of hostnames for nodes running the current job step."""
    return expand_hostlist(_get_slurm_var('STEP_NODELIST'))

  def _resolve_task_configuration(self):
    """Creates a mapping of hostnames to the number of tasks allocated on it.

    Reads the SLURM environment to determine the nodes involved in the current
    job step and number of tasks running on each node.

    Returns a dictionary mapping each hostname to the number of tasks.
    """
    hostlist = self._resolve_hostlist()
    tasks_per_node = expand_tasks_per_node(
        _get_slurm_var('STEP_TASKS_PER_NODE'))
    return {
        host: num_tasks for (host, num_tasks) in zip(hostlist, tasks_per_node)
    }
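
  # Example (illustrative, hostnames made up): with SLURM_STEP_NODELIST set to
  # 'n[1-2]' and SLURM_STEP_TASKS_PER_NODE set to '2(x2)', the resolved task
  # configuration would be {'n1': 2, 'n2': 2}.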

  def cluster_spec(self):
    """Returns a ClusterSpec object based on the latest instance group info.

    This returns a ClusterSpec object for use based on information from the
    specified initialization parameters and Slurm environment variables. The
    cluster specification is resolved each time this function is called. The
    resolver extracts the hostnames of the allocated nodes from the Slurm
    environment and packs tasks in that order until a node holds the number
    of tasks given by the specification. GPUs on nodes are allocated to tasks
    by specification through setting the CUDA_VISIBLE_DEVICES environment
    variable.

    Returns:
      A ClusterSpec containing host information retrieved from Slurm's
      environment variables.
    """

    task_list = []
    self._gpu_allocation = []
    self._cluster_allocation = {}

    # Sort to make sure the order is the same for each run
    for host, num_tasks in sorted(self._task_configuration.items()):
      for port_offset, gpu_offset in zip(
          range(num_tasks), range(0, self._gpus_per_node, self._gpus_per_task)):

        host_addr = '%s:%d' % (host, self._port_base + port_offset)
        task_list.append(host_addr)
        gpu_id_list = []

        for gpu_id in range(gpu_offset, gpu_offset + self._gpus_per_task):
          gpu_id_list.append(str(gpu_id))

        self._gpu_allocation.append(','.join(gpu_id_list))

    cluster_rank_offset_start = 0
    cluster_rank_offset_end = 0

    # Sort to make sure the order is the same for each run
    for task_type, num_tasks in sorted(self._jobs.items()):
      cluster_rank_offset_end = cluster_rank_offset_start + num_tasks

      self._cluster_allocation[task_type] = (
          task_list[cluster_rank_offset_start:cluster_rank_offset_end])

      if cluster_rank_offset_start <= self._rank < cluster_rank_offset_end:
        self.task_type = task_type
        self.task_id = self._rank - cluster_rank_offset_start

      cluster_rank_offset_start = cluster_rank_offset_end

    if self._auto_set_gpu:
      os.environ['CUDA_VISIBLE_DEVICES'] = self._gpu_allocation[self._rank]

    return ClusterSpec(self._cluster_allocation)
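
  # Example (illustrative, all values made up): for two nodes 'n1' and 'n2'
  # with 2 tasks each, 2 GPUs per node, 1 GPU per task, port_base=8888 and
  # jobs={'worker': 4}, cluster_spec() yields
  #
  #   {'worker': ['n1:8888', 'n1:8889', 'n2:8888', 'n2:8889']}
  #
  # and, with auto_set_gpu, ranks 0 and 2 see CUDA_VISIBLE_DEVICES='0' while
  # ranks 1 and 3 see '1'.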

  def get_task_info(self):
    """Returns job name and task_id for the process which calls this.

    This returns the job name and task index for the process which calls this
    function according to its rank and cluster specification. The job name and
    task index are set after a cluster is constructed by cluster_spec;
    otherwise they default to None.

    Returns:
      A string specifying the job name the process belongs to and an integer
      specifying the task index the process has within that job.
    """
    return self.task_type, self.task_id

  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Returns the master string for connecting to a TensorFlow master.

    Args:
      task_type: (Optional) Overrides the default auto-selected task type.
      task_id: (Optional) Overrides the default auto-selected task index.
      rpc_layer: (Optional) Overrides the default RPC protocol TensorFlow uses
        to communicate across nodes.

    Returns:
      A connection string for connecting to a TensorFlow master.
    """
    task_type = task_type if task_type is not None else self.task_type
    task_id = task_id if task_id is not None else self.task_id

    if task_type is not None and task_id is not None:
      return format_master_url(
          self.cluster_spec().task_address(task_type, task_id),
          rpc_layer or self.rpc_layer)

    return ''
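
  # Example (illustrative, values made up): once cluster_spec() has assigned
  # this process task_type='worker' and task_id=0 in the cluster above,
  #
  #   >>> resolver.master()
  #   'grpc://n1:8888'
  #
  # i.e. format_master_url prefixes the task address with the rpc_layer.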

  def num_accelerators(self,
                       task_type=None,
                       task_id=None,
                       config_proto=None):
    # Unused, since this is set in __init__ manually.
    del task_type, task_id, config_proto
    return {'GPU': self._gpus_per_task}

397 return {'GPU': self._gpus_per_task}