1"""
2Job and task components for writing .xml files that the Windows HPC Server
32008 can use to start jobs.
4"""
5
6import os
7import re
8import uuid
9from xml.etree import ElementTree as ET
10
11from traitlets import Bool, Enum, Instance, Integer, List, Unicode
12from traitlets.config.configurable import Configurable
13
14# -----------------------------------------------------------------------------
15# Job and Task classes
16# -----------------------------------------------------------------------------
17
18
def as_str(value):
    """Convert a value to the string form used for XML attributes.

    Booleans become ``'true'`` / ``'false'``, ints and floats become their
    ``repr``. Strings, and values of any other type, are returned unchanged.
    """
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return 'true' if value else 'false'
    if isinstance(value, (int, float)):
        return repr(value)
    # Strings and unrecognised types pass through untouched.
    return value
31
32
def indent(elem, level=0):
    """Add newline/indent whitespace to ``elem`` and its descendants in place.

    Only ``text`` / ``tail`` values that are missing or whitespace-only are
    overwritten; existing non-blank content is left alone.

    Parameters
    ----------
    elem : xml.etree.ElementTree.Element
        The element to pretty-print (modified in place).
    level : int
        Nesting depth of ``elem``; 0 for the root.
    """
    pad = "\n" + level * " "

    def is_blank(chunk):
        return not chunk or not chunk.strip()

    if not len(elem):
        # Leaf: only nested leaves get a tail; a bare root leaf is untouched.
        if level and is_blank(elem.tail):
            elem.tail = pad
        return

    if is_blank(elem.text):
        elem.text = pad + " "
    if is_blank(elem.tail):
        elem.tail = pad

    last_child = None
    for last_child in elem:
        indent(last_child, level + 1)
    # The last child's tail precedes the closing tag of ``elem``, so it is
    # pulled back to the parent's indentation.
    if is_blank(last_child.tail):
        last_child.tail = pad
47
48
def find_username():
    """Return the current user name taken from the environment.

    Reads ``USERNAME`` (empty string if unset) and ``USERDOMAIN``. When the
    domain variable is set the result is ``DOMAIN\\user``; otherwise it is
    just the user name.
    """
    env = os.environ
    user = env.get('USERNAME', '')
    domain = env.get('USERDOMAIN')
    if domain is not None:
        return f'{domain}\\{user}'
    return user
56
57
class WinHPCJob(Configurable):
    """A job description that can be serialized to job-description XML.

    The traits below are written as attributes of the root ``<Job>`` element
    by :meth:`as_element`; tasks added with :meth:`add_task` are written
    under a ``<Tasks>`` child element.
    """

    job_id = Unicode('')
    job_name = Unicode('MyJob', config=True)
    min_cores = Integer(1, config=True)
    max_cores = Integer(1, config=True)
    min_sockets = Integer(1, config=True)
    max_sockets = Integer(1, config=True)
    min_nodes = Integer(1, config=True)
    max_nodes = Integer(1, config=True)
    unit_type = Unicode("Core", config=True)
    auto_calculate_min = Bool(True, config=True)
    auto_calculate_max = Bool(True, config=True)
    run_until_canceled = Bool(False, config=True)
    is_exclusive = Bool(False, config=True)
    username = Unicode(find_username(), config=True)
    job_type = Unicode('Batch', config=True)
    priority = Enum(
        ('Lowest', 'BelowNormal', 'Normal', 'AboveNormal', 'Highest'),
        default_value='Highest',
        config=True,
    )
    requested_nodes = Unicode('', config=True)
    project = Unicode('IPython', config=True)
    xmlns = Unicode('http://schemas.microsoft.com/HPCS2008/scheduler/')
    version = Unicode("2.000")
    tasks = List([])

    @property
    def owner(self):
        """Return the job owner, which is the same value as ``username``."""
        return self.username

    def _write_attr(self, root, attr, key):
        """Set attribute ``key`` on ``root`` from ``self.<attr>``.

        The value is converted with ``as_str``; nothing is written when the
        converted value is empty/falsy (e.g. an empty string).
        """
        s = as_str(getattr(self, attr, ''))
        if s:
            root.set(key, s)

    def as_element(self):
        """Build and return the ``<Job>`` element for this job.

        Attribute names carry a temporary ``_X_`` ordering prefix which
        :meth:`tostring` removes again.
        """
        # We have to add _A_ type things to get the right order that
        # the MSFT XML parser expects.
        ordered_attrs = (
            ('version', '_A_Version'),
            ('job_name', '_B_Name'),
            ('unit_type', '_C_UnitType'),
            ('min_cores', '_D_MinCores'),
            ('max_cores', '_E_MaxCores'),
            ('min_sockets', '_F_MinSockets'),
            ('max_sockets', '_G_MaxSockets'),
            ('min_nodes', '_H_MinNodes'),
            ('max_nodes', '_I_MaxNodes'),
            ('run_until_canceled', '_J_RunUntilCanceled'),
            ('is_exclusive', '_K_IsExclusive'),
            ('username', '_L_UserName'),
            ('job_type', '_M_JobType'),
            ('priority', '_N_Priority'),
            ('requested_nodes', '_O_RequestedNodes'),
            ('auto_calculate_max', '_P_AutoCalculateMax'),
            ('auto_calculate_min', '_Q_AutoCalculateMin'),
            ('project', '_R_Project'),
            ('owner', '_S_Owner'),
            ('xmlns', '_T_xmlns'),
        )
        root = ET.Element('Job')
        for attr, key in ordered_attrs:
            self._write_attr(root, attr, key)
        # An empty <Dependencies/> element is always emitted before <Tasks>.
        ET.SubElement(root, "Dependencies")
        etasks = ET.SubElement(root, "Tasks")
        for t in self.tasks:
            etasks.append(t.as_element())
        return root

    def tostring(self):
        """Return the string representation of the job description XML."""
        root = self.as_element()
        indent(root)
        txt = ET.tostring(root, encoding="utf-8").decode('utf-8')

        # Now remove the tokens used to order the attributes. Only attribute
        # *names* inside tags are touched, so attribute values or element
        # text that happen to contain something like "_A_" are preserved.
        # The serializer escapes '<', '>' and '"' inside values, so
        # ``<[^<>]+>`` matches exactly one tag and ``="`` can only follow an
        # attribute name.
        def strip_order_tokens(tag_match):
            return re.sub(r'(\s)_[A-Z]_(\w+=")', r'\1\2', tag_match.group(0))

        txt = re.sub(r'<[^<>]+>', strip_order_tokens, txt)
        txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
        return txt

    def write(self, filename):
        """Write the XML job description to a file.

        The file is written as UTF-8 so that it matches the encoding
        declared in the XML header produced by :meth:`tostring`.
        """
        txt = self.tostring()
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(txt)

    def add_task(self, task):
        """Add a task to the job.

        Parameters
        ----------
        task : :class:`WinHPCTask`
            The task object to add.
        """
        self.tasks.append(task)
