Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/jobs/job.py: 41%
148 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from functools import cached_property
21from time import sleep
22from typing import Callable, NoReturn
24from sqlalchemy import Column, Index, Integer, String, case, select
25from sqlalchemy.exc import OperationalError
26from sqlalchemy.orm import backref, foreign, relationship
27from sqlalchemy.orm.session import Session, make_transient
29from airflow.configuration import conf
30from airflow.exceptions import AirflowException
31from airflow.executors.executor_loader import ExecutorLoader
32from airflow.listeners.listener import get_listener_manager
33from airflow.models.base import ID_LEN, Base
34from airflow.serialization.pydantic.job import JobPydantic
35from airflow.stats import Stats
36from airflow.utils import timezone
37from airflow.utils.helpers import convert_camel_to_snake
38from airflow.utils.log.logging_mixin import LoggingMixin
39from airflow.utils.net import get_hostname
40from airflow.utils.platform import getuser
41from airflow.utils.session import NEW_SESSION, create_session, provide_session
42from airflow.utils.sqlalchemy import UtcDateTime
43from airflow.utils.state import State
def _resolve_dagrun_model():
    """
    Return the ``DagRun`` model class, importing it only at call time.

    Used inside the ``Job.dag_runs`` relationship's join condition. The import is deferred,
    presumably to avoid a circular import between this module and ``airflow.models.dagrun``
    at load time — confirm before hoisting it to module level.
    """
    import airflow.models.dagrun as dagrun_module

    return dagrun_module.DagRun
