1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20import json
21import logging
22import operator
23import os
24import urllib.parse
25import warnings
26from collections.abc import Callable
27from typing import TYPE_CHECKING, Any, ClassVar, Literal, overload
28
29import attrs
30
31from airflow.sdk.api.datamodels._generated import AssetProfile
32from airflow.serialization.dag_dependency import DagDependency
33
34if TYPE_CHECKING:
35 from collections.abc import Iterable, Iterator
36 from urllib.parse import SplitResult
37
38 from pydantic.types import JsonValue
39
40 from airflow.models.asset import AssetModel
41 from airflow.sdk.io.path import ObjectStoragePath
42 from airflow.serialization.definitions.assets import SerializedAssetWatcher
43 from airflow.triggers.base import BaseEventTrigger
44
45 AttrsInstance = attrs.AttrsInstance
46else:
47 AttrsInstance = object
48
49
# Names exported by ``from <this module> import *``; helpers and unique-key
# classes defined below are intentionally not listed here.
__all__ = [
    "Asset",
    "Dataset",
    "Model",
    "AssetAlias",
    "AssetAll",
    "AssetAny",
    "AssetNameRef",
    "AssetRef",
    "AssetUriRef",
    "AssetWatcher",
]
62
63from airflow.sdk.configuration import conf
64
# Module-level logger named after this module.
log = logging.getLogger(__name__)


# Value of the ``SQL_ALCHEMY_CONN`` option in the ``database`` config section,
# with "NOT AVAILABLE" passed as the fallback. ``_validate_identifier`` checks
# whether it starts with "mysql" to decide if the ASCII-only rule applies.
SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE")
69
70
@attrs.define(frozen=True)
class AssetUniqueKey(attrs.AttrsInstance):
    """
    Frozen ``(name, uri)`` pair used to identify one asset.

    :meta private:
    """

    name: str
    uri: str

    @staticmethod
    def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
        """Create the key from the ``name`` and ``uri`` attributes of *asset*."""
        asset_name, asset_uri = asset.name, asset.uri
        return AssetUniqueKey(name=asset_name, uri=asset_uri)

    def to_asset(self) -> Asset:
        """Create an ``Asset`` carrying this key's name and URI."""
        return Asset(name=self.name, uri=self.uri)

    @staticmethod
    def from_str(key: str) -> AssetUniqueKey:
        """Rebuild a key from the JSON object string produced by ``to_str``."""
        decoded = json.loads(key)
        return AssetUniqueKey(**decoded)

    def to_str(self) -> str:
        """Dump this key's fields as a JSON object string."""
        as_mapping = attrs.asdict(self)
        return json.dumps(as_mapping)

    @staticmethod
    def from_profile(profile: AssetProfile) -> AssetUniqueKey:
        """
        Create the key from an asset profile.

        Whichever of ``name`` / ``uri`` is missing (falsy) is filled in with
        the other one.

        :raises ValueError: If both ``name`` and ``uri`` are falsy.
        """
        profile_name, profile_uri = profile.name, profile.uri
        if not (profile_name or profile_uri):
            raise ValueError("name and uri cannot both be empty")
        # Each side falls back to the other when it is not provided.
        return AssetUniqueKey(name=profile_name or profile_uri, uri=profile_uri or profile_name)
107
108
@attrs.define(frozen=True)
class AssetAliasUniqueKey:
    """
    Frozen single-field key (``name``) used to identify one asset alias.

    :meta private:
    """

    name: str

    @staticmethod
    def from_asset_alias(asset_alias: AssetAlias) -> AssetAliasUniqueKey:
        """Create the key from the ``name`` of *asset_alias*."""
        alias_name = asset_alias.name
        return AssetAliasUniqueKey(name=alias_name)

    def to_asset_alias(self) -> AssetAlias:
        """Create an ``AssetAlias`` carrying this key's name."""
        alias_name = self.name
        return AssetAlias(name=alias_name)
125
126
# Union of the two unique-key classes defined above (asset key or alias key).
BaseAssetUniqueKey = AssetUniqueKey | AssetAliasUniqueKey
128
129
def normalize_noop(parts: SplitResult) -> SplitResult:
    """
    Place-hold a :class:`~urllib.parse.SplitResult` normalizer.

    Returns *parts* unchanged. ``_get_uri_normalizer`` hands this function out
    for the ``file`` scheme.

    :meta private:
    """
    return parts
