Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/datasets/__init__.py: 37%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
18from __future__ import annotations
20import os
21import urllib.parse
22import warnings
23from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterable, Iterator
25import attr
27if TYPE_CHECKING:
28 from urllib.parse import SplitResult
31from airflow.configuration import conf
33__all__ = ["Dataset", "DatasetAll", "DatasetAny"]
def normalize_noop(parts: SplitResult) -> SplitResult:
    """Place-hold a :class:`~urllib.parse.SplitResult` normalizer.

    Identity normalizer used for schemes (e.g. ``file``) that require no
    scheme-specific normalization beyond the generic sanitization performed
    by :func:`_sanitize_uri`.

    :param parts: The split URI to "normalize".
    :return: The input, unchanged.

    :meta private:
    """
    return parts
def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
    """Find the URI normalizer registered for *scheme*, if one exists.

    The ``file`` scheme is handled in-house with the no-op normalizer; any
    other scheme is looked up among provider-registered dataset URI handlers.

    :meta private:
    """
    if scheme != "file":
        # Imported lazily: the providers manager is expensive to build and
        # importing it at module load time would risk circular imports.
        from airflow.providers_manager import ProvidersManager

        return ProvidersManager().dataset_uri_handlers.get(scheme)
    return normalize_noop
def _sanitize_uri(uri: str) -> str:
    """Sanitize a dataset URI.

    This checks for URI validity, and normalizes the URI if needed. A fully
    normalized URI is returned.

    :param uri: Raw user-supplied dataset URI.
    :return: The normalized URI. Values that do not look like URIs, and URIs
        with an experimental ``x-`` scheme, are returned unchanged.
    :raises ValueError: If the URI is empty, whitespace-only, contains
        non-ASCII characters, or uses the reserved ``airflow`` scheme; also
        re-raised from a provider normalizer when strict validation is on.
    """
    if not uri:
        raise ValueError("Dataset URI cannot be empty")
    if uri.isspace():
        raise ValueError("Dataset URI cannot be just whitespace")
    if not uri.isascii():
        raise ValueError("Dataset URI must only consist of ASCII characters")
    parsed = urllib.parse.urlsplit(uri)
    if not parsed.scheme and not parsed.netloc:  # Does not look like a URI.
        return uri
    normalized_scheme = parsed.scheme.lower()
    if normalized_scheme.startswith("x-"):
        # Experimental user-defined schemes bypass all normalization.
        return uri
    if normalized_scheme == "airflow":
        raise ValueError("Dataset scheme 'airflow' is reserved")
    # rpartition returns ("", "", netloc) when no "@" is present, so
    # normalized_netloc always ends up as the host part, and auth_exists
    # (the separator) is truthy only when credentials were supplied.
    _, auth_exists, normalized_netloc = parsed.netloc.rpartition("@")
    if auth_exists:
        # TODO: Collect this into a DagWarning.
        warnings.warn(
            "A dataset URI should not contain auth info (e.g. username or "
            "password). It has been automatically dropped.",
            UserWarning,
            stacklevel=3,
        )
    if parsed.query:
        # Sort query parameters so semantically identical URIs normalize to
        # the same string regardless of parameter order.
        normalized_query = urllib.parse.urlencode(sorted(urllib.parse.parse_qsl(parsed.query)))
    else:
        normalized_query = ""
    parsed = parsed._replace(
        scheme=normalized_scheme,
        netloc=normalized_netloc,
        path=parsed.path.rstrip("/") or "/",  # Remove all trailing slashes.
        query=normalized_query,
        fragment="",  # Ignore any fragments.
    )
    # Apply any scheme-specific normalizer (built-in or provider-registered).
    if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
        try:
            parsed = normalizer(parsed)
        except ValueError as exception:
            # In lenient mode a non-compliant URI only warns; strict mode
            # (core.strict_dataset_uri_validation) re-raises.
            if conf.getboolean("core", "strict_dataset_uri_validation", fallback=False):
                raise
            warnings.warn(
                f"The dataset URI {uri} is not AIP-60 compliant: {exception}. "
                f"In Airflow 3, this will raise an exception.",
                UserWarning,
                stacklevel=3,
            )
    return urllib.parse.urlunsplit(parsed)
def coerce_to_uri(value: str | Dataset) -> str:
    """Coerce a user input into a sanitized URI.

    A :class:`Dataset` already holds a sanitized URI, which is returned
    directly; anything else is converted to ``str`` and run through
    :func:`_sanitize_uri`.

    :meta private:
    """
    return value.uri if isinstance(value, Dataset) else _sanitize_uri(str(value))
