Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/models/dagbundle.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

50 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19from datetime import datetime 

20 

21from sqlalchemy import Boolean, String 

22from sqlalchemy.orm import Mapped, relationship 

23from sqlalchemy_utils import JSONType 

24 

25from airflow.models.base import Base, StringID 

26from airflow.models.team import dag_bundle_team_association_table 

27from airflow.utils.log.logging_mixin import LoggingMixin 

28from airflow.utils.sqlalchemy import UtcDateTime, mapped_column 

29 

30 

class DagBundleModel(Base, LoggingMixin):
    """
    A table for storing DAG bundle metadata.

    We track the following information about each bundle, as it can be useful for
    informational purposes and for debugging:

    - active: Is the bundle currently found in configuration?
    - version: The latest version Airflow has seen for the bundle.
    - last_refreshed: When the bundle was last refreshed.
    - signed_url_template: Signed URL template for viewing the bundle
    - template_params: JSON object containing template parameters for constructing view url
      (e.g., {"subdir": "dags"})

    """

    __tablename__ = "dag_bundle"
    name: Mapped[str] = mapped_column(StringID(length=250), primary_key=True, nullable=False)
    active: Mapped[bool | None] = mapped_column(Boolean, default=True, nullable=True)
    version: Mapped[str | None] = mapped_column(String(200), nullable=True)
    last_refreshed: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
    signed_url_template: Mapped[str | None] = mapped_column(String(200), nullable=True)
    template_params: Mapped[dict | None] = mapped_column(JSONType, nullable=True)
    # Many-to-many link to Team rows through the association table.
    teams = relationship("Team", secondary=dag_bundle_team_association_table, back_populates="dag_bundles")

    def __init__(self, *, name: str, version: str | None = None):
        """
        Create a bundle row with the given name and optional version.

        :param name: The bundle name (primary key of the table)
        :param version: The bundle version to store, if any
        """
        super().__init__()
        self.name = name
        self.version = version

    def _unsign_url(self) -> str | None:
        """
        Unsign the stored ``signed_url_template`` token to get the original URL template.

        The token is loaded with a ``URLSafeSerializer`` keyed by the ``[core] fernet_key``
        config value. The payload is accepted only when it is a dict containing both ``url``
        and ``bundle_name``, and ``bundle_name`` equals this bundle's name.

        :return: The original URL template, or None if there is no stored token, the payload
            does not have the expected shape, or unsigning fails for any reason
        """
        try:
            from itsdangerous import URLSafeSerializer

            from airflow.configuration import conf

            if not self.signed_url_template:
                return None

            serializer = URLSafeSerializer(conf.get_mandatory_value("core", "fernet_key"))
            payload = serializer.loads(self.signed_url_template)
            if isinstance(payload, dict) and "url" in payload and "bundle_name" in payload:
                # Reject tokens that were signed for a different bundle.
                if payload["bundle_name"] == self.name:
                    return payload["url"]

            return None
        except Exception:
            # Deliberately best-effort: any failure means "no usable template".
            # itsdangerous' BadSignature is an Exception subclass, so it is covered here.
            # It must not be named in this handler: it is imported inside the ``try``, so
            # a failed import would leave it unbound and the handler itself would raise.
            return None

    def render_url(self, version: str | None = None) -> str | None:
        """
        Render the URL template with the given version and stored template parameters.

        First unsigns the URL to get the original template, then formats it with
        the stored ``template_params`` plus a ``version`` entry. The ``version`` argument
        always overrides any ``version`` key present in ``template_params``.

        :param version: The version to substitute in the template
        :return: The rendered URL, or None if no template is available or it cannot be
            rendered (a warning is logged in the latter case)
        """
        if not self.signed_url_template:
            return None

        url_template = self._unsign_url()

        if url_template is None:
            return None

        # Copy so the stored template_params mapping is never mutated.
        params = dict(self.template_params or {})
        params["version"] = version

        try:
            return url_template.format(**params)
        except (KeyError, IndexError, ValueError, AttributeError, TypeError) as e:
            # str.format can fail in several ways depending on the template:
            #   KeyError       - a named field has no matching parameter
            #   IndexError     - a positional field ("{}" / "{0}"); none are supplied
            #   ValueError     - malformed format string
            #   AttributeError - attribute lookup on a value ("{version.x}")
            #   TypeError      - index lookup on a non-subscriptable value
            # All of them mean the template cannot be rendered with these parameters.
            self.log.warning("Failed to render URL template for bundle %s: %s", self.name, e)
            return None