137
138
def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
    """
    Look up the URI normalizer registered for *scheme*.

    The ``file`` scheme always maps to ``normalize_noop``. Any other scheme is
    looked up in ``ProvidersManager().asset_uri_handlers``; ``None`` is returned
    when that mapping has no entry for it.
    """
    if scheme != "file":
        # Imported lazily, only when a provider lookup is actually needed.
        from airflow.providers_manager import ProvidersManager

        handlers = ProvidersManager().asset_uri_handlers
        return handlers.get(scheme)
    return normalize_noop
145
146
147def _get_normalized_scheme(uri: str) -> str:
148 parsed = urllib.parse.urlsplit(uri)
149 return parsed.scheme.lower()
150
151
152def _sanitize_uri(inp: str | ObjectStoragePath) -> str:
153 """
154 Sanitize an asset URI.
155
156 This checks for URI validity, and normalizes the URI if needed. A fully
157 normalized URI is returned.
158 """
159 uri = str(inp)
160 parsed = urllib.parse.urlsplit(uri)
161 if not parsed.scheme and not parsed.netloc: # Does not look like a URI.
162 return uri
163 if not (normalized_scheme := _get_normalized_scheme(uri)):
164 return uri
165 if normalized_scheme.startswith("x-"):
166 return uri
167 if normalized_scheme == "airflow":
168 raise ValueError("Asset scheme 'airflow' is reserved")
169 if parsed.password:
170 # TODO: Collect this into a DagWarning.
171 warnings.warn(
172 "An Asset URI should not contain a password. User info has been automatically dropped.",
173 UserWarning,
174 stacklevel=3,
175 )
176 _, _, normalized_netloc = parsed.netloc.rpartition("@")
177 else:
178 normalized_netloc = parsed.netloc
179 if parsed.query:
180 normalized_query = urllib.parse.urlencode(sorted(urllib.parse.parse_qsl(parsed.query)))
181 else:
182 normalized_query = ""
183 parsed = parsed._replace(
184 scheme=normalized_scheme,
185 netloc=normalized_netloc,
186 path=parsed.path.rstrip("/") or "/", # Remove all trailing slashes.
187 query=normalized_query,
188 fragment="", # Ignore any fragments.
189 )
190 if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
191 parsed = normalizer(parsed)
192 return urllib.parse.urlunsplit(parsed)
193
194
195def _validate_identifier(instance, attribute, value):
196 if not isinstance(value, str):
197 raise ValueError(f"{type(instance).__name__} {attribute.name} must be a string")
198 if len(value) > 1500:
199 raise ValueError(f"{type(instance).__name__} {attribute.name} cannot exceed 1500 characters")
200 if value.isspace():
201 raise ValueError(f"{type(instance).__name__} {attribute.name} cannot be just whitespace")
202 # We use latin1_general_cs to store the name (and group, asset values etc.) on MySQL.
203 # relaxing this check for non mysql backend
204 if SQL_ALCHEMY_CONN.startswith("mysql") and not value.isascii():
205 raise ValueError(f"{type(instance).__name__} {attribute.name} must only consist of ASCII characters")
206 return value
207
208
def _validate_non_empty_identifier(instance, attribute, value):
    """
    Validate an identifier and additionally reject the empty string.

    :return: *value*, unchanged, when it is a valid non-empty identifier.
    :raises ValueError: If ``_validate_identifier`` rejects the value, or if
        the validated value is empty.
    """
    checked = _validate_identifier(instance, attribute, value)
    if checked:
        return value
    raise ValueError(f"{type(instance).__name__} {attribute.name} cannot be empty")
213
214
def _validate_asset_name(instance, attribute, value):
    """
    Validate an asset name.

    The name must be a valid non-empty identifier and must not be one of the
    prohibited names ``self`` and ``context``.

    :return: *value*, unchanged, when it is acceptable.
    :raises ValueError: If the name is invalid, empty, or prohibited.
    """
    _validate_non_empty_identifier(instance, attribute, value)
    if value in ("self", "context"):
        raise ValueError(f"prohibited name for asset: {value}")
    return value
