Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/definitions/asset/__init__.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

310 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import json 

21import logging 

22import operator 

23import os 

24import urllib.parse 

25import warnings 

26from collections.abc import Callable 

27from typing import TYPE_CHECKING, Any, ClassVar, Literal, overload 

28 

29import attrs 

30 

31from airflow.sdk.api.datamodels._generated import AssetProfile 

32from airflow.serialization.dag_dependency import DagDependency 

33 

if TYPE_CHECKING:
    # Names used only in annotations; ``from __future__ import annotations``
    # keeps them from being evaluated at runtime.
    from collections.abc import Iterable, Iterator
    from urllib.parse import SplitResult

    from pydantic.types import JsonValue

    from airflow.models.asset import AssetModel
    from airflow.sdk.io.path import ObjectStoragePath
    from airflow.serialization.definitions.assets import SerializedAssetWatcher
    from airflow.triggers.base import BaseEventTrigger

    # For type checkers, expose the attrs protocol -- presumably so that
    # ``attrs.asdict(self)`` / ``attrs.astuple(self)`` type-check in classes
    # such as ``AssetRef`` that subclass ``AttrsInstance``.
    AttrsInstance = attrs.AttrsInstance
else:
    # At runtime a plain ``object`` base is used instead.
    AttrsInstance = object

48 

49 

# Public names exported by ``from airflow.sdk.definitions.asset import *``.
__all__ = [
    "Asset",
    "Dataset",
    "Model",
    "AssetAlias",
    "AssetAll",
    "AssetAny",
    "AssetNameRef",
    "AssetRef",
    "AssetUriRef",
    "AssetWatcher",
]

from airflow.sdk.configuration import conf

log = logging.getLogger(__name__)


# Database connection string, read once at import time. ``_validate_identifier``
# checks its prefix to apply MySQL-only restrictions. The ``fallback`` value
# "NOT AVAILABLE" never matches that prefix.
SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE")

69 

70 

@attrs.define(frozen=True)
class AssetUniqueKey(attrs.AttrsInstance):
    """
    Identify an asset uniquely by its ``(name, uri)`` pair.

    :meta private:
    """

    name: str
    uri: str

    @staticmethod
    def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
        """Build a key from an object exposing ``name`` and ``uri`` attributes."""
        return AssetUniqueKey(name=asset.name, uri=asset.uri)

    def to_asset(self) -> Asset:
        """Recreate an :class:`Asset` carrying only this key's name and URI."""
        return Asset(name=self.name, uri=self.uri)

    @staticmethod
    def from_str(key: str) -> AssetUniqueKey:
        """Parse a key previously produced by :meth:`to_str`."""
        fields = json.loads(key)
        return AssetUniqueKey(**fields)

    def to_str(self) -> str:
        """Serialize the key as a JSON object with ``name`` and ``uri`` members."""
        payload = attrs.asdict(self)
        return json.dumps(payload)

    @staticmethod
    def from_profile(profile: AssetProfile) -> AssetUniqueKey:
        """
        Build a key from an ``AssetProfile``.

        If only one of ``name`` / ``uri`` is set, that value is used for both fields.

        :raises ValueError: if both ``name`` and ``uri`` are empty.
        """
        name, uri = profile.name, profile.uri
        if name and uri:
            return AssetUniqueKey(name=name, uri=uri)
        # At most one of the two is set here; reuse whichever one it is.
        fallback = name or uri
        if fallback:
            return AssetUniqueKey(name=fallback, uri=fallback)
        raise ValueError("name and uri cannot both be empty")

107 

108 

@attrs.define(frozen=True)
class AssetAliasUniqueKey:
    """
    Identify an asset alias uniquely; aliases are keyed by name alone.

    :meta private:
    """

    name: str

    @staticmethod
    def from_asset_alias(asset_alias: AssetAlias) -> AssetAliasUniqueKey:
        """Derive the key from an :class:`AssetAlias`."""
        alias_name = asset_alias.name
        return AssetAliasUniqueKey(name=alias_name)

    def to_asset_alias(self) -> AssetAlias:
        """Recreate an :class:`AssetAlias` with this name and the default group."""
        return AssetAlias(name=self.name)

125 

126 

