Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/jobs/job.py: 39%

246 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property, lru_cache
from time import sleep
from typing import TYPE_CHECKING, Callable, NoReturn

from sqlalchemy import Column, Index, Integer, String, case, select
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.orm.session import make_transient

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.serialization.pydantic.job import JobPydantic
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import getuser
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.state import JobState

if TYPE_CHECKING:
    import datetime

    from sqlalchemy.orm.session import Session

    from airflow.executors.base_executor import BaseExecutor

def _resolve_dagrun_model():
    from airflow.models.dagrun import DagRun

    return DagRun
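# Editor's note (not in the upstream file): the DagRun import is deferred into
# this helper so the relationship's primaryjoin can resolve the model lazily,
# which avoids a circular import between the jobs and models packages at
# module import time.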

@lru_cache
def health_check_threshold(job_type: str, heartrate: int) -> int | float:
    grace_multiplier = 2.1
    health_check_threshold_value: int | float
    if job_type == "SchedulerJob":
        health_check_threshold_value = conf.getint("scheduler", "scheduler_health_check_threshold")
    elif job_type == "TriggererJob":
        health_check_threshold_value = conf.getfloat("triggerer", "triggerer_health_check_threshold")
    else:
        health_check_threshold_value = heartrate * grace_multiplier
    return health_check_threshold_value
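# Editor's sketch (illustrative, not part of the module): job types without a
# dedicated config option fall back to heartrate * 2.1; "LocalTaskJob" below
# is just an example argument.
#
#     >>> health_check_threshold("LocalTaskJob", heartrate=5)
#     10.5
#
# Note that @lru_cache memoizes the result per (job_type, heartrate) pair, so
# a changed config value is not picked up until the process restarts.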

class Job(Base, LoggingMixin):
    """
    The ORM class representing Job stored in the database.

    Jobs are processing items with state and duration that aren't task instances.
    For instance a BackfillJob is a collection of task instance runs,
    but should have its own state, start and end time.
    """

    __tablename__ = "job"

    id = Column(Integer, primary_key=True)
    dag_id = Column(
        String(ID_LEN),
    )
    state = Column(String(20))
    job_type = Column(String(30))
    start_date = Column(UtcDateTime())
    end_date = Column(UtcDateTime())
    latest_heartbeat = Column(UtcDateTime())
    executor_class = Column(String(500))
    hostname = Column(String(500))
    unixname = Column(String(1000))

    __table_args__ = (
        Index("job_type_heart", job_type, latest_heartbeat),
        Index("idx_job_state_heartbeat", state, latest_heartbeat),
        Index("idx_job_dag_id", dag_id),
    )

    task_instances_enqueued = relationship(
        "TaskInstance",
        primaryjoin="Job.id == foreign(TaskInstance.queued_by_job_id)",
        backref=backref("queued_by_job", uselist=False),
    )
    """
    TaskInstances which have been enqueued by this Job.

    Only makes sense for SchedulerJob and BackfillJob instances.
    """

    dag_runs = relationship(
        "DagRun",
        primaryjoin=lambda: Job.id == foreign(_resolve_dagrun_model().creating_job_id),
        backref="creating_job",
    )

    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
        # Save init parameters as DB fields
        self.heartbeat_failed = False
        self.hostname = get_hostname()
        if executor:
            self.executor = executor
            self.executors = [executor]
        self.start_date = timezone.utcnow()
        self.latest_heartbeat = timezone.utcnow()
        self.previous_heartbeat = None
        if heartrate is not None:
            self.heartrate = heartrate
        self.unixname = getuser()
        self.max_tis_per_query: int = conf.getint("scheduler", "max_tis_per_query")
        get_listener_manager().hook.on_starting(component=self)
        super().__init__(**kwargs)

    @cached_property
    def executor(self):
        return ExecutorLoader.get_default_executor()

    @cached_property
    def executors(self):
        return ExecutorLoader.init_executors()

    @cached_property
    def heartrate(self) -> float:
        return Job._heartrate(self.job_type)
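    # Editor's note (not in the upstream file): these @cached_property defaults
    # only run on first attribute access. functools.cached_property defines no
    # __set__, so the assignments in __init__ (e.g. self.executor = executor)
    # land in the instance __dict__ and take precedence, making the properties
    # pure fallbacks for when no executor/heartrate was passed in.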

    def is_alive(self) -> bool:
        """
        Is this job currently alive.

        We define "alive" as being in the RUNNING state and having sent a
        heartbeat within a multiple of the heartrate (2.1 by default).
        """
        threshold_value = health_check_threshold(self.job_type, self.heartrate)
        return Job._is_alive(
            state=self.state,
            health_check_threshold_value=threshold_value,
            latest_heartbeat=self.latest_heartbeat,
        )
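    # Editor's sketch (illustrative values, not part of the module): a RUNNING
    # job whose heartbeat is older than the threshold is considered dead.
    #
    #     >>> import datetime
    #     >>> stale = timezone.utcnow() - datetime.timedelta(seconds=120)
    #     >>> Job._is_alive(JobState.RUNNING, health_check_threshold_value=60, latest_heartbeat=stale)
    #     False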

    @provide_session
    def kill(self, session: Session = NEW_SESSION) -> NoReturn:
        """Handle the on_kill callback and update the job's state in the database."""
        try:
            self.on_kill()
        except Exception as e:
            self.log.error("on_kill() method failed: %s", e)

        Job._kill(job_id=self.id, session=session)
        raise AirflowException("Job shut down externally.")

    def on_kill(self):
        """Will be called when an external kill command is received."""

    @provide_session
    def heartbeat(
        self, heartbeat_callback: Callable[[Session], None], session: Session = NEW_SESSION
    ) -> None:
        """
        Update the job's entry in the database with the latest_heartbeat timestamp.

        This allows for the job to be killed externally and allows the system
        to monitor what is actually active. For instance, an old heartbeat
        for SchedulerJob would mean something is wrong. This also allows for
        any job to be killed externally, regardless of who is running it or on
        which machine it is running.

        Note that if your heart rate is set to 60 seconds and you call this
        method after 10 seconds of processing since the last heartbeat, it
        will sleep 50 seconds to complete the 60 seconds and keep a steady
        heart rate. If you go over 60 seconds before calling it, it won't
        sleep at all.

        :param heartbeat_callback: Callback that will be run when the heartbeat is recorded in the Job
        :param session: the session to use for saving the job
        """
        previous_heartbeat = self.latest_heartbeat

        try:
            # This will cause it to load from the db
            self._merge_from(Job._fetch_from_db(self, session))
            previous_heartbeat = self.latest_heartbeat

            if self.state == JobState.RESTARTING:
                self.kill()

            # Figure out how long to sleep for
            sleep_for = 0
            if self.latest_heartbeat:
                seconds_remaining = (
                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
                )
                sleep_for = max(0, seconds_remaining)
            sleep(sleep_for)

            job = Job._update_heartbeat(job=self, session=session)
            self._merge_from(job)
            time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds()
            health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
            if time_since_last_heartbeat > health_check_threshold_value:
                self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
            # At this point, the DB has updated.
            previous_heartbeat = self.latest_heartbeat

            heartbeat_callback(session)
            self.log.debug("[heartbeat]")
            self.heartbeat_failed = False
        except OperationalError:
            Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
            if not self.heartbeat_failed:
                self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
                self.heartbeat_failed = True
                if self.is_alive():
                    self.log.error(
                        "%s heartbeat failed with error. Scheduler may go into unhealthy state",
                        self.__class__.__name__,
                    )
                else:
                    self.log.error(
                        "%s heartbeat failed with error. Scheduler is in unhealthy state", self.__class__.__name__
                    )
            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
            self.latest_heartbeat = previous_heartbeat
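    # Editor's note (not in the upstream file): the sleep in heartbeat() above
    # throttles callers to at most one beat per heartrate interval. E.g. with a
    # 60s heartrate and 10s elapsed since the last beat, seconds_remaining is
    # 50, so the call sleeps 50s; once 60s or more have elapsed, max(0, ...)
    # yields 0 and the heartbeat is written immediately.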

    @provide_session
    def prepare_for_execution(self, session: Session = NEW_SESSION):
        """Prepare the job for execution."""
        Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
        self.state = JobState.RUNNING
        self.start_date = timezone.utcnow()
        self._merge_from(Job._add_to_db(job=self, session=session))
        make_transient(self)

    @provide_session
    def complete_execution(self, session: Session = NEW_SESSION):
        get_listener_manager().hook.before_stopping(component=self)
        self.end_date = timezone.utcnow()
        Job._update_in_db(job=self, session=session)
        Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)

    @provide_session
    def most_recent_job(self, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
        """Return the most recent job of this type, if any, based on last heartbeat received."""
        return most_recent_job(self.job_type, session=session)

    def _merge_from(self, job: Job | JobPydantic | None):
        if job is None:
            self.log.error("Job is empty: %s", self.id)
            return
        self.id = job.id
        self.dag_id = job.dag_id
        self.state = job.state
        self.job_type = job.job_type
        self.start_date = job.start_date
        self.end_date = job.end_date
        self.latest_heartbeat = job.latest_heartbeat
        self.executor_class = job.executor_class
        self.hostname = job.hostname
        self.unixname = job.unixname

    @staticmethod
    def _heartrate(job_type: str) -> float:
        if job_type == "TriggererJob":
            return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
        elif job_type == "SchedulerJob":
            return conf.getfloat("scheduler", "SCHEDULER_HEARTBEAT_SEC")
        else:
            # Heartrate used to be hardcoded to scheduler, so in all other
            # cases continue to use that value for back compat
            return conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")

    @staticmethod
    def _is_alive(
        state: JobState | str | None,
        health_check_threshold_value: float | int,
        latest_heartbeat: datetime.datetime,
    ) -> bool:
        return (
            state == JobState.RUNNING
            and (timezone.utcnow() - latest_heartbeat).total_seconds() < health_check_threshold_value
        )

    @staticmethod
    @internal_api_call
    @provide_session
    def _kill(job_id: str, session: Session = NEW_SESSION) -> Job | JobPydantic:
        job = session.scalar(select(Job).where(Job.id == job_id).limit(1))
        job.end_date = timezone.utcnow()
        session.merge(job)
        session.commit()
        return job

    @staticmethod
    @internal_api_call
    @provide_session
    @retry_db_transaction
    def _fetch_from_db(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
        if isinstance(job, Job):
            # not Internal API
            session.merge(job)
            return job
        # Internal API.
        return session.scalar(select(Job).where(Job.id == job.id).limit(1))
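    # Editor's note (not in the upstream file): the isinstance() checks in the
    # static helpers above and below distinguish two call paths. Called locally,
    # the argument is an ORM Job and the open session is used directly; called
    # over the AIP-44 Internal API, the argument arrives as a JobPydantic
    # snapshot, so the ORM row is re-fetched by id before being touched.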

    @staticmethod
    @internal_api_call
    @provide_session
    def _add_to_db(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic:
        if isinstance(job, JobPydantic):
            orm_job = Job()
            orm_job._merge_from(job)
        else:
            orm_job = job
        session.add(orm_job)
        session.commit()
        return orm_job

    @staticmethod
    @internal_api_call
    @provide_session
    def _update_in_db(job: Job | JobPydantic, session: Session = NEW_SESSION):
        if isinstance(job, Job):
            # not Internal API
            session.merge(job)
            session.commit()
        # Internal API.
        orm_job: Job | None = session.scalar(select(Job).where(Job.id == job.id).limit(1))
        if orm_job is None:
            return
        orm_job._merge_from(job)
        session.merge(orm_job)
        session.commit()
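    # Editor's note (a hedged observation, not in the upstream file): in
    # _update_in_db the local-ORM branch commits and then still falls through
    # to the re-fetch and second merge below; for that path the extra round
    # trip appears redundant, since the freshly loaded row already matches.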

    @staticmethod
    @internal_api_call
    @provide_session
    @retry_db_transaction
    def _update_heartbeat(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic:
        orm_job: Job | None = session.scalar(select(Job).where(Job.id == job.id).limit(1))
        if orm_job is None:
            return job
        orm_job.latest_heartbeat = timezone.utcnow()
        session.merge(orm_job)
        session.commit()
        return orm_job

@internal_api_call
@provide_session
def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
    """
    Return the most recent job of this type, if any, based on last heartbeat received.

    Jobs in "running" state take precedence over others, to make sure an alive
    job is returned if one is available.

    :param job_type: job type to query for to get the most recent job for
    :param session: Database session
    """
    return session.scalar(
        select(Job)
        .where(Job.job_type == job_type)
        .order_by(
            # Put "running" jobs at the front.
            case({JobState.RUNNING: 0}, value=Job.state, else_=1),
            Job.latest_heartbeat.desc(),
        )
        .limit(1)
    )
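# Editor's note (not in the upstream file): the CASE expression above compiles
# to roughly the following ordering, putting live jobs ahead of finished ones:
#
#   ORDER BY CASE job.state WHEN 'running' THEN 0 ELSE 1 END,
#            job.latest_heartbeat DESC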

@provide_session
def run_job(
    job: Job, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
) -> int | None:
    """
    Run the job.

    The Job is always an ORM object; setting the state happens within the
    same DB session, and the session is kept open throughout the whole execution.

    :meta private:
    """
    job.prepare_for_execution(session=session)
    try:
        return execute_job(job, execute_callable=execute_callable)
    finally:
        job.complete_execution(session=session)
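# Editor's sketch (hypothetical usage, not part of the module): a job runner
# would typically hand its main loop to run_job; `my_main_loop` is an invented
# placeholder for the runner's _execute-style callable.
#
#     job = Job(job_type="SchedulerJob")
#     exit_code = run_job(job=job, execute_callable=my_main_loop)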

def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | None:
    """
    Execute the job.

    Job execution requires no session, as executing a job generally does not
    require an active database connection. A session might be temporarily
    acquired and used if the job runs a heartbeat during execution, but the
    connection is only held for the duration of the heartbeat, and in the case
    of the AIP-44 implementation it happens over the Internal API rather than
    directly via the database.

    After the job is completed, the state of the Job is updated and it should be
    updated in the database, which happens in the "complete_execution" step
    (which again can be executed locally in the case of database operations, or
    over the Internal API call).

    :param job: Job to execute - it can be either a DB job or its Pydantic-serialized
        version. It does not really matter, because except for running the heartbeat
        and setting the state, the runner should not modify the job state.

    :param execute_callable: callable to execute when running the job.

    :meta private:
    """
    ret = None
    try:
        ret = execute_callable()
        # In case of max runs or max duration
        job.state = JobState.SUCCESS
    except SystemExit:
        # In case of ^C or SIGTERM
        job.state = JobState.SUCCESS
    except Exception:
        job.state = JobState.FAILED
        raise
    return ret

def perform_heartbeat(
    job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
    """
    Perform heartbeat for the Job passed to it, optionally checking if it is necessary.

    :param job: job to perform heartbeat for
    :param heartbeat_callback: callback to run by the heartbeat
    :param only_if_necessary: only heartbeat if it is necessary (i.e. if there are things to run for
        triggerer for example)
    """
    seconds_remaining: float = 0.0
    if job.latest_heartbeat and job.heartrate:
        seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
    if seconds_remaining > 0 and only_if_necessary:
        return
    job.heartbeat(heartbeat_callback=heartbeat_callback)
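# Editor's note (not in the upstream file): with only_if_necessary=True the
# early return above skips the beat while the previous one is still fresh,
# e.g. a 5s heartrate with a beat 2s ago leaves seconds_remaining == 3 > 0,
# so nothing is written; at >= 5s elapsed the heartbeat proceeds.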