Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/callbacks/callback_requests.py: 43%

46 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import json 

20from typing import TYPE_CHECKING 

21 

22if TYPE_CHECKING: 

23 from airflow.models.taskinstance import SimpleTaskInstance 

24 

25 

class CallbackRequest:
    """
    Base Class with information about the callback to be executed.

    :param full_filepath: File Path to use to run the callback
    :param processor_subdir: Directory used by Dag Processor when parsed the dag.
    :param msg: Additional Message that can be used for logging
    """

    def __init__(
        self,
        full_filepath: str,
        processor_subdir: str | None = None,
        msg: str | None = None,
    ):
        # Assignment order matters: __repr__ and to_json expose the attribute
        # dict in insertion order.
        self.full_filepath = full_filepath
        self.processor_subdir = processor_subdir
        self.msg = msg

    def __eq__(self, other):
        """Compare attribute dicts when ``other`` is an instance of this object's class."""
        if not isinstance(other, self.__class__):
            # Let Python try the reflected comparison on the other operand.
            return NotImplemented
        return vars(self) == vars(other)

    def __repr__(self):
        """Return the string form of the instance attribute dict."""
        return str(vars(self))

    def to_json(self) -> str:
        """Dump the instance attribute dict as a JSON string."""
        return json.dumps(vars(self))

    @classmethod
    def from_json(cls, json_str: str):
        """Create an instance from a JSON object whose keys are passed as keyword arguments."""
        fields = json.loads(json_str)
        return cls(**fields)

60 

61 

class TaskCallbackRequest(CallbackRequest):
    """
    Task callback status information.

    A Class with information about the success/failure TI callback to be executed. Currently, only failure
    callbacks (when tasks are externally killed) and Zombies are run via DagFileProcessorProcess.

    :param full_filepath: File Path to use to run the callback
    :param simple_task_instance: Simplified Task Instance representation
    :param is_failure_callback: Flag to determine whether it is a Failure Callback or Success Callback
    :param processor_subdir: Directory used by Dag Processor when parsed the dag.
    :param msg: Additional Message that can be used for logging to determine failure/zombie
    """

    def __init__(
        self,
        full_filepath: str,
        simple_task_instance: SimpleTaskInstance,
        is_failure_callback: bool | None = True,
        processor_subdir: str | None = None,
        msg: str | None = None,
    ):
        super().__init__(
            full_filepath=full_filepath,
            processor_subdir=processor_subdir,
            msg=msg,
        )
        # Keep this order: the attribute dict is serialized in insertion order.
        self.simple_task_instance = simple_task_instance
        self.is_failure_callback = is_failure_callback

    def to_json(self) -> str:
        """Serialize the attribute dict with ``BaseSerialization`` and dump it as JSON."""
        # Imported inside the method, as in the original.
        from airflow.serialization.serialized_objects import BaseSerialization

        return json.dumps(BaseSerialization.serialize(vars(self), strict=True))

    @classmethod
    def from_json(cls, json_str: str):
        """Rebuild an instance from a JSON string by deserializing it with ``BaseSerialization``."""
        from airflow.serialization.serialized_objects import BaseSerialization

        kwargs = BaseSerialization.deserialize(json.loads(json_str))
        return cls(**kwargs)

100 

101 

class DagCallbackRequest(CallbackRequest):
    """
    A Class with information about the success/failure DAG callback to be executed.

    :param full_filepath: File Path to use to run the callback
    :param dag_id: DAG ID
    :param run_id: Run ID for the DagRun
    :param processor_subdir: Directory used by Dag Processor when parsed the dag.
    :param is_failure_callback: Flag to determine whether it is a Failure Callback or Success Callback
    :param msg: Additional Message that can be used for logging
    """

    def __init__(
        self,
        full_filepath: str,
        dag_id: str,
        run_id: str,
        processor_subdir: str | None,
        is_failure_callback: bool | None = True,
        msg: str | None = None,
    ):
        super().__init__(
            full_filepath=full_filepath,
            processor_subdir=processor_subdir,
            msg=msg,
        )
        # Keep this order: the inherited to_json/__repr__ expose the attribute
        # dict in insertion order.
        self.dag_id = dag_id
        self.run_id = run_id
        self.is_failure_callback = is_failure_callback

127 

128 

class SlaCallbackRequest(CallbackRequest):
    """
    A class with information about the SLA callback to be executed.

    :param full_filepath: File Path to use to run the callback
    :param dag_id: DAG ID
    :param processor_subdir: Directory used by Dag Processor when parsed the dag.
    :param msg: Additional Message that can be used for logging
    """

    def __init__(
        self,
        full_filepath: str,
        dag_id: str,
        processor_subdir: str | None,
        msg: str | None = None,
    ):
        super().__init__(
            full_filepath=full_filepath,
            processor_subdir=processor_subdir,
            msg=msg,
        )
        self.dag_id = dag_id