# Either kind of unique key: one for an asset or one for an asset alias.
BaseAssetUniqueKey = AssetUniqueKey | AssetAliasUniqueKey

128 

129 

def normalize_noop(parts: SplitResult) -> SplitResult:
    """
    Place-hold a :class:`~urllib.parse.SplitResult` normalizer.

    Returns *parts* unchanged. It is used as the normalizer for ``file`` URIs,
    which need no scheme-specific normalization.

    :meta private:
    """
    return parts

137 

138 

def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
    """
    Look up the URI normalizer for *scheme*.

    ``file`` URIs are handled locally by ``normalize_noop``. Any other scheme is
    looked up in ``ProvidersManager().asset_uri_handlers``. ``None`` is returned
    when that mapping has no entry for the scheme.
    """
    if scheme != "file":
        # Imported only when needed, so the ``file`` fast path never loads it.
        from airflow.providers_manager import ProvidersManager

        handlers = ProvidersManager().asset_uri_handlers
        return handlers.get(scheme)
    return normalize_noop

145 

146 

147def _get_normalized_scheme(uri: str) -> str: 

148 parsed = urllib.parse.urlsplit(uri) 

149 return parsed.scheme.lower() 

150 

151 

152def _sanitize_uri(inp: str | ObjectStoragePath) -> str: 

153 """ 

154 Sanitize an asset URI. 

155 

156 This checks for URI validity, and normalizes the URI if needed. A fully 

157 normalized URI is returned. 

158 """ 

159 uri = str(inp) 

160 parsed = urllib.parse.urlsplit(uri) 

161 if not parsed.scheme and not parsed.netloc: # Does not look like a URI. 

162 return uri 

163 if not (normalized_scheme := _get_normalized_scheme(uri)): 

164 return uri 

165 if normalized_scheme.startswith("x-"): 

166 return uri 

167 if normalized_scheme == "airflow": 

168 raise ValueError("Asset scheme 'airflow' is reserved") 

169 if parsed.password: 

170 # TODO: Collect this into a DagWarning. 

171 warnings.warn( 

172 "An Asset URI should not contain a password. User info has been automatically dropped.", 

173 UserWarning, 

174 stacklevel=3, 

175 ) 

176 _, _, normalized_netloc = parsed.netloc.rpartition("@") 

177 else: 

178 normalized_netloc = parsed.netloc 

179 if parsed.query: 

180 normalized_query = urllib.parse.urlencode(sorted(urllib.parse.parse_qsl(parsed.query))) 

181 else: 

182 normalized_query = "" 

183 parsed = parsed._replace( 

184 scheme=normalized_scheme, 

185 netloc=normalized_netloc, 

186 path=parsed.path.rstrip("/") or "/", # Remove all trailing slashes. 

187 query=normalized_query, 

188 fragment="", # Ignore any fragments. 

189 ) 

190 if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None: 

191 parsed = normalizer(parsed) 

192 return urllib.parse.urlunsplit(parsed) 

193 

194 

195def _validate_identifier(instance, attribute, value): 

196 if not isinstance(value, str): 

197 raise ValueError(f"{type(instance).__name__} {attribute.name} must be a string") 

198 if len(value) > 1500: 

199 raise ValueError(f"{type(instance).__name__} {attribute.name} cannot exceed 1500 characters") 

200 if value.isspace(): 

201 raise ValueError(f"{type(instance).__name__} {attribute.name} cannot be just whitespace") 

202 # We use latin1_general_cs to store the name (and group, asset values etc.) on MySQL. 

203 # relaxing this check for non mysql backend 

204 if SQL_ALCHEMY_CONN.startswith("mysql") and not value.isascii(): 

205 raise ValueError(f"{type(instance).__name__} {attribute.name} must only consist of ASCII characters") 

206 return value 

207 

208 

def _validate_non_empty_identifier(instance, attribute, value):
    """
    attrs validator that applies ``_validate_identifier`` and also rejects ``""``.

    :raises ValueError: if *value* fails ``_validate_identifier`` or is empty.
    :return: *value*, unchanged.
    """
    checked = _validate_identifier(instance, attribute, value)
    if checked:
        return value
    raise ValueError(f"{type(instance).__name__} {attribute.name} cannot be empty")

