1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""jsonschema for validating serialized DAG and operator."""
19
20from __future__ import annotations
21
22import pkgutil
23from typing import TYPE_CHECKING, Iterable
24
25from airflow.exceptions import AirflowException
26from airflow.settings import json
27from airflow.typing_compat import Protocol
28
29if TYPE_CHECKING:
30 import jsonschema
31
32
33class Validator(Protocol):
34 """
35 This class is only used for type checking.
36
37 A workaround for IDEs, mypy, etc. due to the way ``Draft7Validator`` is created.
38 They are created or do not inherit from proper classes.
39 Hence, you can not have ``type: Draft7Validator``.
40 """
41
42 def is_valid(self, instance) -> bool:
43 """Check if the instance is valid under the current schema."""
44 ...
45
46 def validate(self, instance) -> None:
47 """Check if the instance is valid under the current schema, raising validation error if not."""
48 ...
49
50 def iter_errors(self, instance) -> Iterable[jsonschema.exceptions.ValidationError]:
51 """Lazily yield each of the validation errors in the given instance."""
52 ...
53
54
55def load_dag_schema_dict() -> dict:
56 """Load & return Json Schema for DAG as Python dict."""
57 schema_file_name = "schema.json"
58 schema_file = pkgutil.get_data(__name__, schema_file_name)
59
60 if schema_file is None:
61 raise AirflowException(f"Schema file {schema_file_name} does not exists")
62
63 schema = json.loads(schema_file.decode())
64 return schema
65
66
67def load_dag_schema() -> Validator:
68 """Load & Validate Json Schema for DAG."""
69 import jsonschema
70
71 schema = load_dag_schema_dict()
72 return jsonschema.Draft7Validator(schema)