Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/models/taskmap.py: 65%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Table to store information about mapped task instances (AIP-42)."""
20from __future__ import annotations
22import collections.abc
23import enum
24from typing import TYPE_CHECKING, Any, Collection
26from sqlalchemy import CheckConstraint, Column, ForeignKeyConstraint, Integer, String
28from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies
29from airflow.utils.sqlalchemy import ExtendedJSON
31if TYPE_CHECKING:
32 from airflow.models.taskinstance import TaskInstance
33 from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
class TaskMapVariant(enum.Enum):
    """Kind of collection a task map was recorded from.

    There are exactly two members: ``DICT``, for a key-value mapping, and
    ``LIST``, for an ordered sequence of values.
    """

    # Key-value mapping.
    DICT = "dict"
    # Ordered value sequence.
    LIST = "list"
class TaskMap(TaskInstanceDependencies):
    """Persisted record of dynamic task-mapping information.

    At present a row is only written when an upstream TaskInstance pushes an
    XCom that a downstream task pulls in order to be mapped.
    """

    __tablename__ = "task_map"

    # Composite primary key; it identifies the upstream TaskInstance that
    # produced this mapping information.
    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
    run_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
    map_index = Column(Integer, primary_key=True)

    # Size of the recorded collection (required), and its keys when the
    # collection was a mapping (NULL otherwise).
    length = Column(Integer, nullable=False)
    keys = Column(ExtendedJSON, nullable=True)

    __table_args__ = (
        # A recorded length may be zero but never negative.
        CheckConstraint(length >= 0, name="task_map_length_not_negative"),
        # Rows follow their task_instance row on both delete and update.
        ForeignKeyConstraint(
            [dag_id, task_id, run_id, map_index],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name="task_map_task_instance_fkey",
            ondelete="CASCADE",
            onupdate="CASCADE",
        ),
    )

    def __init__(
        self,
        dag_id: str,
        task_id: str,
        run_id: str,
        map_index: int,
        length: int,
        keys: list[Any] | None,
    ) -> None:
        """Store the given values on the corresponding columns.

        :param dag_id: DAG id of the upstream task instance.
        :param task_id: Task id of the upstream task instance.
        :param run_id: Run id of the upstream task instance.
        :param map_index: Map index of the upstream task instance.
        :param length: Number of items in the recorded collection.
        :param keys: Keys of the recorded collection, or ``None``.
        """
        # Identity of the upstream task instance.
        self.dag_id, self.task_id, self.run_id, self.map_index = dag_id, task_id, run_id, map_index
        # Description of the collection it produced.
        self.length, self.keys = length, keys

    @classmethod
    def from_task_instance_xcom(cls, ti: TaskInstance | TaskInstancePydantic, value: Collection) -> TaskMap:
        """Build a task map describing *value* as pushed by *ti*.

        :param ti: Task instance whose identifying fields are copied.
        :param value: Collection to record; its size is stored, and its keys
            as well when it is a mapping.
        :raises ValueError: If ``ti.run_id`` is ``None``.
        """
        if ti.run_id is None:
            raise ValueError("cannot record task map for unrun task instance")

        init_kwargs: dict[str, Any] = {
            "dag_id": ti.dag_id,
            "task_id": ti.task_id,
            "run_id": ti.run_id,
            "map_index": ti.map_index,
            "length": len(value),
            "keys": None,
        }
        # Only a mapping has keys worth recording; anything else keeps None.
        if isinstance(value, collections.abc.Mapping):
            init_kwargs["keys"] = list(value)
        return cls(**init_kwargs)

    @property
    def variant(self) -> TaskMapVariant:
        """Return ``LIST`` when no keys are stored, ``DICT`` otherwise."""
        return TaskMapVariant.LIST if self.keys is None else TaskMapVariant.DICT