class Job(Base, LoggingMixin):
    """
    The ORM class representing Job stored in the database.

    Jobs are processing items with state and duration that aren't task instances.
    For instance a BackfillJob is a collection of task instance runs,
    but should have its own state, start and end time.
    """

    __tablename__ = "job"

    id = Column(Integer, primary_key=True)
    dag_id = Column(
        String(ID_LEN),
    )
    state = Column(String(20))
    job_type = Column(String(30))
    start_date = Column(UtcDateTime())
    end_date = Column(UtcDateTime())
    latest_heartbeat = Column(UtcDateTime())
    executor_class = Column(String(500))
    hostname = Column(String(500))
    unixname = Column(String(1000))

    __table_args__ = (
        Index("job_type_heart", job_type, latest_heartbeat),
        Index("idx_job_state_heartbeat", state, latest_heartbeat),
        Index("idx_job_dag_id", dag_id),
    )

    # TaskInstances which have been enqueued by this Job.
    # Only makes sense for SchedulerJob and BackfillJob instances.
    task_instances_enqueued = relationship(
        "TaskInstance",
        primaryjoin="Job.id == foreign(TaskInstance.queued_by_job_id)",
        backref=backref("queued_by_job", uselist=False),
    )

    # DagRuns whose ``creating_job_id`` points at this Job. The DagRun model is resolved
    # lazily inside the join condition instead of being imported at module level.
    dag_runs = relationship(
        "DagRun",
        primaryjoin=lambda: Job.id == foreign(_resolve_dagrun_model().creating_job_id),
        backref="creating_job",
    )

    # Default heartbeat interval, read once from ``[scheduler] JOB_HEARTBEAT_SEC`` when the
    # class is defined; an instance may override it through the ``heartrate`` argument.
    heartrate = conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")

    def __init__(self, executor=None, heartrate=None, **kwargs):
        """
        Create a job populated with host, user, executor and timing information.

        :param executor: optional executor instance. When given, it is stored on the job and
            its class name is recorded in ``executor_class``; otherwise ``executor_class`` is
            taken from the ``[core] EXECUTOR`` setting.
        :param heartrate: optional override of the class-level ``heartrate``
        :param kwargs: forwarded to the base class constructor
        """
        # Save init parameters as DB fields
        self.hostname = get_hostname()
        if executor:
            self.executor = executor
            self.executor_class = executor.__class__.__name__
        else:
            self.executor_class = conf.get("core", "EXECUTOR")
        self.start_date = timezone.utcnow()
        self.latest_heartbeat = timezone.utcnow()
        if heartrate is not None:
            self.heartrate = heartrate
        self.unixname = getuser()
        self.max_tis_per_query: int = conf.getint("scheduler", "max_tis_per_query")
        # Listeners are notified before ``super().__init__`` applies ``kwargs``.
        get_listener_manager().hook.on_starting(component=self)
        super().__init__(**kwargs)

    @cached_property
    def executor(self):
        """Return the default executor; replaced by the one passed to ``__init__``, if any."""
        return ExecutorLoader.get_default_executor()

    def is_alive(self, grace_multiplier=2.1):
        """
        Is this job currently alive.

        We define alive as in a state of RUNNING, and having sent a heartbeat
        within a threshold: ``[scheduler] scheduler_health_check_threshold`` for
        SchedulerJob, otherwise ``heartrate * grace_multiplier``.

        :param grace_multiplier: multiplier of heartrate to require heart beat
            within (ignored for SchedulerJob)
        :return: ``True`` if alive; ``False`` otherwise, including when no heartbeat
            has ever been recorded
        """
        # ``heartrate * grace_multiplier`` is a float, so annotate the shared variable as such.
        health_check_threshold: float
        if self.job_type == "SchedulerJob":
            health_check_threshold = conf.getint("scheduler", "scheduler_health_check_threshold")
        else:
            health_check_threshold = self.heartrate * grace_multiplier
        if self.state != State.RUNNING or self.latest_heartbeat is None:
            # Without a recorded heartbeat the job cannot be considered alive; this also
            # avoids subtracting ``None`` from a datetime.
            return False
        return (timezone.utcnow() - self.latest_heartbeat).total_seconds() < health_check_threshold

    @provide_session
    def kill(self, session: Session = NEW_SESSION) -> NoReturn:
        """
        Run the on_kill callback, record the end date in the database and abort.

        :param session: session used to load and update this job's row
        :raises AirflowException: always, to signal the job was shut down externally
        """
        job = session.scalar(select(Job).where(Job.id == self.id).limit(1))
        if job is not None:
            job.end_date = timezone.utcnow()
        else:
            # No row for this id: still run on_kill and abort instead of failing with
            # an AttributeError on ``None``.
            self.log.warning("Job %s not found in the database; cannot record its end date.", self.id)
        try:
            self.on_kill()
        except Exception as e:
            self.log.error("on_kill() method failed: %s", str(e))
        if job is not None:
            session.merge(job)
        session.commit()
        raise AirflowException("Job shut down externally.")

    def on_kill(self):
        """Will be called when an external kill command is received."""

    @provide_session
    def heartbeat(
        self, heartbeat_callback: Callable[[Session], None], session: Session = NEW_SESSION
    ) -> None:
        """
        Record a heartbeat for this job, pacing calls to a steady heart rate.

        Heartbeats update the job's entry in the database with a timestamp
        for the latest_heartbeat and allow the job to be killed externally.
        This makes it possible to monitor at the system level what is
        actually active. For instance, an old heartbeat for SchedulerJob
        would mean something is wrong.

        This also allows for any job to be killed externally, regardless
        of who is running it or on which machine it is running.

        Note that if your heart rate is set to 60 seconds and you call this
        method after 10 seconds of processing since the last heartbeat, it
        will sleep 50 seconds to complete the 60 seconds and keep a steady
        heart rate. If you go over 60 seconds before calling it, it won't
        sleep at all.

        An ``OperationalError`` while heartbeating is logged and counted, and the
        in-memory ``latest_heartbeat`` is restored to its previous value.

        :param heartbeat_callback: callback run with the session the heartbeat was
            committed in, after the commit
        :param session: session used to merge this job before the terminating-state
            check; the heartbeat itself is written through a separate, dedicated session
        """
        previous_heartbeat = self.latest_heartbeat

        try:
            # NOTE(review): ``Session.merge`` copies ``self`` onto a session-bound instance and
            # returns that instance; ``self`` is not refreshed from the database, so the state
            # check below sees the in-memory value — confirm this is intended.
            session.merge(self)
            previous_heartbeat = self.latest_heartbeat

            if self.state in State.terminating_states:
                # TODO: Make sure it is AIP-44 compliant
                self.kill()

            # Figure out how long to sleep for
            sleep_for = 0
            if self.latest_heartbeat:
                seconds_remaining = (
                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
                )
                sleep_for = max(0, seconds_remaining)
            sleep(sleep_for)

            # Update last heartbeat time. The new timestamp must be set *before* merging:
            # ``merge`` copies the current attribute values of ``self`` into the session and
            # later changes to ``self`` are not tracked, so assigning it afterwards would
            # commit the previous timestamp instead of the new one.
            with create_session() as heartbeat_session:
                self.latest_heartbeat = timezone.utcnow()
                heartbeat_session.merge(self)
                heartbeat_session.commit()
                # At this point, the DB has updated.
                previous_heartbeat = self.latest_heartbeat

                heartbeat_callback(heartbeat_session)
                self.log.debug("[heartbeat]")
        except OperationalError:
            Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
            self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
            self.latest_heartbeat = previous_heartbeat

    @provide_session
    def prepare_for_execution(self, session: Session = NEW_SESSION):
        """
        Prepare the job for execution.

        Marks the job RUNNING with a fresh start date, persists it, then makes the
        instance transient so it is no longer bound to ``session``.
        """
        Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
        self.state = State.RUNNING
        self.start_date = timezone.utcnow()
        session.add(self)
        session.commit()
        make_transient(self)

    @provide_session
    def complete_execution(self, session: Session = NEW_SESSION):
        """
        Finish the job's execution.

        Notifies listeners that the job is stopping, records the end date,
        persists the job, and emits the ``<class>_end`` metric.
        """
        get_listener_manager().hook.before_stopping(component=self)
        self.end_date = timezone.utcnow()
        session.merge(self)
        session.commit()
        Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)

    @provide_session
    def most_recent_job(self, session: Session = NEW_SESSION) -> Job | None:
        """Returns the most recent job of this type, if any, based on last heartbeat received."""
        # Resolves to the module-level ``most_recent_job`` function, not this method.
        return most_recent_job(self.job_type, session=session)
