Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/datasets/__init__.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

121 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import os 

21import urllib.parse 

22import warnings 

23from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterable, Iterator 

24 

25import attr 

26 

27if TYPE_CHECKING: 

28 from urllib.parse import SplitResult 

29 

30 

31from airflow.configuration import conf 

32 

33__all__ = ["Dataset", "DatasetAll", "DatasetAny"] 

34 

35 

def normalize_noop(parts: SplitResult) -> SplitResult:
    """Place-hold a :class:`~urllib.parse.SplitResult` normalizer.

    The split URI is handed back untouched; no component is inspected or
    rewritten.

    :param parts: The already-split URI.
    :return: The very same ``parts`` object that was passed in.

    :meta private:
    """
    return parts

42 

43 

def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
    """Look up the normalizer to apply to URIs of the given scheme.

    :param scheme: URI scheme to look up. It is compared verbatim (no case
        folding happens here).
    :return: :func:`normalize_noop` for the ``"file"`` scheme; for any other
        scheme, the result of ``ProvidersManager().dataset_uri_handlers.get(scheme)``
        (presumably ``None`` when no handler is registered -- confirm against
        ``ProvidersManager``).
    """
    if scheme == "file":
        return normalize_noop
    # Imported inside the function, so the providers manager is only loaded
    # when a non-file scheme actually needs a lookup.
    from airflow.providers_manager import ProvidersManager

    return ProvidersManager().dataset_uri_handlers.get(scheme)

50 

51 

def _sanitize_uri(uri: str) -> str:
    """Sanitize a dataset URI.

    This checks for URI validity, and normalizes the URI if needed. A fully
    normalized URI is returned.

    Strings that have neither a scheme nor a netloc, and URIs whose scheme
    starts with ``x-``, are returned exactly as given. For everything else
    the scheme is lower-cased, auth info is removed from the netloc, trailing
    slashes are stripped from the path, query parameters are sorted, and the
    fragment is dropped. The scheme-specific normalizer, if one is found, is
    applied last.

    :param uri: The URI string to sanitize.
    :return: The sanitized URI.
    :raises ValueError: If ``uri`` is empty, consists only of whitespace,
        contains non-ASCII characters, or uses the reserved ``airflow``
        scheme. Also re-raised from the scheme normalizer when
        ``[core] strict_dataset_uri_validation`` is enabled.
    """
    if not uri:
        raise ValueError("Dataset URI cannot be empty")
    if uri.isspace():
        raise ValueError("Dataset URI cannot be just whitespace")
    if not uri.isascii():
        raise ValueError("Dataset URI must only consist of ASCII characters")
    parsed = urllib.parse.urlsplit(uri)
    if not parsed.scheme and not parsed.netloc:  # Does not look like a URI.
        return uri
    normalized_scheme = parsed.scheme.lower()
    if normalized_scheme.startswith("x-"):
        return uri
    if normalized_scheme == "airflow":
        raise ValueError("Dataset scheme 'airflow' is reserved")
    # Split on the LAST "@": everything before it is auth info, everything
    # after it is the host part that is kept.
    _, auth_exists, normalized_netloc = parsed.netloc.rpartition("@")
    if auth_exists:
        # TODO: Collect this into a DagWarning.
        warnings.warn(
            "A dataset URI should not contain auth info (e.g. username or "
            "password). It has been automatically dropped.",
            UserWarning,
            stacklevel=3,
        )
    if parsed.query:
        # Sorting makes the result independent of parameter order.
        # ``parse_qsl`` is called with its defaults, so parameters with blank
        # values are not carried over into the normalized query.
        normalized_query = urllib.parse.urlencode(sorted(urllib.parse.parse_qsl(parsed.query)))
    else:
        normalized_query = ""
    parsed = parsed._replace(
        scheme=normalized_scheme,
        netloc=normalized_netloc,
        path=parsed.path.rstrip("/") or "/",  # Remove all trailing slashes.
        query=normalized_query,
        fragment="",  # Ignore any fragments.
    )
    if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
        try:
            parsed = normalizer(parsed)
        except ValueError as exception:
            if conf.getboolean("core", "strict_dataset_uri_validation", fallback=False):
                raise
            # Non-strict mode: warn and keep the generically normalized parts.
            warnings.warn(
                f"The dataset URI {uri} is not AIP-60 compliant: {exception}. "
                f"In Airflow 3, this will raise an exception.",
                UserWarning,
                stacklevel=3,
            )
    return urllib.parse.urlunsplit(parsed)

105 

106 

def coerce_to_uri(value: str | Dataset) -> str:
    """Coerce a user input into a sanitized URI.

    If the input value is a string, it is treated as a URI and sanitized. If the
    input is a :class:`Dataset`, the URI it contains is considered sanitized and
    returned directly.

    Any value that is not a :class:`Dataset` is converted with ``str()``
    before being sanitized.

    :param value: A URI string or a :class:`Dataset`.
    :return: The sanitized URI.

    :meta private:
    """
    if isinstance(value, Dataset):
        return value.uri
    return _sanitize_uri(str(value))

119 

120 