213 

214 

def _validate_asset_name(instance, attribute, value):
    """
    attrs validator for ``Asset.name``.

    Applies the non-empty identifier checks and also rejects the names
    ``"self"`` and ``"context"``.

    :raises ValueError: on any violation.
    :return: *value*, unchanged.
    """
    _validate_non_empty_identifier(instance, attribute, value)
    if value in ("self", "context"):
        raise ValueError(f"prohibited name for asset: {value}")
    return value

220 

221 

222def _set_extra_default(extra: dict[str, JsonValue] | None) -> dict: 

223 """ 

224 Automatically convert None to an empty dict. 

225 

226 This allows the caller site to continue doing ``Asset(uri, extra=None)``, 

227 but still allow the ``extra`` attribute to always be a dict. 

228 """ 

229 if extra is None: 

230 return {} 

231 return extra 

232 

233 

class BaseAsset:
    """
    Protocol for all asset triggers to use in ``DAG(schedule=...)``.

    Instances are always truthy. They compose with ``|`` into ``AssetAny`` and
    with ``&`` into ``AssetAll``. Subclasses must implement ``as_expression``
    and the ``iter_*`` hooks.

    :meta private:
    """

    def __bool__(self) -> bool:
        # Every asset expression is truthy.
        return True

    def __or__(self, other: BaseAsset) -> BaseAsset:
        """Combine with *other* in an "or" relationship."""
        if isinstance(other, BaseAsset):
            return AssetAny(self, other)
        return NotImplemented

    def __and__(self, other: BaseAsset) -> BaseAsset:
        """Combine with *other* in an "and" relationship."""
        if isinstance(other, BaseAsset):
            return AssetAll(self, other)
        return NotImplemented

    def as_expression(self) -> Any:
        """
        Serialize the asset into its scheduling expression.

        The return value is stored in DagModel for display purposes, so it
        must be JSON-compatible.

        :meta private:
        """
        raise NotImplementedError

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        """Yield ``(key, asset)`` pairs for concrete assets; subclass hook."""
        raise NotImplementedError

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        """Yield ``(name, alias)`` pairs for asset aliases; subclass hook."""
        raise NotImplementedError

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        """Yield asset references; subclass hook."""
        raise NotImplementedError

    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
        """
        Iterate a base asset as dag dependency.

        :meta private:
        """
        raise NotImplementedError

281 

282 

@attrs.define(init=False)
class AssetWatcher:
    """A representation of an asset watcher. The name uniquely identifies the watch."""

    name: str
    # ``trigger`` serves two purposes.
    # * For an asset defined in a Dag file, users create ``AssetWatcher`` directly
    #   and this holds the trigger that monitors an external resource.
    # * For an asset rebuilt from a serialized Dag, ``SerializedAssetWatcher``
    #   (a subclass of ``AssetWatcher``) is used and this holds the trigger's
    #   serialized data.
    # Both types are kept in the annotation to satisfy mypy.
    trigger: BaseEventTrigger | dict

    def __init__(
        self,
        name: str,
        trigger: BaseEventTrigger | dict,
    ) -> None:
        from airflow.triggers.base import BaseEventTrigger, BaseTrigger

        # Only ``BaseTrigger`` instances are checked. Other objects, such as the
        # serialized dict form, are stored as-is.
        wrong_trigger_kind = isinstance(trigger, BaseTrigger) and not isinstance(trigger, BaseEventTrigger)
        if wrong_trigger_kind:
            raise ValueError("The trigger used to watch an asset must inherit ``BaseEventTrigger``")

        self.name = name
        self.trigger = trigger

308 

309 

