1# Copyright 2018-2020 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Implementation of Cluster Resolvers for Slurm workload manager."""
16
17import os
18import re
19import subprocess
20
21from tensorflow.python.distribute.cluster_resolver.cluster_resolver import ClusterResolver
22from tensorflow.python.distribute.cluster_resolver.cluster_resolver import format_master_url
23from tensorflow.python.training.server_lib import ClusterSpec
24from tensorflow.python.util.tf_export import tf_export
25
26
27def expand_hostlist(hostlist):
28 """Create a list of hosts out of a SLURM hostlist.
29
30 The order of nodes is preserved and no deduplication is done
31 Input: 'n[1-2],m5,o[3-4,6,7-9]')
32 Output: ['n1', 'n2', 'm5', 'o3', 'o4', 'o6', 'o7', 'o8', 'o9']
33 """
34
35 def split_hostlist(hostlist):
36 """Split hostlist at commas outside of range expressions ('[3-5]')."""
37 in_brackets = False
38 cur_host = ''
39 for c in hostlist:
40 if in_brackets:
41 assert c != '['
42 if c == ']':
43 in_brackets = False
44 elif c == '[':
45 in_brackets = True
46 elif c == ',':
47 assert cur_host != ''
48 yield cur_host
49 cur_host = ''
50 continue
51 cur_host += c
52 if cur_host:
53 yield cur_host
54
55 def expand_range_expression(range_exp):
56 """Expand a range expression like '3-5' to values 3,4,5."""
57 for part in range_exp.split(','):
58 sub_range = part.split('-')
59 if len(sub_range) == 1:
60 sub_range = sub_range * 2
61 else:
62 assert len(sub_range) == 2
63 num_digits = len(sub_range[0])
64 for i in range(int(sub_range[0]), int(sub_range[1]) + 1):
65 yield str(i).zfill(num_digits)
66
67 hosts = []
68 try:
69 for part in split_hostlist(hostlist):
70 # Match prefix (anything but a range expression) and range expression
71 # Both are optional
72 m = re.match(r'([^,[\]]*)(\[([^\]]+)\])?$', part)
73 if m is None:
74 raise ValueError('Invalid part: %s' % part)
75 prefix = m.group(1) or ''
76 if m.group(3) is None:
77 hosts.append(prefix)
78 else:
79 hosts.extend(prefix + i for i in expand_range_expression(m.group(3)))
80 except Exception as e:
81 raise ValueError('Invalid hostlist format "%s": %s' % (hostlist, e))
82 return hosts
83
84
85def expand_tasks_per_node(tasks_per_node):
86 """Expands the tasks per node expression from SLURM.
87
88 The order is preserved so it can be matched to the hostlist
89 Input: '3(x2),2,1'
90 Output: [3, 3, 2, 1]
91 """
92 result = []
93 try:
94 for part in tasks_per_node.split(','):
95 m = re.match(r'(\d+)(\(x(\d+)\))?$', part)
96 assert m is not None
97 num_tasks = int(m.group(1))
98 num_repetitions = int(m.group(3) or 1)
99 result.extend([num_tasks] * num_repetitions)
100 except Exception as e:
101 raise ValueError('Invalid tasks-per-node list format "%s": %s' %
102 (tasks_per_node, e))
103 return result
104
105
106def _get_slurm_var(name):
107 """Gets the SLURM variable from the environment.
108
109 Args:
110 name: Name of the step variable
111
112 Returns:
113 SLURM_<name> from os.environ
114 Raises:
115 RuntimeError if variable is not found
116 """
117 name = 'SLURM_' + name
118 try:
119 return os.environ[name]
120 except KeyError:
121 raise RuntimeError('%s not found in environment. '
122 'Not running inside a SLURM step?' % name)
123
124
125def _get_num_slurm_tasks():
126 """Returns the number of SLURM tasks of the current job step.
127
128 Returns:
129 The number of tasks as an int
130 """
131 return int(_get_slurm_var('STEP_NUM_TASKS'))
132
133
134def _get_num_nvidia_gpus():
135 """Gets the number of NVIDIA GPUs by using CUDA_VISIBLE_DEVICES and nvidia-smi.
136
137 Returns:
138 Number of GPUs available on the node
139 Raises:
140 RuntimeError if executing nvidia-smi failed
141 """
142 try:
143 return len(os.environ['CUDA_VISIBLE_DEVICES'].split(','))
144 except KeyError:
145 pass # Ignore and fallback to using nvidia-smi
146 try:
147 output = subprocess.check_output(['nvidia-smi', '--list-gpus'],
148 encoding='utf-8')
149 return sum(l.startswith('GPU ') for l in output.strip().split('\n'))
150 except subprocess.CalledProcessError as e:
151 raise RuntimeError('Could not get number of GPUs from nvidia-smi. '
152 'Maybe it is missing?\nOutput: %s' % e.output)
153
154
155def get_num_gpus():
156 """Returns the number of GPUs visible on the current node.
157
158 Currently only implemented for NVIDIA GPUs.
159 """
160 return _get_num_nvidia_gpus()
161
162
163@tf_export('distribute.cluster_resolver.SlurmClusterResolver')
164class SlurmClusterResolver(ClusterResolver):
165 """ClusterResolver for system with Slurm workload manager.
166
167 This is an implementation of ClusterResolver for Slurm clusters. This allows
168 the specification of jobs and task counts, number of tasks per node, number
169 of GPUs on each node and number of GPUs for each task. It retrieves system
170 attributes by Slurm environment variables, resolves allocated computing node
171 names, constructs a cluster and returns a ClusterResolver object which can be
172 used for distributed TensorFlow.
173 """
174
175 def __init__(self,
176 jobs=None,
177 port_base=8888,
178 gpus_per_node=None,
179 gpus_per_task=None,
180 tasks_per_node=None,
181 auto_set_gpu=True,
182 rpc_layer='grpc'):
183 """Creates a new SlurmClusterResolver object.
184
185 For any parameter not set it will query the environment for the value.
186 It uses those parameters to check which nodes have processes reside on and
187 resolves their hostnames.
188 With the number tasks per node it offsets the port number for each process.
189 With the number of GPUs per node and per task it allocates GPUs to tasks by
190 setting environment variables.
191 Using the resolver works best (and is easier) with homogeneous tasks but
192 heterogeneous tasks (number of tasks varying per node) are also possible as
193 long as the number of GPUs per task stays constant.
194
195 Used environment variables:
196 - SLURM_PROCID
197 - (opt) SLURM_STEP_NUM_TASKS
198 - (opt) SLURM_STEP_NODELIST
199 - (opt) SLURM_STEP_TASKS_PER_NODE
200
201 Args:
202 jobs: Dictionary with job names as key and number of tasks in the job as
203 value. Defaults to as many 'worker's as there are (Slurm) tasks.
204 port_base: The first port number to start with for processes on a node.
205 gpus_per_node: Number of GPUs available on each node. Defaults to the
206 number of GPUs reported by nvidia-smi
207 gpus_per_task: Number of GPUs to be used for each task. Default is to
208 evenly distribute the gpus_per_node to tasks_per_node.
209 tasks_per_node: Number of tasks running on each node. Can be an integer if
210 the number of tasks per node is constant or a dictionary mapping
211 hostnames to number of tasks on that node. If not set the Slurm
212 environment is queried for the correct mapping.
213 auto_set_gpu: Set the visible CUDA devices automatically while resolving
214 the cluster by setting CUDA_VISIBLE_DEVICES environment variable.
215 Defaults to True.
216 rpc_layer: The protocol TensorFlow used to communicate between nodes.
217 Defaults to 'grpc'.
218
219 Returns:
220 A ClusterResolver object which can be used with distributed TensorFlow.
221
222 Raises:
223 RuntimeError: If requested more GPUs per node than available or
224 requested more tasks than assigned tasks or
225 resolving missing values from the environment failed.
226 """
227
228 self._rank = self._resolve_own_rank()
229
230 if jobs is None:
231 jobs = {'worker': self._resolve_num_tasks()}
232
233 self._jobs = jobs
234 self._port_base = port_base
235
236 if tasks_per_node is None:
237 self._task_configuration = self._resolve_task_configuration()
238 elif isinstance(tasks_per_node, dict):
239 # User can pass in an explicit configuration as a dict
240 self._task_configuration = tasks_per_node
241 else:
242 # User can pass a fixed number of tasks per node
243 hostlist = self._resolve_hostlist()
244 self._task_configuration = {
245 host: int(tasks_per_node) for host in hostlist
246 }
247
248 max_tasks_per_node = max(self._task_configuration.values())
249 num_tasks = sum(self._task_configuration.values())
250
251 if gpus_per_node is None:
252 gpus_per_node = get_num_gpus()
253 if gpus_per_task is None:
254 gpus_per_task = gpus_per_node // max_tasks_per_node
255 self._gpus_per_node = gpus_per_node
256 self._gpus_per_task = gpus_per_task
257
258 self._auto_set_gpu = auto_set_gpu
259 self.task_type = None
260 self.task_id = None
261 self.rpc_layer = rpc_layer
262
263 self._gpu_allocation = []
264 self._cluster_allocation = {}
265
266 if max_tasks_per_node * self._gpus_per_task > self._gpus_per_node:
267 raise RuntimeError('Requested more GPUs per node than available.')
268
269 if sum(self._jobs.values()) != num_tasks:
270 raise RuntimeError('Requested {} tasks but only {} were assigned.'.format(
271 sum(self._jobs.values()), num_tasks))
272
273 def _resolve_own_rank(self):
274 """Returns the rank of the current task in range [0, num_tasks)."""
275 return int(_get_slurm_var('PROCID'))
276
277 def _resolve_num_tasks(self):
278 """Returns the number of tasks for the current job step."""
279 return _get_num_slurm_tasks()
280
281 def _resolve_hostlist(self):
282 """Returns a list of hostnames for nodes running the current job step."""
283 return expand_hostlist(_get_slurm_var('STEP_NODELIST'))
284
285 def _resolve_task_configuration(self):
286 """Creates a mapping of hostnames to the number of tasks allocated on it.
287
288 Reads the SLURM environment to determine the nodes involved in the current
289 job step and number of tasks running on each node.
290
291 Returns a dictionary mapping each hostname to the number of tasks.
292 """
293 hostlist = self._resolve_hostlist()
294 tasks_per_node = expand_tasks_per_node(
295 _get_slurm_var('STEP_TASKS_PER_NODE'))
296 return {
297 host: num_tasks for (host, num_tasks) in zip(hostlist, tasks_per_node)
298 }
299
300 def cluster_spec(self):
301 """Returns a ClusterSpec object based on the latest instance group info.
302
303 This returns a ClusterSpec object for use based on information from the
304 specified initialization parameters and Slurm environment variables. The
305 cluster specification is resolved each time this function is called. The
306 resolver extract hostnames of nodes by scontrol and pack tasks in that
307 order until a node a has number of tasks that is equal to specification.
308 GPUs on nodes are allocated to tasks by specification through setting
309 CUDA_VISIBLE_DEVICES environment variable.
310
311 Returns:
312 A ClusterSpec containing host information retrieved from Slurm's
313 environment variables.
314 """
315
316 task_list = []
317 self._gpu_allocation = []
318 self._cluster_allocation = {}
319
320 # Sort to make sure the order is the same for each run
321 for host, num_tasks in sorted(self._task_configuration.items()):
322 for port_offset, gpu_offset in zip(
323 range(num_tasks), range(0, self._gpus_per_node, self._gpus_per_task)):
324
325 host_addr = '%s:%d' % (host, self._port_base + port_offset)
326 task_list.append(host_addr)
327 gpu_id_list = []
328
329 for gpu_id in range(gpu_offset, gpu_offset + self._gpus_per_task):
330 gpu_id_list.append(str(gpu_id))
331
332 self._gpu_allocation.append(','.join(gpu_id_list))
333
334 cluster_rank_offset_start = 0
335 cluster_rank_offset_end = 0
336
337 # Sort to make sure the order is the same for each run
338 for task_type, num_tasks in sorted(self._jobs.items()):
339 cluster_rank_offset_end = cluster_rank_offset_start + num_tasks
340
341 self._cluster_allocation[task_type] = (
342 task_list[cluster_rank_offset_start:cluster_rank_offset_end])
343
344 if cluster_rank_offset_start <= self._rank < cluster_rank_offset_end:
345 self.task_type = task_type
346 self.task_id = self._rank - cluster_rank_offset_start
347
348 cluster_rank_offset_start = cluster_rank_offset_end
349
350 if self._auto_set_gpu:
351 os.environ['CUDA_VISIBLE_DEVICES'] = self._gpu_allocation[self._rank]
352
353 return ClusterSpec(self._cluster_allocation)
354
355 def get_task_info(self):
356 """Returns job name and task_id for the process which calls this.
357
358 This returns the job name and task index for the process which calls this
359 function according to its rank and cluster specification. The job name and
360 task index are set after a cluster is constructed by cluster_spec otherwise
361 defaults to None.
362
363 Returns:
364 A string specifying job name the process belongs to and an integer
365 specifying the task index the process belongs to in that job.
366 """
367 return self.task_type, self.task_id
368
369 def master(self, task_type=None, task_id=None, rpc_layer=None):
370 """Returns the master string for connecting to a TensorFlow master.
371
372 Args:
373 task_type: (Optional) Overrides the default auto-selected task type.
374 task_id: (Optional) Overrides the default auto-selected task index.
375 rpc_layer: (Optional) Overrides the default RPC protocol TensorFlow uses
376 to communicate across nodes.
377
378 Returns:
379 A connection string for connecting to a TensorFlow master.
380 """
381 task_type = task_type if task_type is not None else self.task_type
382 task_id = task_id if task_id is not None else self.task_id
383
384 if task_type is not None and task_id is not None:
385 return format_master_url(
386 self.cluster_spec().task_address(task_type, task_id),
387 rpc_layer or self.rpc_layer)
388
389 return ''
390
391 def num_accelerators(self,
392 task_type=None,
393 task_id=None,
394 config_proto=None):
395 # Unused, since this is set in __init__ manually.
396 del task_type, task_id, config_proto
397 return {'GPU': self._gpus_per_task}