Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/serialization/pydantic/dag_run.py: 77%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19from datetime import datetime
20from typing import TYPE_CHECKING, Iterable, List, Optional
22from airflow.serialization.pydantic.dag import PydanticDag
23from airflow.serialization.pydantic.dataset import DatasetEventPydantic
24from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict, is_pydantic_2_installed
25from airflow.utils.session import NEW_SESSION, provide_session
27if TYPE_CHECKING:
28 from sqlalchemy.orm import Session
30 from airflow.jobs.scheduler_job_runner import TI
31 from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
32 from airflow.utils.state import TaskInstanceState
35class DagRunPydantic(BaseModelPydantic):
36 """Serializable representation of the DagRun ORM SqlAlchemyModel used by internal API."""
38 id: int
39 dag_id: str
40 queued_at: Optional[datetime]
41 execution_date: datetime
42 start_date: Optional[datetime]
43 end_date: Optional[datetime]
44 state: str
45 run_id: str
46 creating_job_id: Optional[int]
47 external_trigger: bool
48 run_type: str
49 conf: dict
50 data_interval_start: Optional[datetime]
51 data_interval_end: Optional[datetime]
52 last_scheduling_decision: Optional[datetime]
53 dag_hash: Optional[str]
54 updated_at: Optional[datetime]
55 dag: Optional[PydanticDag]
56 consumed_dataset_events: List[DatasetEventPydantic] # noqa: UP006
57 log_template_id: Optional[int]
59 model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
61 @property
62 def logical_date(self) -> datetime:
63 return self.execution_date
65 @provide_session
66 def get_task_instances(
67 self,
68 state: Iterable[TaskInstanceState | None] | None = None,
69 session: Session = NEW_SESSION,
70 ) -> list[TI]:
71 """
72 Return the task instances for this dag run.
74 TODO: make it works for AIP-44
75 """
76 raise NotImplementedError()
78 def get_task_instance(
79 self,
80 task_id: str,
81 session: Session,
82 *,
83 map_index: int = -1,
84 ) -> TI | TaskInstancePydantic | None:
85 """
86 Return the task instance specified by task_id for this dag run.
88 :param task_id: the task id
89 :param session: Sqlalchemy ORM Session
90 """
91 from airflow.models.dagrun import DagRun
93 return DagRun.fetch_task_instance(
94 dag_id=self.dag_id,
95 dag_run_id=self.run_id,
96 task_id=task_id,
97 session=session,
98 map_index=map_index,
99 )
101 def get_log_template(self, session: Session):
102 from airflow.models.dagrun import DagRun
104 return DagRun._get_log_template(log_template_id=self.log_template_id)
107if is_pydantic_2_installed():
108 DagRunPydantic.model_rebuild()