220
221
222def _set_extra_default(extra: dict[str, JsonValue] | None) -> dict:
223 """
224 Automatically convert None to an empty dict.
225
226 This allows the caller site to continue doing ``Asset(uri, extra=None)``,
227 but still allow the ``extra`` attribute to always be a dict.
228 """
229 if extra is None:
230 return {}
231 return extra
232
233
class BaseAsset:
    """
    Protocol for all asset triggers to use in ``DAG(schedule=...)``.

    Provides the ``|`` and ``&`` operators, which combine two asset
    expressions into ``AssetAny`` and ``AssetAll`` respectively. Every other
    method raises ``NotImplementedError`` and is meant to be overridden.

    :meta private:
    """

    def __bool__(self) -> bool:
        # An asset expression is always truthy.
        return True

    def __or__(self, other: BaseAsset) -> BaseAsset:
        if isinstance(other, BaseAsset):
            return AssetAny(self, other)
        return NotImplemented

    def __and__(self, other: BaseAsset) -> BaseAsset:
        if isinstance(other, BaseAsset):
            return AssetAll(self, other)
        return NotImplemented

    def as_expression(self) -> Any:
        """
        Serialize the asset into its scheduling expression.

        The result is stored in DagModel for display purposes, so it must be
        JSON-compatible.

        :meta private:
        """
        raise NotImplementedError

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        """Iterate ``(unique key, asset)`` pairs contained in this expression."""
        raise NotImplementedError

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        """Iterate ``(name, alias)`` pairs contained in this expression."""
        raise NotImplementedError

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        """Iterate asset references contained in this expression."""
        raise NotImplementedError

    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
        """
        Iterate a base asset as dag dependency.

        :meta private:
        """
        raise NotImplementedError
281
282
@attrs.define(init=False)
class AssetWatcher:
    """A representation of an asset watcher. The name uniquely identifies the watch."""

    name: str
    # This attribute has two roles.
    # On a "normal" asset instance loaded from a Dag it holds the trigger that monitors an external
    # resource; that is the case where users work with ``AssetWatcher`` directly.
    # On an asset recreated from a serialized Dag it holds the trigger's serialized data instead, and
    # `SerializedAssetWatcher` is used. Both types are kept in the annotation to make mypy happy because
    # `SerializedAssetWatcher` is a subclass of `AssetWatcher`.
    trigger: BaseEventTrigger | dict

    def __init__(
        self,
        name: str,
        trigger: BaseEventTrigger | dict,
    ) -> None:
        """
        Create a watcher.

        :param name: Name of the watch.
        :param trigger: Event trigger, or a dict of serialized trigger data.
        :raises ValueError: If *trigger* is a ``BaseTrigger`` that does not
            inherit ``BaseEventTrigger``.
        """
        from airflow.triggers.base import BaseEventTrigger, BaseTrigger

        is_non_event_trigger = isinstance(trigger, BaseTrigger) and not isinstance(trigger, BaseEventTrigger)
        if is_non_event_trigger:
            raise ValueError("The trigger used to watch an asset must inherit ``BaseEventTrigger``")

        self.name = name
        self.trigger = trigger
