Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/serialization/pydantic/dag_run.py: 77%

48 statements  

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Iterable, List, Optional

from airflow.serialization.pydantic.dag import PydanticDag
from airflow.serialization.pydantic.dataset import DatasetEventPydantic
from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict, is_pydantic_2_installed
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from airflow.jobs.scheduler_job_runner import TI
    from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
    from airflow.utils.state import TaskInstanceState


class DagRunPydantic(BaseModelPydantic):
    """Serializable representation of the DagRun ORM SQLAlchemy model, used by the internal API."""

    id: int
    dag_id: str
    queued_at: Optional[datetime]
    execution_date: datetime
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    state: str
    run_id: str
    creating_job_id: Optional[int]
    external_trigger: bool
    run_type: str
    conf: dict
    data_interval_start: Optional[datetime]
    data_interval_end: Optional[datetime]
    last_scheduling_decision: Optional[datetime]
    dag_hash: Optional[str]
    updated_at: Optional[datetime]
    dag: Optional[PydanticDag]
    consumed_dataset_events: List[DatasetEventPydantic]  # noqa: UP006
    log_template_id: Optional[int]

    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

    @property
    def logical_date(self) -> datetime:
        return self.execution_date

    @provide_session
    def get_task_instances(
        self,
        state: Iterable[TaskInstanceState | None] | None = None,
        session: Session = NEW_SESSION,
    ) -> list[TI]:
        """
        Return the task instances for this dag run.

        TODO: make it work for AIP-44
        """
        raise NotImplementedError()

    def get_task_instance(
        self,
        task_id: str,
        session: Session,
        *,
        map_index: int = -1,
    ) -> TI | TaskInstancePydantic | None:
        """
        Return the task instance specified by task_id for this dag run.

        :param task_id: the task id
        :param session: SQLAlchemy ORM Session
        """
        from airflow.models.dagrun import DagRun

        return DagRun.fetch_task_instance(
            dag_id=self.dag_id,
            dag_run_id=self.run_id,
            task_id=task_id,
            session=session,
            map_index=map_index,
        )

    def get_log_template(self, session: Session):
        """Return the log template for this dag run."""
        from airflow.models.dagrun import DagRun

        return DagRun._get_log_template(log_template_id=self.log_template_id)


if is_pydantic_2_installed():
    DagRunPydantic.model_rebuild()
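
For context beyond the listing, here is a minimal round-trip sketch for DagRunPydantic. It is not part of dag_run.py above; it assumes pydantic 2 is installed, and all field values are made up. Because the model sets from_attributes=True, DagRunPydantic.model_validate(orm_dag_run) would also accept a DagRun ORM instance directly; this sketch sticks to plain keyword construction so it runs standalone:

# Hypothetical round-trip demo for DagRunPydantic; all values are made up.
from datetime import datetime, timezone

from airflow.serialization.pydantic.dag_run import DagRunPydantic

run = DagRunPydantic(
    id=1,
    dag_id="example_dag",
    queued_at=None,
    execution_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    start_date=None,
    end_date=None,
    state="queued",
    run_id="manual__2024-01-01T00:00:00+00:00",
    creating_job_id=None,
    external_trigger=True,
    run_type="manual",
    conf={},
    data_interval_start=None,
    data_interval_end=None,
    last_scheduling_decision=None,
    dag_hash=None,
    updated_at=None,
    dag=None,
    consumed_dataset_events=[],
    log_template_id=None,
)

payload = run.model_dump_json()                         # pydantic model -> JSON string
restored = DagRunPydantic.model_validate_json(payload)  # JSON string -> pydantic model
assert restored.logical_date == run.execution_date      # logical_date aliases execution_date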