Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/cluster/_winhpcjob.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

213 statements  

1""" 

2Job and task components for writing .xml files that the Windows HPC Server 

32008 can use to start jobs. 

4""" 

5 

6import os 

7import re 

8import uuid 

9from xml.etree import ElementTree as ET 

10 

11from traitlets import Bool, Enum, Instance, Integer, List, Unicode 

12from traitlets.config.configurable import Configurable 

13 

14# ----------------------------------------------------------------------------- 

15# Job and Task classes 

16# ----------------------------------------------------------------------------- 

17 

18 

19def as_str(value): 

20 if isinstance(value, str): 

21 return value 

22 elif isinstance(value, bool): 

23 if value: 

24 return 'true' 

25 else: 

26 return 'false' 

27 elif isinstance(value, (int, float)): 

28 return repr(value) 

29 else: 

30 return value 

31 

32 

33def indent(elem, level=0): 

34 i = "\n" + level * " " 

35 if len(elem): 

36 if not elem.text or not elem.text.strip(): 

37 elem.text = i + " " 

38 if not elem.tail or not elem.tail.strip(): 

39 elem.tail = i 

40 for elem in elem: 

41 indent(elem, level + 1) 

42 if not elem.tail or not elem.tail.strip(): 

43 elem.tail = i 

44 else: 

45 if level and (not elem.tail or not elem.tail.strip()): 

46 elem.tail = i 

47 

48 

49def find_username(): 

50 domain = os.environ.get('USERDOMAIN') 

51 username = os.environ.get('USERNAME', '') 

52 if domain is None: 

53 return username 

54 else: 

55 return f'{domain}\\{username}' 

56 

57 

58class WinHPCJob(Configurable): 

59 job_id = Unicode('') 

60 job_name = Unicode('MyJob', config=True) 

61 min_cores = Integer(1, config=True) 

62 max_cores = Integer(1, config=True) 

63 min_sockets = Integer(1, config=True) 

64 max_sockets = Integer(1, config=True) 

65 min_nodes = Integer(1, config=True) 

66 max_nodes = Integer(1, config=True) 

67 unit_type = Unicode("Core", config=True) 

68 auto_calculate_min = Bool(True, config=True) 

69 auto_calculate_max = Bool(True, config=True) 

70 run_until_canceled = Bool(False, config=True) 

71 is_exclusive = Bool(False, config=True) 

72 username = Unicode(find_username(), config=True) 

73 job_type = Unicode('Batch', config=True) 

74 priority = Enum( 

75 ('Lowest', 'BelowNormal', 'Normal', 'AboveNormal', 'Highest'), 

76 default_value='Highest', 

77 config=True, 

78 ) 

79 requested_nodes = Unicode('', config=True) 

80 project = Unicode('IPython', config=True) 

81 xmlns = Unicode('http://schemas.microsoft.com/HPCS2008/scheduler/') 

82 version = Unicode("2.000") 

83 tasks = List([]) 

84 

85 @property 

86 def owner(self): 

87 return self.username 

88 

89 def _write_attr(self, root, attr, key): 

90 s = as_str(getattr(self, attr, '')) 

91 if s: 

92 root.set(key, s) 

93 

94 def as_element(self): 

95 # We have to add _A_ type things to get the right order than 

96 # the MSFT XML parser expects. 

97 root = ET.Element('Job') 

98 self._write_attr(root, 'version', '_A_Version') 

99 self._write_attr(root, 'job_name', '_B_Name') 

100 self._write_attr(root, 'unit_type', '_C_UnitType') 

101 self._write_attr(root, 'min_cores', '_D_MinCores') 

102 self._write_attr(root, 'max_cores', '_E_MaxCores') 

103 self._write_attr(root, 'min_sockets', '_F_MinSockets') 

104 self._write_attr(root, 'max_sockets', '_G_MaxSockets') 

105 self._write_attr(root, 'min_nodes', '_H_MinNodes') 