149
150
class WinHPCTask(Configurable):
    """A single task of a job, serializable to a ``<Task>`` XML element."""

    task_id = Unicode('')
    task_name = Unicode('')
    version = Unicode("2.000")
    min_cores = Integer(1, config=True)
    max_cores = Integer(1, config=True)
    min_sockets = Integer(1, config=True)
    max_sockets = Integer(1, config=True)
    min_nodes = Integer(1, config=True)
    max_nodes = Integer(1, config=True)
    unit_type = Unicode("Core", config=True)
    command_line = Unicode('', config=True)
    work_directory = Unicode('', config=True)
    # NOTE: the trait name is spelled ``is_rerunnaable`` (sic); it is written
    # to the XML as ``IsRerunnable``.
    is_rerunnaable = Bool(True, config=True)
    std_out_file_path = Unicode('', config=True)
    std_err_file_path = Unicode('', config=True)
    is_parametric = Bool(False, config=True)
    environment_variables = Instance(dict, args=(), config=True)

    def _write_attr(self, root, attr, key):
        """Copy ``self.<attr>`` onto ``root`` as attribute ``key``.

        The value goes through ``as_str`` first; empty/falsy results are
        skipped so no attribute is emitted for them.
        """
        text = as_str(getattr(self, attr, ''))
        if not text:
            return
        root.set(key, text)

    def as_element(self):
        """Return the ``<Task>`` element describing this task.

        Attribute keys carry a ``_X_`` prefix that fixes their order.
        """
        attr_to_key = (
            ('version', '_A_Version'),
            ('task_name', '_B_Name'),
            ('min_cores', '_C_MinCores'),
            ('max_cores', '_D_MaxCores'),
            ('min_sockets', '_E_MinSockets'),
            ('max_sockets', '_F_MaxSockets'),
            ('min_nodes', '_G_MinNodes'),
            ('max_nodes', '_H_MaxNodes'),
            ('command_line', '_I_CommandLine'),
            ('work_directory', '_J_WorkDirectory'),
            ('is_rerunnaable', '_K_IsRerunnable'),
            ('std_out_file_path', '_L_StdOutFilePath'),
            ('std_err_file_path', '_M_StdErrFilePath'),
            ('is_parametric', '_N_IsParametric'),
            ('unit_type', '_O_UnitType'),
        )
        task_elem = ET.Element('Task')
        for attr, key in attr_to_key:
            self._write_attr(task_elem, attr, key)
        task_elem.append(self.get_env_vars())
        return task_elem

    def get_env_vars(self):
        """Return an ``<EnvironmentVariables>`` element.

        One ``<Variable>`` child with ``<Name>`` and ``<Value>`` sub-elements
        is created per entry of ``environment_variables``.
        """
        container = ET.Element('EnvironmentVariables')
        for var_name, var_value in self.environment_variables.items():
            entry = ET.SubElement(container, "Variable")
            ET.SubElement(entry, "Name").text = var_name
            ET.SubElement(entry, "Value").text = var_value
        return container