@attrs.define(init=False, unsafe_hash=False)
class Asset(os.PathLike, BaseAsset):
    """
    A representation of data asset dependencies between workflows.

    An asset is identified by ``name`` and ``uri``. Either may be given alone,
    in which case the other is set to the same value. During initialization:

    * ``uri`` is passed through ``_sanitize_uri``;
    * ``extra`` is always converted to a dict;
    * ``group`` defaults to the class-level ``asset_type``.
    """

    name: str = attrs.field(
        validator=[_validate_asset_name],
    )
    uri: str = attrs.field(
        validator=[_validate_non_empty_identifier],
        converter=_sanitize_uri,
    )
    # Defaults to ``self.asset_type``, so subclasses such as Dataset and Model
    # get their own default group.
    group: str = attrs.field(
        default=attrs.Factory(operator.attrgetter("asset_type"), takes_self=True),
        validator=[_validate_identifier],
    )
    # The converter turns ``None`` into an empty dict.
    extra: dict[str, JsonValue] = attrs.field(
        factory=dict,
        converter=_set_extra_default,
    )
    watchers: list[AssetWatcher | SerializedAssetWatcher] = attrs.field(
        factory=list,
    )

    asset_type: ClassVar[str] = "asset"
    __version__: ClassVar[int] = 1

    @overload
    def __init__(
        self,
        name: str,
        uri: str | ObjectStoragePath,
        *,
        group: str = ...,
        extra: dict[str, JsonValue] | None = None,
        watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
    ) -> None:
        """Canonical; both name and uri are provided."""

    @overload
    def __init__(
        self,
        name: str,
        *,
        group: str = ...,
        extra: dict[str, JsonValue] | None = None,
        watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
    ) -> None:
        """It's possible to only provide the name, either by keyword or as the only positional argument."""

    @overload
    def __init__(
        self,
        *,
        uri: str | ObjectStoragePath,
        group: str = ...,
        extra: dict[str, JsonValue] | None = None,
        watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
    ) -> None:
        """It's possible to only provide the URI as a keyword argument."""

    def __init__(
        self,
        name: str | None = None,
        uri: str | ObjectStoragePath | None = None,
        *,
        group: str | None = None,
        extra: dict[str, JsonValue] | None = None,
        watchers: list[AssetWatcher | SerializedAssetWatcher] | None = None,
    ) -> None:
        """
        Create an asset from a name, a URI, or both.

        :raises TypeError: if neither ``name`` nor ``uri`` is given.
        """
        if name is None and uri is None:
            raise TypeError("Asset() requires either 'name' or 'uri'")
        if name is None:
            name = str(uri)
        elif uri is None:
            uri = name

        if TYPE_CHECKING:
            assert name is not None
            assert uri is not None

        # attrs defaults (and factories) do not apply when a value is passed for
        # the argument, so omit arguments left as None from ``__attrs_init__``.
        kwargs: dict[str, Any] = {}
        if group is not None:
            kwargs["group"] = group
        if extra is not None:
            kwargs["extra"] = extra
        if watchers is not None:
            kwargs["watchers"] = watchers

        self.__attrs_init__(name=name, uri=uri, **kwargs)

    @overload
    @staticmethod
    def ref(*, name: str) -> AssetNameRef: ...

    @overload
    @staticmethod
    def ref(*, uri: str) -> AssetUriRef: ...

    @staticmethod
    def ref(*, name: str = "", uri: str = "") -> AssetRef:
        """
        Create a reference to an asset by name or by URI (exactly one).

        :raises TypeError: if both or neither of ``name`` and ``uri`` are given.
        """
        if name and uri:
            raise TypeError("Asset reference must be made to either name or URI, not both")
        if name:
            return AssetNameRef(name)
        if uri:
            return AssetUriRef(uri)
        raise TypeError("Asset reference expects keyword argument 'name' or 'uri'")

    def __fspath__(self) -> str:
        return self.uri

    def __eq__(self, other: Any) -> bool:
        # The Asset class can be subclassed, and we don't want fields added by a
        # subclass to break equality. This explicitly filters out only fields
        # defined by the Asset class for comparison.
        if not isinstance(other, Asset):
            return NotImplemented
        f = attrs.filters.include(*attrs.fields_dict(Asset))
        return attrs.asdict(self, filter=f) == attrs.asdict(other, filter=f)

    def __hash__(self) -> int:
        # Hash only the immutable identifying fields. The previous implementation
        # passed the dict from ``attrs.asdict`` to ``hash()``, which always raised
        # ``TypeError`` (dicts are unhashable, as are ``extra`` and ``watchers``).
        # Assets that compare equal in ``__eq__`` share name, uri and group, so
        # this hash is consistent with equality.
        return hash((self.name, self.uri, self.group))

    @property
    def normalized_uri(self) -> str | None:
        """
        Return the normalized, AIP-60 compliant URI whenever possible.

        Returns ``None`` in any of these cases:

        * the scheme cannot be retrieved from the URI;
        * no normalizer exists for the scheme;
        * the normalizer raises ``ValueError``.

        Otherwise the normalizer's result is returned.
        """
        if not (normalized_scheme := _get_normalized_scheme(self.uri)):
            return None

        if (normalizer := _get_uri_normalizer(normalized_scheme)) is None:
            return None
        parsed = urllib.parse.urlsplit(self.uri)
        try:
            normalized_uri = normalizer(parsed)
            return urllib.parse.urlunsplit(normalized_uri)
        except ValueError:
            return None

    def as_expression(self) -> Any:
        """
        Serialize the asset into its scheduling expression.

        :meta private:
        """
        return {"asset": {"uri": self.uri, "name": self.name, "group": self.group}}

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        yield AssetUniqueKey.from_asset(self), self

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        return iter(())

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        return iter(())

    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
        """
        Iterate an asset as dag dependency.

        :meta private:
        """
        yield DagDependency(
            source=source or "asset",
            target=target or "asset",
            label=self.name,
            dependency_type="asset",
            # We can't get asset id at this stage.
            # This will be updated when running SerializedDagModel.get_dag_dependencies
            dependency_id=AssetUniqueKey.from_asset(self).to_str(),
        )

    def asprofile(self) -> AssetProfile:
        """
        Profiles Asset to AssetProfile.

        :meta private:
        """
        return AssetProfile(name=self.name or None, uri=self.uri or None, type=Asset.__name__)

