Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/serialization/pydantic/job.py: 71%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

35 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17import datetime 

18from functools import cached_property 

19from typing import TYPE_CHECKING, Optional 

20 

21from airflow.executors.executor_loader import ExecutorLoader 

22from airflow.jobs.base_job_runner import BaseJobRunner 

23from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict 

24 

25 

def check_runner_initialized(job_runner: Optional[BaseJobRunner], job_type: str) -> BaseJobRunner:
    """
    Return ``job_runner`` unchanged after verifying that it has been set.

    :param job_runner: the runner to validate; ``None`` means it was never initialized.
    :param job_type: job type name, used only to build the error message.
    :return: the same ``job_runner`` object that was passed in.
    :raises ValueError: if ``job_runner`` is ``None``.
    """
    if job_runner is None:
        raise ValueError(f"In order to run {job_type} you need to initialize the {job_type}Runner first.")
    return job_runner

30 

31 

class JobPydantic(BaseModelPydantic):
    """Serializable representation of the Job ORM SqlAlchemyModel used by internal API."""

    # Fields annotated Optional[...] accept None; none of them declares a default here.
    id: Optional[int]
    dag_id: Optional[str]
    state: Optional[str]
    job_type: Optional[str]
    start_date: Optional[datetime.datetime]
    end_date: Optional[datetime.datetime]
    # The only field that is neither Optional nor given a default.
    latest_heartbeat: datetime.datetime
    executor_class: Optional[str]
    hostname: Optional[str]
    unixname: Optional[str]
    grace_multiplier: float = 2.1

    # NOTE(review): from_attributes=True presumably lets the model be populated from the
    # attributes of an ORM object rather than only from a mapping -- confirm against
    # the pydantic ConfigDict documentation.
    model_config = ConfigDict(from_attributes=True)

    @cached_property
    def executor(self):
        """Return the default executor from ``ExecutorLoader``; computed once per instance."""
        return ExecutorLoader.get_default_executor()

    @cached_property
    def heartrate(self) -> float:
        """Return the heart rate that ``Job._heartrate`` gives for this job's type; computed once per instance."""
        # Imported inside the method rather than at module level.
        from airflow.jobs.job import Job

        # TYPE_CHECKING is False at runtime, so this assert never executes; it only
        # narrows ``job_type`` from Optional[str] to str for static type checkers.
        if TYPE_CHECKING:
            assert self.job_type is not None
        return Job._heartrate(self.job_type)

    def is_alive(self) -> bool:
        """
        Is this job currently alive.

        Delegates to ``Job._is_alive`` with this job's state, its latest heartbeat and
        the health-check threshold derived from ``job_type`` and ``heartrate``.
        """
        # Imported inside the method rather than at module level.
        from airflow.jobs.job import Job, health_check_threshold

        return Job._is_alive(
            state=self.state,
            health_check_threshold_value=health_check_threshold(self.job_type, self.heartrate),
            latest_heartbeat=self.latest_heartbeat,
        )