Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/serialization/json_schema.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

27 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""jsonschema for validating serialized DAG and operator.""" 

19 

20from __future__ import annotations 

21 

22import pkgutil 

23from typing import TYPE_CHECKING, Iterable 

24 

25from airflow.exceptions import AirflowException 

26from airflow.settings import json 

27from airflow.typing_compat import Protocol 

28 

29if TYPE_CHECKING: 

30 import jsonschema 

31 

32 

33class Validator(Protocol): 

34 """ 

35 This class is only used for type checking. 

36 

37 A workaround for IDEs, mypy, etc. due to the way ``Draft7Validator`` is created. 

38 They are created or do not inherit from proper classes. 

39 Hence, you can not have ``type: Draft7Validator``. 

40 """ 

41 

42 def is_valid(self, instance) -> bool: 

43 """Check if the instance is valid under the current schema.""" 

44 ... 

45 

46 def validate(self, instance) -> None: 

47 """Check if the instance is valid under the current schema, raising validation error if not.""" 

48 ... 

49 

50 def iter_errors(self, instance) -> Iterable[jsonschema.exceptions.ValidationError]: 

51 """Lazily yield each of the validation errors in the given instance.""" 

52 ... 

53 

54 

55def load_dag_schema_dict() -> dict: 

56 """Load & return Json Schema for DAG as Python dict.""" 

57 schema_file_name = "schema.json" 

58 schema_file = pkgutil.get_data(__name__, schema_file_name) 

59 

60 if schema_file is None: 

61 raise AirflowException(f"Schema file {schema_file_name} does not exists") 

62 

63 schema = json.loads(schema_file.decode()) 

64 return schema 

65 

66 

67def load_dag_schema() -> Validator: 

68 """Load & Validate Json Schema for DAG.""" 

69 import jsonschema 

70 

71 schema = load_dag_schema_dict() 

72 return jsonschema.Draft7Validator(schema)