498 

499 

class AssetRef(BaseAsset, AttrsInstance):
    """
    Reference to an asset.

    Do not instantiate this class directly; call ``Asset.ref`` to get one of
    the concrete subclasses (name or URI reference).

    :meta private:
    """

    # Each concrete subclass sets this. It is used as the dependency type and
    # as the fallback dependency source/target.
    _dependency_type: Literal["asset-name-ref", "asset-uri-ref"]

    def as_expression(self) -> Any:
        """Serialize the reference as ``{"asset_ref": <attrs fields>}``."""
        payload = attrs.asdict(self)
        return {"asset_ref": payload}

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        return iter(())

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        return iter(())

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        yield self

    def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]:
        """Yield this reference as a single dag dependency."""
        # Concrete refs have exactly one attrs field (``name`` or ``uri``), and the
        # unpacking relies on that.
        (dependency_id,) = attrs.astuple(self)
        kind = self._dependency_type
        yield DagDependency(
            source=source or kind,
            target=target or kind,
            label=dependency_id,
            dependency_type=kind,
            dependency_id=dependency_id,
        )

533 

534 

@attrs.define(unsafe_hash=True)
class AssetNameRef(AssetRef):
    """Name reference to an asset."""

    name: str

    # Deliberately unannotated so attrs keeps it a class attribute rather than a
    # field. ``AssetRef.iter_dag_dependencies`` unpacks exactly one field.
    _dependency_type = "asset-name-ref"

542 

543 

@attrs.define(unsafe_hash=True)
class AssetUriRef(AssetRef):
    """URI reference to an asset."""

    uri: str

    # Deliberately unannotated so attrs keeps it a class attribute rather than a
    # field. ``AssetRef.iter_dag_dependencies`` unpacks exactly one field.
    _dependency_type = "asset-uri-ref"

551 

552 

class Dataset(Asset):
    """
    A representation of dataset dependencies between workflows.

    Behaves like :class:`Asset`. Only the class-level ``asset_type`` differs,
    and ``Asset`` uses it as the default ``group``.
    """

    # Default ``group`` for instances created without an explicit group.
    asset_type: ClassVar[str] = "dataset"

557 

558 

class Model(Asset):
    """
    A representation of model dependencies between workflows.

    Behaves like :class:`Asset`. Only the class-level ``asset_type`` differs,
    and ``Asset`` uses it as the default ``group``.
    """

    # Default ``group`` for instances created without an explicit group.
    asset_type: ClassVar[str] = "model"

563 

564 

