Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/serialization/pydantic/job.py: 71%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17import datetime
18from functools import cached_property
19from typing import TYPE_CHECKING, Optional
21from airflow.executors.executor_loader import ExecutorLoader
22from airflow.jobs.base_job_runner import BaseJobRunner
23from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict
def check_runner_initialized(job_runner: Optional[BaseJobRunner], job_type: str) -> BaseJobRunner:
    """
    Return ``job_runner`` unchanged after confirming that it is set.

    :param job_runner: runner instance to validate; may be ``None``.
    :param job_type: job type name, used only to build the error message.
    :return: the very same ``job_runner`` object that was passed in.
    :raises ValueError: if ``job_runner`` is ``None``.
    """
    # Happy path first: a runner that is present is handed straight back.
    if job_runner is not None:
        return job_runner
    raise ValueError(f"In order to run {job_type} you need to initialize the {job_type}Runner first.")
class JobPydantic(BaseModelPydantic):
    """Serializable representation of the Job ORM SqlAlchemyModel used by internal API."""

    # Declared fields. Every field except ``latest_heartbeat`` and
    # ``grace_multiplier`` is annotated as Optional, i.e. may hold ``None``.
    id: Optional[int]
    dag_id: Optional[str]
    state: Optional[str]
    job_type: Optional[str]
    start_date: Optional[datetime.datetime]
    end_date: Optional[datetime.datetime]
    latest_heartbeat: datetime.datetime
    executor_class: Optional[str]
    hostname: Optional[str]
    unixname: Optional[str]
    # The only field that carries a default value.
    grace_multiplier: float = 2.1

    # ``from_attributes=True`` lets the model be populated by reading
    # attributes off another object rather than only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    @cached_property
    def executor(self):
        """Return the executor given by ``ExecutorLoader.get_default_executor()`` (cached per instance)."""
        default_executor = ExecutorLoader.get_default_executor()
        return default_executor

    @cached_property
    def heartrate(self) -> float:
        """Return ``Job._heartrate`` for this job's ``job_type`` (cached per instance)."""
        # Imported inside the method rather than at module level
        # (presumably to avoid a circular import -- confirm before moving).
        from airflow.jobs.job import Job

        current_job_type = self.job_type
        if TYPE_CHECKING:
            # Narrowing for static type checkers only; never runs at runtime.
            assert current_job_type is not None
        return Job._heartrate(current_job_type)

    def is_alive(self) -> bool:
        """
        Report whether this job is alive, as decided by ``Job._is_alive``.

        The decision is delegated entirely to ``Job._is_alive``, which receives
        this job's state, its latest heartbeat, and the threshold obtained from
        ``health_check_threshold`` for this job's type and heartrate.
        """
        from airflow.jobs.job import Job, health_check_threshold

        threshold = health_check_threshold(self.job_type, self.heartrate)
        return Job._is_alive(
            state=self.state,
            health_check_threshold_value=threshold,
            latest_heartbeat=self.latest_heartbeat,
        )