106 self._write_attr(root, 'max_nodes', '_I_MaxNodes') 

107 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled') 

108 self._write_attr(root, 'is_exclusive', '_K_IsExclusive') 

109 self._write_attr(root, 'username', '_L_UserName') 

110 self._write_attr(root, 'job_type', '_M_JobType') 

111 self._write_attr(root, 'priority', '_N_Priority') 

112 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes') 

113 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax') 

114 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin') 

115 self._write_attr(root, 'project', '_R_Project') 

116 self._write_attr(root, 'owner', '_S_Owner') 

117 self._write_attr(root, 'xmlns', '_T_xmlns') 

118 dependencies = ET.SubElement(root, "Dependencies") 

119 etasks = ET.SubElement(root, "Tasks") 

120 for t in self.tasks: 

121 etasks.append(t.as_element()) 

122 return root 

123 

124 def tostring(self): 

125 """Return the string representation of the job description XML.""" 

126 root = self.as_element() 

127 indent(root) 

128 txt = ET.tostring(root, encoding="utf-8").decode('utf-8') 

129 # Now remove the tokens used to order the attributes. 

130 txt = re.sub(r'_[A-Z]_', '', txt) 

131 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt 

132 return txt 

133 

134 def write(self, filename): 

135 """Write the XML job description to a file.""" 

136 txt = self.tostring() 

137 with open(filename, 'w') as f: 

138 f.write(txt) 

139 

140 def add_task(self, task): 

141 """Add a task to the job. 

142 

143 Parameters 

144 ---------- 

145 task : :class:`WinHPCTask` 

146 The task object to add. 

147 """ 

148 self.tasks.append(task) 

149 

150 

151class WinHPCTask(Configurable): 

152 task_id = Unicode('') 

153 task_name = Unicode('') 

154 version = Unicode("2.000") 

155 min_cores = Integer(1, config=True) 

156 max_cores = Integer(1, config=True) 

157 min_sockets = Integer(1, config=True) 

158 max_sockets = Integer(1, config=True) 

159 min_nodes = Integer(1, config=True) 

160 max_nodes = Integer(1, config=True) 

161 unit_type = Unicode("Core", config=True) 

162 command_line = Unicode('', config=True) 

163 work_directory = Unicode('', config=True) 

164 is_rerunnaable = Bool(True, config=True) 

165 std_out_file_path = Unicode('', config=True) 

166 std_err_file_path = Unicode('', config=True) 

167 is_parametric = Bool(False, config=True) 

168 environment_variables = Instance(dict, args=(), config=True) 

169 

170 def _write_attr(self, root, attr, key): 

171 s = as_str(getattr(self, attr, '')) 

172 if s: 

173 root.set(key, s) 

174 

175 def as_element(self): 

176 root = ET.Element('Task') 

177 self._write_attr(root, 'version', '_A_Version') 

178 self._write_attr(root, 'task_name', '_B_Name') 

179 self._write_attr(root, 'min_cores', '_C_MinCores') 

180 self._write_attr(root, 'max_cores', '_D_MaxCores') 

181 self._write_attr(root, 'min_sockets', '_E_MinSockets') 

182 self._write_attr(root, 'max_sockets', '_F_MaxSockets') 

183 self._write_attr(root, 'min_nodes', '_G_MinNodes') 

184 self._write_attr(root, 'max_nodes', '_H_MaxNodes') 

185 self._write_attr(root, 'command_line', '_I_CommandLine') 

186 self._write_attr(root, 'work_directory', '_J_WorkDirectory') 

187 self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable') 

188 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath') 

189 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath') 

190 self._write_attr(root, 'is_parametric', '_N_IsParametric') 

191 self._write_attr(root, 'unit_type', '_O_UnitType') 

192 root.append(self.get_env_vars()) 

193 return root 

194 

195 def get_env_vars(self): 

196 env_vars = ET.Element('EnvironmentVariables') 