@attrs.define(unsafe_hash=False)
class AssetAlias(BaseAsset):
    """A representation of asset alias which is used to create asset during the runtime."""

    name: str = attrs.field(validator=_validate_non_empty_identifier)
    group: str = attrs.field(kw_only=True, default="asset", validator=_validate_identifier)

    def as_expression(self) -> Any:
        """
        Serialize the asset alias into its scheduling expression.

        :meta private:
        """
        alias_info = {"name": self.name, "group": self.group}
        return {"alias": alias_info}

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        return iter(())

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        yield self.name, self

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        return iter(())

    def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]:
        """
        Iterate an asset alias and its resolved assets as dag dependency.

        :meta private:
        """
        kind = "asset-alias"
        yield DagDependency(
            source=source or kind,
            target=target or kind,
            label=self.name,
            dependency_type=kind,
            dependency_id=self.name,
        )

602 

603 

class AssetBooleanCondition(BaseAsset):
    """
    Base class for asset boolean logic.

    Stores the operand expressions in ``objects`` and forwards each ``iter_*``
    query to them in order. Subclasses provide ``agg_func`` and
    ``as_expression``.

    :meta private:
    """

    agg_func: Callable[[Iterable], bool]

    def __init__(self, *objects: BaseAsset) -> None:
        for candidate in objects:
            if not isinstance(candidate, BaseAsset):
                raise TypeError("expect asset expressions in condition")
        self.objects = objects

    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
        for operand in self.objects:
            yield from operand.iter_assets()

    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
        for operand in self.objects:
            yield from operand.iter_asset_aliases()

    def iter_asset_refs(self) -> Iterator[AssetRef]:
        for operand in self.objects:
            yield from operand.iter_asset_refs()

    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
        """
        Iterate asset, asset aliases and their resolved assets as dag dependency.

        :meta private:
        """
        for operand in self.objects:
            yield from operand.iter_dag_dependencies(source=source, target=target)

638 

639 

class AssetAny(AssetBooleanCondition):
    """Use to combine assets schedule references in an "or" relationship."""

    agg_func = any  # type: ignore[assignment]

    def __or__(self, other: BaseAsset) -> BaseAsset:
        """Extend this condition with *other* instead of nesting another ``AssetAny``."""
        if isinstance(other, BaseAsset):
            # Flattening: (X | Y) | Z gives AssetAny(X, Y, Z), not a nested AssetAny.
            return AssetAny(*self.objects, other)
        return NotImplemented

    def __repr__(self) -> str:
        inner = ", ".join(str(operand) for operand in self.objects)
        return f"AssetAny({inner})"

    def as_expression(self) -> dict[str, Any]:
        """
        Serialize the asset into its scheduling expression.

        :meta private:
        """
        expressions = [operand.as_expression() for operand in self.objects]
        return {"any": expressions}

661 

662 

class AssetAll(AssetBooleanCondition):
    """Use to combine assets schedule references in an "and" relationship."""

    agg_func = all  # type: ignore[assignment]

    def __and__(self, other: BaseAsset) -> BaseAsset:
        """Extend this condition with *other* instead of nesting another ``AssetAll``."""
        if isinstance(other, BaseAsset):
            # Flattening: (X & Y) & Z gives AssetAll(X, Y, Z), not a nested AssetAll.
            return AssetAll(*self.objects, other)
        return NotImplemented

    def __repr__(self) -> str:
        inner = ", ".join(str(operand) for operand in self.objects)
        return f"AssetAll({inner})"

    def as_expression(self) -> Any:
        """
        Serialize the assets into its scheduling expression.

        :meta private:
        """
        expressions = [operand.as_expression() for operand in self.objects]
        return {"all": expressions}

684 

685 

@attrs.define
class AssetAliasEvent(attrs.AttrsInstance):
    """Representation of asset event to be triggered by an asset alias."""

    # Name of the asset alias through which the event is emitted.
    source_alias_name: str
    # Unique key of the asset the event targets.
    dest_asset_key: AssetUniqueKey
    # NOTE(review): presumably the ``extra`` of the destination asset -- confirm with producers.
    dest_asset_extra: dict[str, JsonValue]
    # NOTE(review): presumably extra data attached to the event itself -- confirm.
    extra: dict[str, JsonValue]