class BaseDataset:
    """Protocol for all dataset triggers to use in ``DAG(schedule=...)``.

    Provides the ``|`` and ``&`` operators; ``as_expression``, ``evaluate``
    and ``iter_datasets`` raise :class:`NotImplementedError` here and are
    meant to be overridden.

    :meta private:
    """

    def __or__(self, other: BaseDataset) -> DatasetAny:
        """Combine ``self`` and ``other`` into a :class:`DatasetAny`."""
        if not isinstance(other, BaseDataset):
            return NotImplemented
        return DatasetAny(self, other)

    def __and__(self, other: BaseDataset) -> DatasetAll:
        """Combine ``self`` and ``other`` into a :class:`DatasetAll`."""
        if not isinstance(other, BaseDataset):
            return NotImplemented
        return DatasetAll(self, other)

    def as_expression(self) -> Any:
        """Serialize the dataset into its scheduling expression.

        The return value is stored in DagModel for display purposes. It must be
        JSON-compatible.

        :meta private:
        """
        raise NotImplementedError

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        """Evaluate this expression against ``statuses``, a mapping of URI to flag."""
        raise NotImplementedError

    def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
        """Iterate over ``(uri, dataset)`` pairs referenced by this expression."""
        raise NotImplementedError

152 

153 

@attr.define()
class Dataset(os.PathLike, BaseDataset):
    """A representation of data dependencies between workflows.

    Two datasets compare equal, and hash identically, when their ``uri``
    values match; ``extra`` does not take part in either.
    """

    # The URI goes through ``_sanitize_uri`` first and is then validated to
    # be between 1 and 3000 characters long.
    uri: str = attr.field(
        converter=_sanitize_uri,
        validator=[attr.validators.min_len(1), attr.validators.max_len(3000)],
    )
    extra: dict[str, Any] | None = None

    __version__: ClassVar[int] = 1

    def __fspath__(self) -> str:
        """Return the URI, which lets the dataset act as an ``os.PathLike``."""
        return self.uri

    def __eq__(self, other: Any) -> bool:
        """Compare by ``uri`` against another instance of the same class."""
        if isinstance(other, self.__class__):
            return self.uri == other.uri
        return NotImplemented

    def __hash__(self) -> int:
        """Hash by ``uri`` only, consistent with :meth:`__eq__`."""
        return hash(self.uri)

    def as_expression(self) -> Any:
        """Serialize the dataset into its scheduling expression.

        For a single dataset the expression is simply its URI.

        :meta private:
        """
        return self.uri

    def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
        """Yield the single ``(uri, self)`` pair for this dataset."""
        yield self.uri, self

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        """Return the flag recorded for this URI, or ``False`` if it is absent."""
        return statuses.get(self.uri, False)

189 

190 

class _DatasetBooleanCondition(BaseDataset):
    """Base class for dataset boolean logic.

    Subclasses set ``agg_func`` to the callable that folds the operands'
    individual results into one boolean.
    """

    agg_func: Callable[[Iterable], bool]

    def __init__(self, *objects: BaseDataset) -> None:
        """Store the operands of the condition.

        :param objects: The dataset expressions to combine.
        :raises TypeError: If any operand is not a :class:`BaseDataset`.
        """
        if not all(isinstance(o, BaseDataset) for o in objects):
            raise TypeError("expect dataset expressions in condition")
        self.objects = objects

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        """Evaluate every operand against ``statuses`` and fold with ``agg_func``."""
        return self.agg_func(x.evaluate(statuses=statuses) for x in self.objects)

    def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
        """Yield ``(uri, dataset)`` pairs from all operands, once per URI."""
        seen = set()  # We want to keep the first instance.
        for o in self.objects:
            for k, v in o.iter_datasets():
                if k in seen:
                    continue
                yield k, v
                seen.add(k)

212 

213 

class DatasetAny(_DatasetBooleanCondition):
    """Use to combine datasets schedule references in an "or" relationship.

    The condition evaluates to true as soon as any operand does
    (``agg_func`` is the builtin :func:`any`).
    """

    agg_func = any

    def __or__(self, other: BaseDataset) -> DatasetAny:
        """Extend this condition with ``other`` instead of nesting."""
        if not isinstance(other, BaseDataset):
            return NotImplemented
        # Optimization: (X | Y) | Z is equivalent to X | Y | Z, so build one
        # flat DatasetAny rather than wrapping the existing one.
        return DatasetAny(*self.objects, other)

    def __repr__(self) -> str:
        return f"DatasetAny({', '.join(map(str, self.objects))})"

    def as_expression(self) -> dict[str, Any]:
        """Serialize the dataset into its scheduling expression.

        :return: ``{"any": [...]}`` holding each operand's own expression.

        :meta private:
        """
        return {"any": [o.as_expression() for o in self.objects]}

234 

235 

class DatasetAll(_DatasetBooleanCondition):
    """Use to combine datasets schedule references in an "and" relationship.

    The condition evaluates to true only when every operand does
    (``agg_func`` is the builtin :func:`all`).
    """

    agg_func = all

    def __and__(self, other: BaseDataset) -> DatasetAll:
        """Extend this condition with ``other`` instead of nesting."""
        if not isinstance(other, BaseDataset):
            return NotImplemented
        # Optimization: (X & Y) & Z is equivalent to X & Y & Z, so build one
        # flat DatasetAll rather than wrapping the existing one.
        return DatasetAll(*self.objects, other)

    def __repr__(self) -> str:
        return f"DatasetAll({', '.join(map(str, self.objects))})"

    def as_expression(self) -> dict[str, Any]:
        """Serialize the dataset into its scheduling expression.

        :return: ``{"all": [...]}`` holding each operand's own expression.

        :meta private:
        """
        return {"all": [o.as_expression() for o in self.objects]}