308
309
@attrs.define(init=False, unsafe_hash=False)
class Asset(os.PathLike, BaseAsset):
    """
    A representation of data asset dependencies between workflows.

    At least one of ``name`` and ``uri`` must be given; the missing one is
    derived from the other. ``group`` defaults to the class-level
    ``asset_type``, ``extra`` is always a dict, and ``watchers`` is always a
    list.

    Equality and hashing only look at fields declared on ``Asset`` itself, so
    fields added by a subclass never influence either.
    """

    name: str = attrs.field(
        validator=[_validate_asset_name],
    )
    uri: str = attrs.field(
        validator=[_validate_non_empty_identifier],
        converter=_sanitize_uri,
    )
    group: str = attrs.field(
        # Resolved per instance so subclasses overriding ``asset_type`` get their own default.
        default=attrs.Factory(operator.attrgetter("asset_type"), takes_self=True),
        validator=[_validate_identifier],
    )
    extra: dict[str, JsonValue] = attrs.field(
        factory=dict,
        converter=_set_extra_default,
    )
    watchers: list[AssetWatcher | SerializedAssetWatcher] = attrs.field(
        factory=list,
    )

    asset_type: ClassVar[str] = "asset"
    __version__: ClassVar[int] = 1

    @overload
    def __init__(
        self,
        name: str,
        uri: str | ObjectStoragePath,
        *,
        group: str = ...,
        extra: dict[str, JsonValue] | None = None,
        watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
    ) -> None:
        """Canonical; both name and uri are provided."""

    @overload
    def __init__(
        self,
        name: str,
        *,
        group: str = ...,
        extra: dict[str, JsonValue] | None = None,
        watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
    ) -> None:
        """It's possible to only provide the name, either by keyword or as the only positional argument."""

    @overload
    def __init__(
        self,
        *,
        uri: str | ObjectStoragePath,
        group: str = ...,
        extra: dict[str, JsonValue] | None = None,
        watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
    ) -> None:
        """It's possible to only provide the URI as a keyword argument."""

    def __init__(
        self,
        name: str | None = None,
        uri: str | ObjectStoragePath | None = None,
        *,
        group: str | None = None,
        extra: dict[str, JsonValue] | None = None,
        watchers: list[AssetWatcher | SerializedAssetWatcher] | None = None,
    ) -> None:
        """
        Create an asset from a name, a URI, or both.

        :param name: Asset name; defaults to ``str(uri)`` when omitted.
        :param uri: Asset URI; defaults to ``name`` when omitted.
        :param group: Asset group; the attrs default is used when ``None``.
        :param extra: Extra information; the attrs default is used when ``None``.
        :param watchers: Asset watchers; the attrs default is used when ``None``.
        :raises TypeError: If neither ``name`` nor ``uri`` is given.
        :raises ValueError: If a field validator or the URI converter rejects a value.
        """
        if name is None and uri is None:
            raise TypeError("Asset() requires either 'name' or 'uri'")
        if name is None:
            name = str(uri)
        elif uri is None:
            uri = name

        if TYPE_CHECKING:
            assert name is not None
            assert uri is not None

        # attrs default (and factory) does not kick in if any value is given to
        # the argument. We need to exclude defaults from the custom __init__.
        kwargs: dict[str, Any] = {}
        if group is not None:
            kwargs["group"] = group
        if extra is not None:
            kwargs["extra"] = extra
        if watchers is not None:
            kwargs["watchers"] = watchers

        self.__attrs_init__(name=name, uri=uri, **kwargs)

    @overload
    @staticmethod
    def ref(*, name: str) -> AssetNameRef: ...

    @overload
    @staticmethod
    def ref(*, uri: str) -> AssetUriRef: ...

    @staticmethod
    def ref(*, name: str = "", uri: str = "") -> AssetRef:
        """
        Create a reference to an asset by name or by URI.

        :raises TypeError: If both or neither of ``name`` and ``uri`` are given.
        """
        if name and uri:
            raise TypeError("Asset reference must be made to either name or URI, not both")
        if name:
            return AssetNameRef(name)
        if uri:
            return AssetUriRef(uri)
        raise TypeError("Asset reference expects keyword argument 'name' or 'uri'")

    def __fspath__(self) -> str:
        """Return the URI, so the asset can be used where a path-like is accepted."""
        return self.uri

    def __eq__(self, other: Any) -> bool:
        # The Asset class can be subclassed, and we don't want fields added by a
        # subclass to break equality. This explicitly filters out only fields
        # defined by the Asset class for comparison.
        if not isinstance(other, Asset):
            return NotImplemented
        f = attrs.filters.include(*attrs.fields_dict(Asset))
        return attrs.asdict(self, filter=f) == attrs.asdict(other, filter=f)

    def __hash__(self) -> int:
        # ``attrs.asdict`` returns a dict, and dicts are unhashable, so hashing
        # its result always raised TypeError. Hash the identifying fields
        # instead. ``__eq__`` compares these fields as well, so equal assets
        # still produce equal hashes; ``extra`` and ``watchers`` are left out
        # because they are unhashable containers.
        return hash((self.name, self.uri, self.group))

    @property
    def normalized_uri(self) -> str | None:
        """
        Returns the normalized and AIP-60 compliant URI whenever possible.

        If we can't retrieve the scheme from URI or no normalizer is provided or if parsing fails,
        it returns None.

        If a normalizer for the scheme exists and parsing is successful we return the normalizer result.
        """
        if not (normalized_scheme := _get_normalized_scheme(self.uri)):
            return None

        if (normalizer := _get_uri_normalizer(normalized_scheme)) is None:
            return None
        parsed = urllib.parse.urlsplit(self.uri)
        try:
            normalized_uri = normalizer(parsed)
            return urllib.parse.urlunsplit(normalized_uri)
        except ValueError:
            return None

    def as_expression(self) -> Any:
        """
        Serialize the asset into its scheduling expression.

        :meta private:
        """
        return {"asset": {"uri": self.uri, "name": self.name, "group": self.group}}

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        """Yield this asset together with its unique key."""
        yield AssetUniqueKey.from_asset(self), self

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        """Return an empty iterator; a plain asset contains no aliases."""
        return iter(())

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        """Return an empty iterator; a plain asset contains no references."""
        return iter(())

    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
        """
        Iterate an asset as dag dependency.

        An empty ``source`` or ``target`` is replaced with ``"asset"``.

        :meta private:
        """
        yield DagDependency(
            source=source or "asset",
            target=target or "asset",
            label=self.name,
            dependency_type="asset",
            # We can't get asset id at this stage.
            # This will be updated when running SerializedDagModel.get_dag_dependencies
            dependency_id=AssetUniqueKey.from_asset(self).to_str(),
        )

    def asprofile(self) -> AssetProfile:
        """
        Profiles Asset to AssetProfile.

        :meta private:
        """
        return AssetProfile(name=self.name or None, uri=self.uri or None, type=Asset.__name__)
