Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/callbacks/callback_requests.py: 43%
46 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import json
20from typing import TYPE_CHECKING
22if TYPE_CHECKING:
23 from airflow.models.taskinstance import SimpleTaskInstance
26class CallbackRequest:
27 """
28 Base Class with information about the callback to be executed.
30 :param full_filepath: File Path to use to run the callback
31 :param msg: Additional Message that can be used for logging
32 :param processor_subdir: Directory used by Dag Processor when parsed the dag.
33 """
35 def __init__(
36 self,
37 full_filepath: str,
38 processor_subdir: str | None = None,
39 msg: str | None = None,
40 ):
41 self.full_filepath = full_filepath
42 self.processor_subdir = processor_subdir
43 self.msg = msg
45 def __eq__(self, other):
46 if isinstance(other, self.__class__):
47 return self.__dict__ == other.__dict__
48 return NotImplemented
50 def __repr__(self):
51 return str(self.__dict__)
53 def to_json(self) -> str:
54 return json.dumps(self.__dict__)
56 @classmethod
57 def from_json(cls, json_str: str):
58 json_object = json.loads(json_str)
59 return cls(**json_object)
62class TaskCallbackRequest(CallbackRequest):
63 """
64 Task callback status information.
66 A Class with information about the success/failure TI callback to be executed. Currently, only failure
67 callbacks (when tasks are externally killed) and Zombies are run via DagFileProcessorProcess.
69 :param full_filepath: File Path to use to run the callback
70 :param simple_task_instance: Simplified Task Instance representation
71 :param is_failure_callback: Flag to determine whether it is a Failure Callback or Success Callback
72 :param msg: Additional Message that can be used for logging to determine failure/zombie
73 :param processor_subdir: Directory used by Dag Processor when parsed the dag.
74 """
76 def __init__(
77 self,
78 full_filepath: str,
79 simple_task_instance: SimpleTaskInstance,
80 is_failure_callback: bool | None = True,
81 processor_subdir: str | None = None,
82 msg: str | None = None,
83 ):
84 super().__init__(full_filepath=full_filepath, processor_subdir=processor_subdir, msg=msg)
85 self.simple_task_instance = simple_task_instance
86 self.is_failure_callback = is_failure_callback
88 def to_json(self) -> str:
89 from airflow.serialization.serialized_objects import BaseSerialization
91 val = BaseSerialization.serialize(self.__dict__, strict=True)
92 return json.dumps(val)
94 @classmethod
95 def from_json(cls, json_str: str):
96 from airflow.serialization.serialized_objects import BaseSerialization
98 val = json.loads(json_str)
99 return cls(**BaseSerialization.deserialize(val))
102class DagCallbackRequest(CallbackRequest):
103 """
104 A Class with information about the success/failure DAG callback to be executed.
106 :param full_filepath: File Path to use to run the callback
107 :param dag_id: DAG ID
108 :param run_id: Run ID for the DagRun
109 :param processor_subdir: Directory used by Dag Processor when parsed the dag.
110 :param is_failure_callback: Flag to determine whether it is a Failure Callback or Success Callback
111 :param msg: Additional Message that can be used for logging
112 """
114 def __init__(
115 self,
116 full_filepath: str,
117 dag_id: str,
118 run_id: str,
119 processor_subdir: str | None,
120 is_failure_callback: bool | None = True,
121 msg: str | None = None,
122 ):
123 super().__init__(full_filepath=full_filepath, processor_subdir=processor_subdir, msg=msg)
124 self.dag_id = dag_id
125 self.run_id = run_id
126 self.is_failure_callback = is_failure_callback
129class SlaCallbackRequest(CallbackRequest):
130 """
131 A class with information about the SLA callback to be executed.
133 :param full_filepath: File Path to use to run the callback
134 :param dag_id: DAG ID
135 :param processor_subdir: Directory used by Dag Processor when parsed the dag.
136 """
138 def __init__(
139 self,
140 full_filepath: str,
141 dag_id: str,
142 processor_subdir: str | None,
143 msg: str | None = None,
144 ):
145 super().__init__(full_filepath, processor_subdir=processor_subdir, msg=msg)
146 self.dag_id = dag_id