204
205
206# By declaring these, we can configure the controller and engine separately!
207
208
class IPControllerJob(WinHPCJob):
    """Job description for the controller.

    ``job_name`` is fixed to ``'IPController'`` and is not configurable; the
    remaining traits are re-declared here as configurable on this class.
    """

    job_name = Unicode("IPController", config=False)
    is_exclusive = Bool(False, config=True)
    username = Unicode(find_username(), config=True)
    priority = Enum(
        ("Lowest", "BelowNormal", "Normal", "AboveNormal", "Highest"),
        default_value="Highest",
        config=True,
    )
    requested_nodes = Unicode("", config=True)
    project = Unicode("IPython", config=True)
220
221
class IPEngineSetJob(WinHPCJob):
    """Job description for a set of engines.

    ``job_name`` is fixed to ``'IPEngineSet'`` and is not configurable; the
    remaining traits are re-declared here as configurable on this class.
    """

    job_name = Unicode("IPEngineSet", config=False)
    is_exclusive = Bool(False, config=True)
    username = Unicode(find_username(), config=True)
    priority = Enum(
        ("Lowest", "BelowNormal", "Normal", "AboveNormal", "Highest"),
        default_value="Highest",
        config=True,
    )
    requested_nodes = Unicode("", config=True)
    project = Unicode("IPython", config=True)
233
234
class IPControllerTask(WinHPCTask):
    """Task that runs the controller command.

    The command line is assembled from ``controller_cmd`` and
    ``controller_args``; stdout/stderr paths are generated per instance.
    """

    task_name = Unicode("IPController", config=True)
    controller_cmd = List(["ipcontroller.exe"], config=True)
    controller_args = List(["--log-level=40"], config=True)
    # I don't want these to be configurable
    std_out_file_path = Unicode("", config=False)
    std_err_file_path = Unicode("", config=False)
    min_cores = Integer(1, config=False)
    max_cores = Integer(1, config=False)
    min_sockets = Integer(1, config=False)
    max_sockets = Integer(1, config=False)
    min_nodes = Integer(1, config=False)
    max_nodes = Integer(1, config=False)
    unit_type = Unicode("Core", config=False)
    work_directory = Unicode("", config=False)

    def __init__(self, **kwargs):
        """Initialize the task and pick unique stdout/stderr file paths.

        Both paths live under ``log`` and share one freshly generated UUID.
        """
        super().__init__(**kwargs)
        log_stem = os.path.join('log', f'ipcontroller-{uuid.uuid1()}')
        self.std_out_file_path = log_stem + '.out'
        self.std_err_file_path = log_stem + '.err'

    @property
    def command_line(self):
        """The command followed by its arguments, joined with single spaces."""
        parts = [*self.controller_cmd, *self.controller_args]
        return ' '.join(parts)
260
261
class IPEngineTask(WinHPCTask):
    """Task that runs the engine command.

    The command line is assembled from ``engine_cmd`` and ``engine_args``;
    stdout/stderr paths are generated per instance.
    """

    task_name = Unicode("IPEngine", config=True)
    engine_cmd = List(["ipengine.exe"], config=True)
    engine_args = List(["--log-level=40"], config=True)
    # I don't want these to be configurable
    std_out_file_path = Unicode("", config=False)
    std_err_file_path = Unicode("", config=False)
    min_cores = Integer(1, config=False)
    max_cores = Integer(1, config=False)
    min_sockets = Integer(1, config=False)
    max_sockets = Integer(1, config=False)
    min_nodes = Integer(1, config=False)
    max_nodes = Integer(1, config=False)
    unit_type = Unicode("Core", config=False)
    work_directory = Unicode("", config=False)

    def __init__(self, **kwargs):
        """Initialize the task and pick unique stdout/stderr file paths.

        Both paths live under ``log`` and share one freshly generated UUID.
        """
        super().__init__(**kwargs)
        log_stem = os.path.join('log', f'ipengine-{uuid.uuid1()}')
        self.std_out_file_path = log_stem + '.out'
        self.std_err_file_path = log_stem + '.err'

    @property
    def command_line(self):
        """The command followed by its arguments, joined with single spaces."""
        parts = [*self.engine_cmd, *self.engine_args]
        return ' '.join(parts)