class BaseDataset:
    """Protocol for all dataset triggers to use in ``DAG(schedule=...)``.

    :meta private:
    """

    def __or__(self, other: BaseDataset) -> DatasetAny:
        # Combining with | builds an "or" condition over both operands.
        if isinstance(other, BaseDataset):
            return DatasetAny(self, other)
        return NotImplemented

    def __and__(self, other: BaseDataset) -> DatasetAll:
        # Combining with & builds an "and" condition over both operands.
        if isinstance(other, BaseDataset):
            return DatasetAll(self, other)
        return NotImplemented

    def as_expression(self) -> Any:
        """Serialize the dataset into its scheduling expression.

        The return value is stored in DagModel for display purposes. It must be
        JSON-compatible.

        :meta private:
        """
        raise NotImplementedError

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        raise NotImplementedError

    def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
        raise NotImplementedError
@attr.define()
class Dataset(os.PathLike, BaseDataset):
    """A representation of data dependencies between workflows."""

    # Sanitized dataset URI; normalized on assignment via the
    # ``_sanitize_uri`` converter and constrained to 1-3000 characters.
    uri: str = attr.field(
        converter=_sanitize_uri,
        validator=[attr.validators.min_len(1), attr.validators.max_len(3000)],
    )
    # Optional free-form metadata attached to the dataset; not part of identity.
    extra: dict[str, Any] | None = None

    # Serialization format version for this class.
    __version__: ClassVar[int] = 1

    def __fspath__(self) -> str:
        # os.PathLike support: lets a Dataset be used where a path is expected.
        return self.uri

    def __eq__(self, other: Any) -> bool:
        # Identity is the URI alone; ``extra`` is deliberately ignored.
        if isinstance(other, self.__class__):
            return self.uri == other.uri
        return NotImplemented

    def __hash__(self) -> int:
        # Must be consistent with __eq__, which compares only the URI.
        return hash(self.uri)

    def as_expression(self) -> Any:
        """Serialize the dataset into its scheduling expression.

        :meta private:
        """
        return self.uri

    def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
        # A single dataset is its own one-element condition.
        yield self.uri, self

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        # Unknown URIs default to False (dataset not yet triggered).
        return statuses.get(self.uri, False)
class _DatasetBooleanCondition(BaseDataset):
    """Base class for dataset boolean logic."""

    # Aggregator applied over child results; subclasses set ``any`` or ``all``.
    agg_func: Callable[[Iterable], bool]

    def __init__(self, *objects: BaseDataset) -> None:
        for obj in objects:
            if not isinstance(obj, BaseDataset):
                raise TypeError("expect dataset expressions in condition")
        self.objects = objects

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        # Lazily evaluate children so the aggregator can short-circuit.
        child_results = (obj.evaluate(statuses=statuses) for obj in self.objects)
        return self.agg_func(child_results)

    def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
        # Deduplicate by URI, keeping the first instance encountered.
        yielded: set[str] = set()
        for obj in self.objects:
            for uri, dataset in obj.iter_datasets():
                if uri not in yielded:
                    yield uri, dataset
                    yielded.add(uri)
class DatasetAny(_DatasetBooleanCondition):
    """Use to combine datasets schedule references in an "or" relationship."""

    # Note: docstring previously said "and", which contradicted agg_func=any.
    agg_func = any

    def __or__(self, other: BaseDataset) -> DatasetAny:
        if not isinstance(other, BaseDataset):
            return NotImplemented
        # Optimization: X | (Y | Z) is equivalent to X | Y | Z.
        return DatasetAny(*self.objects, other)

    def __repr__(self) -> str:
        return f"DatasetAny({', '.join(map(str, self.objects))})"

    def as_expression(self) -> dict[str, Any]:
        """Serialize the dataset into its scheduling expression.

        :meta private:
        """
        return {"any": [o.as_expression() for o in self.objects]}
class DatasetAll(_DatasetBooleanCondition):
    """Use to combine datasets schedule references in an "and" relationship."""

    # Note: docstring previously said "or", which contradicted agg_func=all.
    agg_func = all

    def __and__(self, other: BaseDataset) -> DatasetAll:
        if not isinstance(other, BaseDataset):
            return NotImplemented
        # Optimization: X & (Y & Z) is equivalent to X & Y & Z.
        return DatasetAll(*self.objects, other)

    def __repr__(self) -> str:
        return f"DatasetAll({', '.join(map(str, self.objects))})"

    # Return type aligned with DatasetAny.as_expression for consistency.
    def as_expression(self) -> dict[str, Any]:
        """Serialize the dataset into its scheduling expression.

        :meta private:
        """
        return {"all": [o.as_expression() for o in self.objects]}