Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/jobs/job.py: 39%

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property, lru_cache
from time import sleep
from typing import TYPE_CHECKING, Callable, NoReturn

from sqlalchemy import Column, Index, Integer, String, case, select
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.orm.session import make_transient

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.serialization.pydantic.job import JobPydantic
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import getuser
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.state import JobState

if TYPE_CHECKING:
    import datetime

    from sqlalchemy.orm.session import Session

    from airflow.executors.base_executor import BaseExecutor


def _resolve_dagrun_model():
    from airflow.models.dagrun import DagRun

    return DagRun


@lru_cache
def health_check_threshold(job_type: str, heartrate: int) -> int | float:
    grace_multiplier = 2.1
    health_check_threshold_value: int | float
    if job_type == "SchedulerJob":
        health_check_threshold_value = conf.getint("scheduler", "scheduler_health_check_threshold")
    elif job_type == "TriggererJob":
        health_check_threshold_value = conf.getfloat("triggerer", "triggerer_health_check_threshold")
    else:
        health_check_threshold_value = heartrate * grace_multiplier
    return health_check_threshold_value
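

# Illustrative sketch (not part of the original module): job types other than
# SchedulerJob and TriggererJob fall through to heartrate * 2.1, so a job with
# a 10-second heartrate tolerates up to 21 seconds of heartbeat silence.
def _example_health_check_threshold() -> None:
    assert health_check_threshold("LocalTaskJob", heartrate=10) == 10 * 2.1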


class Job(Base, LoggingMixin):
    """
    The ORM class representing a Job stored in the database.

    Jobs are processing items with state and duration that aren't task instances.
    For instance, a BackfillJob is a collection of task instance runs,
    but should have its own state, start and end time.
    """

    __tablename__ = "job"

    id = Column(Integer, primary_key=True)
    dag_id = Column(
        String(ID_LEN),
    )
    state = Column(String(20))
    job_type = Column(String(30))
    start_date = Column(UtcDateTime())
    end_date = Column(UtcDateTime())
    latest_heartbeat = Column(UtcDateTime())
    executor_class = Column(String(500))
    hostname = Column(String(500))
    unixname = Column(String(1000))

    __table_args__ = (
        Index("job_type_heart", job_type, latest_heartbeat),
        Index("idx_job_state_heartbeat", state, latest_heartbeat),
        Index("idx_job_dag_id", dag_id),
    )

    task_instances_enqueued = relationship(
        "TaskInstance",
        primaryjoin="Job.id == foreign(TaskInstance.queued_by_job_id)",
        backref=backref("queued_by_job", uselist=False),
    )
    """
    TaskInstances which have been enqueued by this Job.

    Only makes sense for SchedulerJob and BackfillJob instances.
    """

    dag_runs = relationship(
        "DagRun",
        primaryjoin=lambda: Job.id == foreign(_resolve_dagrun_model().creating_job_id),
        backref="creating_job",
    )

    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
        # Save init parameters as DB fields
        self.heartbeat_failed = False
        self.hostname = get_hostname()
        if executor:
            self.executor = executor
            self.executors = [executor]
        self.start_date = timezone.utcnow()
        self.latest_heartbeat = timezone.utcnow()
        self.previous_heartbeat = None
        if heartrate is not None:
            self.heartrate = heartrate
        self.unixname = getuser()
        self.max_tis_per_query: int = conf.getint("scheduler", "max_tis_per_query")
        get_listener_manager().hook.on_starting(component=self)
        super().__init__(**kwargs)

    @cached_property
    def executor(self):
        return ExecutorLoader.get_default_executor()

    @cached_property
    def executors(self):
        return ExecutorLoader.init_executors()

    @cached_property
    def heartrate(self) -> float:
        return Job._heartrate(self.job_type)

    def is_alive(self) -> bool:
        """
        Is this job currently alive.

        We define alive as in a state of RUNNING, and having sent a heartbeat
        within a multiple of the heartrate (default of 2.1).
        """
        threshold_value = health_check_threshold(self.job_type, self.heartrate)
        return Job._is_alive(
            state=self.state,
            health_check_threshold_value=threshold_value,
            latest_heartbeat=self.latest_heartbeat,
        )

    @provide_session
    def kill(self, session: Session = NEW_SESSION) -> NoReturn:
        """Handle the on_kill callback and update the job's state in the database."""
        try:
            self.on_kill()
        except Exception as e:
            self.log.error("on_kill() method failed: %s", e)

        Job._kill(job_id=self.id, session=session)
        raise AirflowException("Job shut down externally.")

    def on_kill(self):
        """Will be called when an external kill command is received."""

    @provide_session
    def heartbeat(
        self, heartbeat_callback: Callable[[Session], None], session: Session = NEW_SESSION
    ) -> None:
        """
        Update the job's entry in the database with the latest_heartbeat timestamp.

        This allows for the job to be killed externally and allows the system
        to monitor what is actually active. For instance, an old heartbeat
        for SchedulerJob would mean something is wrong. This also allows for
        any job to be killed externally, regardless of who is running it or on
        which machine it is running.

        Note that if your heart rate is set to 60 seconds and you call this
        method after 10 seconds of processing since the last heartbeat, it
        will sleep 50 seconds to complete the 60 seconds and keep a steady
        heart rate. If you go over 60 seconds before calling it, it won't
        sleep at all.

        :param heartbeat_callback: Callback that will be run when the heartbeat is recorded in the Job
        :param session: Session to use for saving the job
        """
        previous_heartbeat = self.latest_heartbeat

        try:
            # This will cause it to load from the db
            self._merge_from(Job._fetch_from_db(self, session))
            previous_heartbeat = self.latest_heartbeat

            if self.state == JobState.RESTARTING:
                self.kill()

            # Figure out how long to sleep for
            sleep_for = 0
            if self.latest_heartbeat:
                seconds_remaining = (
                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
                )
                sleep_for = max(0, seconds_remaining)
            sleep(sleep_for)
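            # Worked example of the computation above (illustrative numbers,
            # not from the original source): with heartrate=60 and the last
            # heartbeat 10s ago, seconds_remaining = 60 - 10 = 50, so we sleep
            # 50s to keep a steady rate; after 70s of processing,
            # seconds_remaining = -10 and max(0, ...) clamps the sleep to 0.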

            job = Job._update_heartbeat(job=self, session=session)
            self._merge_from(job)
            time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds()
            health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
            if time_since_last_heartbeat > health_check_threshold_value:
                self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
            # At this point, the DB has updated.
            previous_heartbeat = self.latest_heartbeat

            heartbeat_callback(session)
            self.log.debug("[heartbeat]")
            self.heartbeat_failed = False
        except OperationalError:
            Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
            if not self.heartbeat_failed:
                self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
                self.heartbeat_failed = True
            if self.is_alive():
                self.log.error(
                    "%s heartbeat failed with error. Scheduler may go into unhealthy state",
                    self.__class__.__name__,
                )
            else:
                self.log.error(
                    "%s heartbeat failed with error. Scheduler is in unhealthy state", self.__class__.__name__
                )
            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
            self.latest_heartbeat = previous_heartbeat

    @provide_session
    def prepare_for_execution(self, session: Session = NEW_SESSION):
        """Prepare the job for execution."""
        Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
        self.state = JobState.RUNNING
        self.start_date = timezone.utcnow()
        self._merge_from(Job._add_to_db(job=self, session=session))
        make_transient(self)

    @provide_session
    def complete_execution(self, session: Session = NEW_SESSION):
        get_listener_manager().hook.before_stopping(component=self)
        self.end_date = timezone.utcnow()
        Job._update_in_db(job=self, session=session)
        Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)

    @provide_session
    def most_recent_job(self, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
        """Return the most recent job of this type, if any, based on last heartbeat received."""
        return most_recent_job(self.job_type, session=session)

    def _merge_from(self, job: Job | JobPydantic | None):
        if job is None:
            self.log.error("Job is empty: %s", self.id)
            return
        self.id = job.id
        self.dag_id = job.dag_id
        self.state = job.state
        self.job_type = job.job_type
        self.start_date = job.start_date
        self.end_date = job.end_date
        self.latest_heartbeat = job.latest_heartbeat
        self.executor_class = job.executor_class
        self.hostname = job.hostname
        self.unixname = job.unixname

    @staticmethod
    def _heartrate(job_type: str) -> float:
        if job_type == "TriggererJob":
            return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
        elif job_type == "SchedulerJob":
            return conf.getfloat("scheduler", "SCHEDULER_HEARTBEAT_SEC")
        else:
            # Heartrate used to be hardcoded to scheduler, so in all other
            # cases continue to use that value for back compat
            return conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")

    @staticmethod
    def _is_alive(
        state: JobState | str | None,
        health_check_threshold_value: float | int,
        latest_heartbeat: datetime.datetime,
    ) -> bool:
        return (
            state == JobState.RUNNING
            and (timezone.utcnow() - latest_heartbeat).total_seconds() < health_check_threshold_value
        )
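
    # Illustration with hypothetical values (not part of the original module):
    # a RUNNING job whose heartbeat is 10s old passes a 30s threshold, while
    # one 60s stale does not.
    #
    #   Job._is_alive(JobState.RUNNING, 30,
    #                 timezone.utcnow() - datetime.timedelta(seconds=10))  # True
    #   Job._is_alive(JobState.RUNNING, 30,
    #                 timezone.utcnow() - datetime.timedelta(seconds=60))  # False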

    @staticmethod
    @internal_api_call
    @provide_session
    def _kill(job_id: str, session: Session = NEW_SESSION) -> Job | JobPydantic:
        job = session.scalar(select(Job).where(Job.id == job_id).limit(1))
        job.end_date = timezone.utcnow()
        session.merge(job)
        session.commit()
        return job

    @staticmethod
    @internal_api_call
    @provide_session
    @retry_db_transaction
    def _fetch_from_db(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
        if isinstance(job, Job):
            # not Internal API
            session.merge(job)
            return job
        # Internal API.
        return session.scalar(select(Job).where(Job.id == job.id).limit(1))

    @staticmethod
    @internal_api_call
    @provide_session
    def _add_to_db(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic:
        if isinstance(job, JobPydantic):
            orm_job = Job()
            orm_job._merge_from(job)
        else:
            orm_job = job
        session.add(orm_job)
        session.commit()
        return orm_job

    @staticmethod
    @internal_api_call
    @provide_session
    def _update_in_db(job: Job | JobPydantic, session: Session = NEW_SESSION):
        if isinstance(job, Job):
            # not Internal API
            session.merge(job)
            session.commit()
            return
        # Internal API.
        orm_job: Job | None = session.scalar(select(Job).where(Job.id == job.id).limit(1))
        if orm_job is None:
            return
        orm_job._merge_from(job)
        session.merge(orm_job)
        session.commit()

    @staticmethod
    @internal_api_call
    @provide_session
    @retry_db_transaction
    def _update_heartbeat(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic:
        orm_job: Job | None = session.scalar(select(Job).where(Job.id == job.id).limit(1))
        if orm_job is None:
            return job
        orm_job.latest_heartbeat = timezone.utcnow()
        session.merge(orm_job)
        session.commit()
        return orm_job


@internal_api_call
@provide_session
def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
    """
    Return the most recent job of this type, if any, based on last heartbeat received.

    Jobs in "running" state take precedence over others, to make sure an alive
    job is returned if one is available.

    :param job_type: type of job to get the most recent of
    :param session: Database session
    """
    return session.scalar(
        select(Job)
        .where(Job.job_type == job_type)
        .order_by(
            # Put "running" jobs at the front.
            case({JobState.RUNNING: 0}, value=Job.state, else_=1),
            Job.latest_heartbeat.desc(),
        )
        .limit(1)
    )
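

# Hypothetical usage sketch (not part of the original module): check whether
# the most recently heartbeating scheduler is still alive. Assumes the
# returned object exposes is_alive() and latest_heartbeat as above.
def _example_most_recent_job() -> None:
    job = most_recent_job("SchedulerJob")
    if job is not None and job.is_alive():
        print(f"Scheduler {job.id} last heartbeat at {job.latest_heartbeat}")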


@provide_session
def run_job(
    job: Job, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
) -> int | None:
    """
    Run the job.

    The Job is always an ORM object; setting its state happens within the
    same DB session, which is kept open throughout the whole execution.

    :meta private:
    """
    job.prepare_for_execution(session=session)
    try:
        return execute_job(job, execute_callable=execute_callable)
    finally:
        job.complete_execution(session=session)


def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | None:
    """
    Execute the job.

    Job execution requires no session, as executing a job generally does not
    require an active database connection. A session might be temporarily
    acquired if the job runs a heartbeat during execution, but it is only held
    for the duration of the heartbeat, and in the case of the AIP-44
    implementation it happens over the Internal API rather than directly via
    the database.

    After the job completes, its state is updated in the database, which
    happens in the "complete_execution" step (which, again, can be executed
    locally in the case of database operations, or over the Internal API call).

    :param job: Job to execute - it can be either a DB job or its Pydantic
        serialized version. It does not really matter, because apart from
        running the heartbeat and setting the state, the runner should not
        modify the job state.

    :param execute_callable: callable to execute when running the job.

    :meta private:
    """
    ret = None
    try:
        ret = execute_callable()
        # In case of max runs or max duration
        job.state = JobState.SUCCESS
    except SystemExit:
        # In case of ^C or SIGTERM
        job.state = JobState.SUCCESS
    except Exception:
        job.state = JobState.FAILED
        raise
    return ret
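

# Illustrative sketch (not from the original module): a callable that returns
# normally (or exits via SystemExit) leaves the job in SUCCESS; any other
# exception marks it FAILED and re-raises.
def _example_execute_job(job: Job) -> None:
    execute_job(job, execute_callable=lambda: 0)
    assert job.state == JobState.SUCCESS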


def perform_heartbeat(
    job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
    """
    Perform a heartbeat for the Job passed to it, optionally checking if it is necessary.

    :param job: job to perform heartbeat for
    :param heartbeat_callback: callback to run by the heartbeat
    :param only_if_necessary: only heartbeat if it is necessary (e.g. if there
        are things to run for the triggerer)
    """
    seconds_remaining: float = 0.0
    if job.latest_heartbeat and job.heartrate:
        seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
    if seconds_remaining > 0 and only_if_necessary:
        return
    job.heartbeat(heartbeat_callback=heartbeat_callback)
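

# Hypothetical usage sketch (not part of the original module): with
# only_if_necessary=True, the heartbeat is skipped while the heartrate
# interval has not yet elapsed; a no-op callback is used here.
def _example_perform_heartbeat(job: Job) -> None:
    perform_heartbeat(job, heartbeat_callback=lambda session: None, only_if_necessary=True)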