498
499
class AssetRef(BaseAsset, AttrsInstance):
    """
    Reference to an asset.

    Do not instantiate this class directly; ``Asset.ref`` creates an instance
    of the appropriate subclass.

    :meta private:
    """

    _dependency_type: Literal["asset-name-ref", "asset-uri-ref"]

    def as_expression(self) -> Any:
        """Serialize the reference's attrs fields under the ``asset_ref`` key."""
        ref_fields = attrs.asdict(self)
        return {"asset_ref": ref_fields}

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        """Return an empty iterator; a reference contains no concrete asset."""
        return iter(())

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        """Return an empty iterator; a reference contains no alias."""
        return iter(())

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        """Yield this reference itself."""
        yield self

    def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]:
        """
        Yield this reference as a dag dependency.

        An empty ``source`` or ``target`` is replaced with the dependency type.
        """
        # Subclasses are expected to declare exactly one attrs field; the
        # single-element unpacking fails otherwise.
        (referenced,) = attrs.astuple(self)
        kind = self._dependency_type
        yield DagDependency(
            source=source or kind,
            target=target or kind,
            label=referenced,
            dependency_type=kind,
            dependency_id=referenced,
        )
533
534
@attrs.define(hash=True)
class AssetNameRef(AssetRef):
    """
    Name reference to an asset.

    ``Asset.ref(name=...)`` returns an instance of this class.
    """

    # Name of the referenced asset; the only attrs field on this class.
    name: str

    # Tag that ``AssetRef.iter_dag_dependencies`` uses as the dependency type
    # and as the fallback for an empty source/target.
    _dependency_type = "asset-name-ref"
542
543
@attrs.define(hash=True)
class AssetUriRef(AssetRef):
    """
    URI reference to an asset.

    ``Asset.ref(uri=...)`` returns an instance of this class.
    """

    # URI of the referenced asset; the only attrs field on this class.
    uri: str

    # Tag that ``AssetRef.iter_dag_dependencies`` uses as the dependency type
    # and as the fallback for an empty source/target.
    _dependency_type = "asset-uri-ref"
551
552
class Dataset(Asset):
    """
    A representation of dataset dependencies between workflows.

    Identical to ``Asset`` except for ``asset_type``; because ``group``
    defaults to ``asset_type``, it defaults to ``"dataset"`` here.
    """

    asset_type: ClassVar[str] = "dataset"
557
558
class Model(Asset):
    """
    A representation of model dependencies between workflows.

    Identical to ``Asset`` except for ``asset_type``; because ``group``
    defaults to ``asset_type``, it defaults to ``"model"`` here.
    """

    asset_type: ClassVar[str] = "model"