@provide_session
def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | None:
    """
    Fetch the newest job of the given type, judged by its latest heartbeat.

    Jobs in the "running" state are ranked ahead of every other state, so a live
    job is returned whenever one exists.

    :param job_type: job type to look up (compared against ``Job.job_type``)
    :param session: Database session
    :return: the best-matching ``Job``, or ``None`` when no job of that type exists
    """
    # Sort key: 0 for running jobs, 1 for everything else.
    running_first = case({State.RUNNING: 0}, value=Job.state, else_=1)
    newest_first = Job.latest_heartbeat.desc()
    query = select(Job).where(Job.job_type == job_type).order_by(running_first, newest_first).limit(1)
    return session.scalar(query)
@provide_session
def run_job(
    job: Job | JobPydantic, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
) -> int | None:
    """
    Run the job: mark it started, execute it, and always record its completion.

    The Job is always an ORM object. Its state is set within the same DB session,
    and that session is kept open for the whole execution.

    :meta private:

    TODO: Maybe we should not keep the session during job execution?
    """
    # Temporary narrowing to keep MyPy happy while AIP-44 is only partially done; it will be
    # removed once the final AIP-44 changes are completed.
    assert not isinstance(job, JobPydantic), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
    job.prepare_for_execution(session=session)
    try:
        result = execute_job(job, execute_callable=execute_callable)
    finally:
        # Runs on success and on failure alike, so the end of the job is always persisted.
        job.complete_execution(session=session)
    return result
def execute_job(job: Job | JobPydantic, execute_callable: Callable[[], int | None]) -> int | None:
    """
    Execute the job and set its final state from the outcome.

    Execution itself needs no session, since running the job generally does not require an
    active database connection. A connection may be acquired temporarily if the job
    heartbeats during execution, but only for the duration of that heartbeat. Under AIP-44
    that traffic goes through the Internal API rather than directly to the database.

    After the job completes, its state is updated here. It should then be persisted in the
    "complete_execution" step, which can again run locally against the database or through
    an Internal API call.

    :param job: Job to execute. It can be either the DB job or its Pydantic serialized
        version. The difference does not matter here, because apart from heartbeating and
        setting the state, the runner should not modify the job.
    :param execute_callable: callable to execute when running the job.
    :return: whatever ``execute_callable`` returned, or ``None`` if it raised ``SystemExit``
    :raises Exception: any ``Exception`` from ``execute_callable`` is re-raised after the
        job has been marked FAILED

    :meta private:
    """
    outcome = None
    try:
        outcome = execute_callable()
        # Normal return, e.g. after reaching max runs or max duration.
        job.state = State.SUCCESS
    except SystemExit:
        # Treated as a clean shutdown; presumably raised on ^C or SIGTERM by a signal
        # handler installed elsewhere — confirm against the callers.
        job.state = State.SUCCESS
    except Exception:
        job.state = State.FAILED
        raise
    return outcome
def perform_heartbeat(
    job: Job | JobPydantic, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
    """
    Heartbeat the given Job, optionally skipping it when one is not yet due.

    A heartbeat is "due" once at least ``job.heartrate`` seconds have passed since
    ``job.latest_heartbeat``, or when either of those values is unset.

    :param job: job to perform heartbeat for
    :param heartbeat_callback: callback to run by the heartbeat
    :param only_if_necessary: only heartbeat when one is due (e.g. when the triggerer
        has nothing to run yet)
    """
    # Temporary narrowing to keep MyPy happy while AIP-44 is only partially done; it will be
    # removed once the final AIP-44 changes are completed.
    assert not isinstance(job, JobPydantic), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
    time_left: float = 0.0
    if job.latest_heartbeat and job.heartrate:
        elapsed = (timezone.utcnow() - job.latest_heartbeat).total_seconds()
        time_left = job.heartrate - elapsed
    if only_if_necessary and time_left > 0:
        # Not due yet, and the caller only wants a heartbeat when one is needed.
        return
    with create_session() as db_session:
        job.heartbeat(heartbeat_callback=heartbeat_callback, session=db_session)