Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/triggers/base.py: 55%
31 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import abc
20from typing import Any, AsyncIterator
22from airflow.utils.log.logging_mixin import LoggingMixin
class BaseTrigger(abc.ABC, LoggingMixin):
    """
    Abstract parent class that every trigger derives from.

    A trigger instance lives in one of two contexts:

    - Inside an Operator, when it is handed to TaskDeferred
    - Actively running in a trigger worker

    One class serves both situations, so every Trigger class must be able to
    report the arguments (encodable with Airflow-JSON) that allow it to be
    re-instantiated somewhere else.
    """

    def __init__(self, **kwargs):
        # The triggerer fills these in while preparing to run the instance;
        # at run time they are injected into the logger record.
        self.task_instance = None
        self.trigger_id = None

    def _set_context(self, context):
        """
        Reject context configuration.

        This LoggingMixin hook mainly configures logging for tasks; it is not
        used for triggers.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Describe how to rebuild this Trigger.

        :return: Tuple of (class path, keyword arguments needed to re-instantiate).
        """
        raise NotImplementedError("Triggers must implement serialize()")

    @abc.abstractmethod
    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Execute the trigger in an asynchronous context.

        Yield an Event each time the trigger wants to fire, and return None
        once it is finished. A single-event trigger should therefore yield and
        then return immediately.

        After a yield the trigger will probably be resumed very quickly, but
        that is not guaranteed (e.g. the workload is being moved to another
        triggerer process, or a multi-event trigger was used for a
        single-event task defer).

        Either way, Trigger classes should assume they will be persisted and
        rely on cleanup() being called once they are no longer needed.
        """
        raise NotImplementedError("Triggers must implement run()")
        # Never reached; its presence makes this an async generator, which is
        # what convinces Mypy that the method is an async iterator.
        yield

    async def cleanup(self) -> None:
        """
        Release whatever the trigger holds.

        Invoked when the trigger is no longer needed and is being removed from
        the active triggerer process.

        The method is a coroutine so that cleanup can run in the triggerer main
        event loop. Exceptions raised here are ignored; to debug them or be
        notified that cleanup failed, wrap your code in a try/except block and
        handle the failure appropriately (in an async-compatible way).
        """

    def __repr__(self) -> str:
        classpath, kwargs = self.serialize()
        rendered_pairs = [f"{name}={value}" for name, value in kwargs.items()]
        joined = ", ".join(rendered_pairs)
        return f"<{classpath} {joined}>"
class TriggerEvent:
    """
    Something that a trigger can fire when its conditions are met.

    Events must have a uniquely identifying value that would be the same
    wherever the trigger is run; this is to ensure that if the same trigger
    is being run in two locations (for HA reasons) that we can deduplicate its
    events.

    Two events compare equal when their payloads compare equal. Because the
    class defines ``__eq__`` without ``__hash__``, instances are unhashable.

    :param payload: The value carried by the event; it is stored as-is.
    """

    def __init__(self, payload: Any):
        self.payload = payload

    def __repr__(self) -> str:
        return f"TriggerEvent<{self.payload!r}>"

    def __eq__(self, other: object) -> bool:
        """
        Compare two events by payload.

        :return: Whether the payloads are equal, or ``NotImplemented`` when
            ``other`` is not a TriggerEvent.
        """
        if not isinstance(other, TriggerEvent):
            # Defer to the other operand's reflected comparison instead of
            # forcing False; ``==`` still yields False if neither side knows
            # how to compare.
            return NotImplemented
        return other.payload == self.payload