Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/jobs/job.py: 41%

148 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from functools import cached_property
from time import sleep
from typing import Callable, NoReturn

from sqlalchemy import Column, Index, Integer, String, case, select
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.orm.session import Session, make_transient

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.serialization.pydantic.job import JobPydantic
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import getuser
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.state import State


def _resolve_dagrun_model():
    # Imported locally to avoid a circular import between the Job and DagRun models.
    from airflow.models.dagrun import DagRun

    return DagRun


class Job(Base, LoggingMixin):
    """
    The ORM class representing a Job stored in the database.

    Jobs are processing items with state and duration that aren't task instances.
    For instance, a BackfillJob is a collection of task instance runs,
    but should have its own state, start and end time.
    """

    __tablename__ = "job"

    id = Column(Integer, primary_key=True)
    dag_id = Column(
        String(ID_LEN),
    )
    state = Column(String(20))
    job_type = Column(String(30))
    start_date = Column(UtcDateTime())
    end_date = Column(UtcDateTime())
    latest_heartbeat = Column(UtcDateTime())
    executor_class = Column(String(500))
    hostname = Column(String(500))
    unixname = Column(String(1000))

    __table_args__ = (
        Index("job_type_heart", job_type, latest_heartbeat),
        Index("idx_job_state_heartbeat", state, latest_heartbeat),
        Index("idx_job_dag_id", dag_id),
    )

    task_instances_enqueued = relationship(
        "TaskInstance",
        primaryjoin="Job.id == foreign(TaskInstance.queued_by_job_id)",
        backref=backref("queued_by_job", uselist=False),
    )
    """
    TaskInstances which have been enqueued by this Job.

    Only makes sense for SchedulerJob and BackfillJob instances.
    """

    dag_runs = relationship(
        "DagRun",
        primaryjoin=lambda: Job.id == foreign(_resolve_dagrun_model().creating_job_id),
        backref="creating_job",
    )

    heartrate = conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")


    def __init__(self, executor=None, heartrate=None, **kwargs):
        # Save init parameters as DB fields
        self.hostname = get_hostname()
        if executor:
            self.executor = executor
            self.executor_class = executor.__class__.__name__
        else:
            self.executor_class = conf.get("core", "EXECUTOR")
        self.start_date = timezone.utcnow()
        self.latest_heartbeat = timezone.utcnow()
        if heartrate is not None:
            # An instance-level heartrate overrides the class-level default read from config.
            self.heartrate = heartrate
        self.unixname = getuser()
        self.max_tis_per_query: int = conf.getint("scheduler", "max_tis_per_query")
        get_listener_manager().hook.on_starting(component=self)
        super().__init__(**kwargs)


    @cached_property
    def executor(self):
        return ExecutorLoader.get_default_executor()

    def is_alive(self, grace_multiplier=2.1):
        """
        Is this job currently alive?

        We define alive as being in the RUNNING state and having sent a heartbeat
        within a multiple of the heartrate (default of 2.1).

        :param grace_multiplier: multiplier of heartrate to require heartbeat
            within
        """
        if self.job_type == "SchedulerJob":
            health_check_threshold: float = conf.getint("scheduler", "scheduler_health_check_threshold")
        else:
            health_check_threshold = self.heartrate * grace_multiplier
        return (
            self.state == State.RUNNING
            and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < health_check_threshold
        )
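
    # Editor note, a worked example of the threshold above: with a heartrate of
    # 5 seconds and the default grace_multiplier of 2.1, a non-scheduler job
    # stops being "alive" once more than 5 * 2.1 = 10.5 seconds have elapsed
    # since its latest heartbeat.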

    @provide_session
    def kill(self, session: Session = NEW_SESSION) -> NoReturn:
        """Handle the on_kill callback and update the job's end_date in the database."""
        job = session.scalar(select(Job).where(Job.id == self.id).limit(1))
        job.end_date = timezone.utcnow()
        try:
            self.on_kill()
        except Exception as e:
            self.log.error("on_kill() method failed: %s", str(e))
        session.merge(job)
        session.commit()
        raise AirflowException("Job shut down externally.")

    def on_kill(self):
        """Will be called when an external kill command is received."""


    @provide_session
    def heartbeat(
        self, heartbeat_callback: Callable[[Session], None], session: Session = NEW_SESSION
    ) -> None:
        """
        Update the job's entry in the database with a timestamp for latest_heartbeat.

        This allows the system to monitor, at the database level, which jobs are
        actually active.

        For instance, an old heartbeat for SchedulerJob would mean something
        is wrong.

        This also allows any job to be killed externally, regardless
        of who is running it or on which machine it is running.

        Note that if your heart rate is set to 60 seconds and you call this
        method after 10 seconds of processing since the last heartbeat, it
        will sleep 50 seconds to complete the 60 seconds and keep a steady
        heart rate. If you go over 60 seconds before calling it, it won't
        sleep at all.

        :param heartbeat_callback: Callback that will be run when the heartbeat is recorded in the Job
        :param session: the session to use for saving the job
        """
        previous_heartbeat = self.latest_heartbeat

        try:
            # This will cause it to load from the db
            session.merge(self)
            previous_heartbeat = self.latest_heartbeat

            if self.state in State.terminating_states:
                # TODO: Make sure it is AIP-44 compliant
                self.kill()

            # Figure out how long to sleep for
            sleep_for = 0
            if self.latest_heartbeat:
                seconds_remaining = (
                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
                )
                sleep_for = max(0, seconds_remaining)
            sleep(sleep_for)

            # Update last heartbeat time
            with create_session() as session:
                # Make the session aware of this object
                session.merge(self)
                self.latest_heartbeat = timezone.utcnow()
                session.commit()
                # At this point, the DB has updated.
                previous_heartbeat = self.latest_heartbeat

                heartbeat_callback(session)
                self.log.debug("[heartbeat]")
        except OperationalError:
            Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
            self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
            self.latest_heartbeat = previous_heartbeat
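
    # Editor note, a worked example of the sleep arithmetic above: with
    # heartrate=60 and a heartbeat() call 10 seconds after the previous
    # heartbeat, seconds_remaining = 60 - 10 = 50, so the method sleeps 50
    # seconds; if 70 seconds have already passed, seconds_remaining is
    # negative and sleep(0) returns immediately.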

    @provide_session
    def prepare_for_execution(self, session: Session = NEW_SESSION):
        """Prepare the job for execution."""
        Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
        self.state = State.RUNNING
        self.start_date = timezone.utcnow()
        session.add(self)
        session.commit()
        # Detach the instance from the session so it can be used after the session is gone.
        make_transient(self)

    @provide_session
    def complete_execution(self, session: Session = NEW_SESSION):
        """Finish the job: notify listeners, record the end date and persist it."""
        get_listener_manager().hook.before_stopping(component=self)
        self.end_date = timezone.utcnow()
        session.merge(self)
        session.commit()
        Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)

    @provide_session
    def most_recent_job(self, session: Session = NEW_SESSION) -> Job | None:
        """Return the most recent job of this type, if any, based on last heartbeat received."""
        return most_recent_job(self.job_type, session=session)


@provide_session
def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | None:
    """
    Return the most recent job of this type, if any, based on last heartbeat received.

    Jobs in "running" state take precedence over others, to make sure a live
    job is returned if one is available.

    :param job_type: job type to get the most recent job for
    :param session: Database session
    """
    return session.scalar(
        select(Job)
        .where(Job.job_type == job_type)
        .order_by(
            # Put "running" jobs at the front.
            case({State.RUNNING: 0}, value=Job.state, else_=1),
            Job.latest_heartbeat.desc(),
        )
        .limit(1)
    )
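
# Editor note, a worked example of the ordering above: given one job row in
# "failed" state with latest_heartbeat at 12:00 and another in "running" state
# with latest_heartbeat at 11:00, the "running" row is returned, because the
# CASE expression sorts running jobs first; heartbeat recency only breaks ties
# within the same group.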


@provide_session
def run_job(
    job: Job | JobPydantic, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
) -> int | None:
    """
    Run the job.

    The Job is always an ORM object here; setting its state happens within the
    same DB session, and the session is kept open throughout the whole execution.

    :meta private:

    TODO: Maybe we should not keep the session open during job execution?
    """
    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
    # once final AIP-44 changes are completed.
    assert not isinstance(job, JobPydantic), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
    job.prepare_for_execution(session=session)
    try:
        return execute_job(job, execute_callable=execute_callable)
    finally:
        job.complete_execution(session=session)
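

# Illustrative sketch (editor addition, not part of the original module): how a
# caller might drive a Job through run_job(). The job_type value and the
# _example_payload callable are hypothetical; real runners live in airflow.jobs.*.
def _example_run_job_usage() -> int | None:
    job = Job(job_type="ExampleJob")

    def _example_payload() -> int | None:
        # A real runner would do its scheduling or execution work here.
        return None

    # run_job() flips the job to RUNNING, invokes the callable, then persists
    # the final state and end_date via complete_execution().
    return run_job(job=job, execute_callable=_example_payload)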


def execute_job(job: Job | JobPydantic, execute_callable: Callable[[], int | None]) -> int | None:
    """
    Execute the job.

    Job execution requires no session, as executing a job generally does not
    require an active database connection. A session might be temporarily
    acquired if the job runs a heartbeat during execution, but the connection
    is only held for the duration of the heartbeat, and with the AIP-44
    implementation it happens over the Internal API rather than directly via
    the database.

    After the job is completed, its state is updated and should be persisted to
    the database, which happens in the "complete_execution" step (which, again,
    can be executed locally for database operations or over the Internal API
    call).

    :param job: Job to execute - it can be either a DB job or its Pydantic-serialized
        version. It does not really matter, because apart from running the heartbeat
        and setting the state, the runner should not modify the job.

    :param execute_callable: callable to execute when running the job.

    :meta private:
    """
    ret = None
    try:
        ret = execute_callable()
        # In case of max runs or max duration
        job.state = State.SUCCESS
    except SystemExit:
        # In case of ^C or SIGTERM
        job.state = State.SUCCESS
    except Exception:
        job.state = State.FAILED
        raise
    return ret


def perform_heartbeat(
    job: Job | JobPydantic, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
    """
    Perform a heartbeat for the Job passed to it, optionally checking if it is necessary.

    :param job: job to perform heartbeat for
    :param heartbeat_callback: callback to run by the heartbeat
    :param only_if_necessary: only heartbeat if it is necessary (e.g. for the
        triggerer, only when there is something to run)
    """
    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
    # once final AIP-44 changes are completed.
    assert not isinstance(job, JobPydantic), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
    seconds_remaining: float = 0.0
    if job.latest_heartbeat and job.heartrate:
        seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
    if seconds_remaining > 0 and only_if_necessary:
        return
    with create_session() as session:
        job.heartbeat(heartbeat_callback=heartbeat_callback, session=session)
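

# Illustrative sketch (editor addition, not part of the original module): how a
# long-running loop might call perform_heartbeat(). The loop itself is
# hypothetical; with only_if_necessary=True the call returns immediately while
# the previous heartbeat is still fresh, so it is cheap to invoke every iteration.
def _example_heartbeat_loop(job: Job) -> None:
    def _noop_callback(session: Session) -> None:
        # A real callback could, e.g., check whether the job was killed externally.
        pass

    while job.state == State.RUNNING:
        # Returns immediately while fewer than job.heartrate seconds have passed
        # since the last heartbeat; otherwise writes a fresh latest_heartbeat.
        perform_heartbeat(job, heartbeat_callback=_noop_callback, only_if_necessary=True)
        sleep(1)  # placeholder for real work between heartbeats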