197 for k, v in self.environment_variables.items(): 

198 variable = ET.SubElement(env_vars, "Variable") 

199 name = ET.SubElement(variable, "Name") 

200 name.text = k 

201 value = ET.SubElement(variable, "Value") 

202 value.text = v 

203 return env_vars 

204 

205 

206# By declaring these, we can configure the controller and engine separately! 

207 

208 

209class IPControllerJob(WinHPCJob): 

210 job_name = Unicode('IPController', config=False) 

211 is_exclusive = Bool(False, config=True) 

212 username = Unicode(find_username(), config=True) 

213 priority = Enum( 

214 ('Lowest', 'BelowNormal', 'Normal', 'AboveNormal', 'Highest'), 

215 default_value='Highest', 

216 config=True, 

217 ) 

218 requested_nodes = Unicode('', config=True) 

219 project = Unicode('IPython', config=True) 

220 

221 

222class IPEngineSetJob(WinHPCJob): 

223 job_name = Unicode('IPEngineSet', config=False) 

224 is_exclusive = Bool(False, config=True) 

225 username = Unicode(find_username(), config=True) 

226 priority = Enum( 

227 ('Lowest', 'BelowNormal', 'Normal', 'AboveNormal', 'Highest'), 

228 default_value='Highest', 

229 config=True, 

230 ) 

231 requested_nodes = Unicode('', config=True) 

232 project = Unicode('IPython', config=True) 

233 

234 

235class IPControllerTask(WinHPCTask): 

236 task_name = Unicode('IPController', config=True) 

237 controller_cmd = List(['ipcontroller.exe'], config=True) 

238 controller_args = List(['--log-level=40'], config=True) 

239 # I don't want these to be configurable 

240 std_out_file_path = Unicode('', config=False) 

241 std_err_file_path = Unicode('', config=False) 

242 min_cores = Integer(1, config=False) 

243 max_cores = Integer(1, config=False) 

244 min_sockets = Integer(1, config=False) 

245 max_sockets = Integer(1, config=False) 

246 min_nodes = Integer(1, config=False) 

247 max_nodes = Integer(1, config=False) 

248 unit_type = Unicode("Core", config=False) 

249 work_directory = Unicode('', config=False) 

250 

251 def __init__(self, **kwargs): 

252 super().__init__(**kwargs) 

253 the_uuid = uuid.uuid1() 

254 self.std_out_file_path = os.path.join('log', f'ipcontroller-{the_uuid}.out') 

255 self.std_err_file_path = os.path.join('log', f'ipcontroller-{the_uuid}.err') 

256 

257 @property 

258 def command_line(self): 

259 return ' '.join(self.controller_cmd + self.controller_args) 

260 

261 

262class IPEngineTask(WinHPCTask): 

263 task_name = Unicode('IPEngine', config=True) 

264 engine_cmd = List(['ipengine.exe'], config=True) 

265 engine_args = List(['--log-level=40'], config=True) 

266 # I don't want these to be configurable 

267 std_out_file_path = Unicode('', config=False) 

268 std_err_file_path = Unicode('', config=False) 

269 min_cores = Integer(1, config=False) 

270 max_cores = Integer(1, config=False) 

271 min_sockets = Integer(1, config=False) 

272 max_sockets = Integer(1, config=False) 

273 min_nodes = Integer(1, config=False) 

274 max_nodes = Integer(1, config=False) 

275 unit_type = Unicode("Core", config=False) 

276 work_directory = Unicode('', config=False) 

277 

278 def __init__(self, **kwargs): 

279 super().__init__(**kwargs) 

280 the_uuid = uuid.uuid1() 

281 self.std_out_file_path = os.path.join('log', f'ipengine-{the_uuid}.out') 

282 self.std_err_file_path = os.path.join('log', f'ipengine-{the_uuid}.err') 

283 

284 @property 

285 def command_line(self): 

286 return ' '.join(self.engine_cmd + self.engine_args)