563
564
@attrs.define(unsafe_hash=False)
class AssetAlias(BaseAsset):
    """A representation of asset alias which is used to create asset during the runtime."""

    name: str = attrs.field(validator=_validate_non_empty_identifier)
    group: str = attrs.field(kw_only=True, default="asset", validator=_validate_identifier)

    def as_expression(self) -> Any:
        """
        Serialize the asset alias into its scheduling expression.

        :meta private:
        """
        alias_fields = {"name": self.name, "group": self.group}
        return {"alias": alias_fields}

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        """Return an empty iterator; an alias contains no concrete asset."""
        return iter(())

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        """Yield this alias together with its name."""
        yield self.name, self

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        """Return an empty iterator; an alias contains no asset reference."""
        return iter(())

    def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]:
        """
        Iterate an asset alias and its resolved assets as dag dependency.

        An empty ``source`` or ``target`` is replaced with ``"asset-alias"``.

        :meta private:
        """
        kind = "asset-alias"
        alias_name = self.name
        yield DagDependency(
            source=source or kind,
            target=target or kind,
            label=alias_name,
            dependency_type=kind,
            dependency_id=alias_name,
        )
602
603
class AssetBooleanCondition(BaseAsset):
    """
    Base class for asset boolean logic.

    Stores the combined asset expressions in ``objects`` and implements the
    ``iter_*`` methods by chaining the results of each member in order.

    :meta private:
    """

    agg_func: Callable[[Iterable], bool]

    def __init__(self, *objects: BaseAsset) -> None:
        """
        Store the member expressions.

        :raises TypeError: If any member is not a ``BaseAsset``.
        """
        if any(not isinstance(member, BaseAsset) for member in objects):
            raise TypeError("expect asset expressions in condition")
        self.objects = objects

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        """Yield the ``(unique key, asset)`` pairs of every member."""
        for member in self.objects:
            yield from member.iter_assets()

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        """Yield the ``(name, alias)`` pairs of every member."""
        for member in self.objects:
            yield from member.iter_asset_aliases()

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        """Yield the asset references of every member."""
        for member in self.objects:
            yield from member.iter_asset_refs()

    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
        """
        Iterate asset, asset aliases and their resolved assets as dag dependency.

        ``source`` and ``target`` are forwarded unchanged to every member.

        :meta private:
        """
        for member in self.objects:
            yield from member.iter_dag_dependencies(source=source, target=target)
638
639
class AssetAny(AssetBooleanCondition):
    """Use to combine assets schedule references in an "or" relationship."""

    agg_func = any  # type: ignore[assignment]

    def __or__(self, other: BaseAsset) -> BaseAsset:
        if isinstance(other, BaseAsset):
            # Optimization: (X | Y) | Z becomes the flat AssetAny(X, Y, Z)
            # instead of a nested condition.
            return AssetAny(*self.objects, other)
        return NotImplemented

    def __repr__(self) -> str:
        members = ", ".join(str(member) for member in self.objects)
        return f"AssetAny({members})"

    def as_expression(self) -> dict[str, Any]:
        """
        Serialize the asset into its scheduling expression.

        :meta private:
        """
        member_expressions = [member.as_expression() for member in self.objects]
        return {"any": member_expressions}
661
662
class AssetAll(AssetBooleanCondition):
    """Use to combine assets schedule references in an "and" relationship."""

    agg_func = all  # type: ignore[assignment]

    def __and__(self, other: BaseAsset) -> BaseAsset:
        if isinstance(other, BaseAsset):
            # Optimization: (X & Y) & Z becomes the flat AssetAll(X, Y, Z)
            # instead of a nested condition.
            return AssetAll(*self.objects, other)
        return NotImplemented

    def __repr__(self) -> str:
        members = ", ".join(str(member) for member in self.objects)
        return f"AssetAll({members})"

    def as_expression(self) -> Any:
        """
        Serialize the assets into its scheduling expression.

        :meta private:
        """
        member_expressions = [member.as_expression() for member in self.objects]
        return {"all": member_expressions}
684
685
@attrs.define
class AssetAliasEvent(attrs.AttrsInstance):
    """Representation of asset event to be triggered by an asset alias."""

    # Name of the alias this event originates from.
    source_alias_name: str
    # Unique key (name + URI) of the destination asset.
    dest_asset_key: AssetUniqueKey
    # NOTE(review): presumably the ``extra`` dict of the destination asset — confirm against callers.
    dest_asset_extra: dict[str, JsonValue]
    # NOTE(review): presumably the ``extra`` dict attached to the event itself — confirm against callers.
    extra: dict[str, JsonValue]