Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/triggers/base.py: 57%

28 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import abc 

20from typing import Any, AsyncIterator 

21 

22from airflow.utils.log.logging_mixin import LoggingMixin 

23 

24 

class BaseTrigger(abc.ABC, LoggingMixin):
    """
    Base class for all triggers.

    A trigger has two contexts it can exist in:

     - Inside an Operator, when it's passed to TaskDeferred
     - Actively running in a trigger worker

    We use the same class for both situations, and rely on all Trigger classes
    to be able to return the (Airflow-JSON-encodable) arguments that will
    let them be re-instantiated elsewhere.
    """

    def __init__(self, **kwargs):
        """Accept and ignore arbitrary keyword arguments; the base class stores nothing."""
        pass

    @abc.abstractmethod
    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Returns the information needed to reconstruct this Trigger.

        :return: Tuple of (class path, keyword arguments needed to re-instantiate).
        """
        raise NotImplementedError("Triggers must implement serialize()")

    @abc.abstractmethod
    async def run(self) -> AsyncIterator["TriggerEvent"]:
        """
        Runs the trigger in an asynchronous context.

        The trigger should yield an Event whenever it wants to fire off
        an event, and return None if it is finished. Single-event triggers
        should thus yield and then immediately return.

        If it yields, it is likely that it will be resumed very quickly,
        but it may not be (e.g. if the workload is being moved to another
        triggerer process, or a multi-event trigger was being used for a
        single-event task defer).

        In either case, Trigger classes should assume they will be persisted,
        and then rely on cleanup() being called when they are no longer needed.
        """
        raise NotImplementedError("Triggers must implement run()")
        # Unreachable on purpose: the presence of ``yield`` makes this an
        # async generator, which is what the return annotation promises.
        yield  # To convince Mypy this is an async iterator.

    def cleanup(self) -> None:
        """
        Cleanup the trigger.

        Called when the trigger is no longer needed, and it's being removed
        from the active triggerer process.
        """

    def __repr__(self) -> str:
        """
        Return ``<classpath key=value, ...>`` built from ``serialize()``.

        Values are formatted with their ``str()`` form, not ``repr()``.
        """
        classpath, kwargs = self.serialize()
        kwargs_str = ", ".join(f"{k}={v}" for k, v in kwargs.items())
        return f"<{classpath} {kwargs_str}>"

83 

84 

class TriggerEvent:
    """
    Something that a trigger can fire when its conditions are met.

    Events must have a uniquely identifying value that would be the same
    wherever the trigger is run; this is to ensure that if the same trigger
    is being run in two locations (for HA reasons) that we can deduplicate its
    events.

    Two events compare equal when their payloads compare equal. Because the
    class defines ``__eq__`` without ``__hash__``, instances are unhashable
    and cannot be used as set members or dict keys.
    """

    def __init__(self, payload: Any):
        """
        Store the payload carried by this event.

        :param payload: The value identifying the event; compared with ``==``
            in ``__eq__`` and shown via ``repr()`` in ``__repr__``.
        """
        self.payload = payload

    def __repr__(self) -> str:
        """Return ``TriggerEvent<payload-repr>``."""
        return f"TriggerEvent<{self.payload!r}>"

    def __eq__(self, other: object) -> bool:
        """
        Compare two events by payload.

        :return: Result of comparing the payloads when ``other`` is a
            ``TriggerEvent``; ``NotImplemented`` otherwise, so Python can try
            the reflected comparison before falling back to "not equal".
        """
        if isinstance(other, TriggerEvent):
            return other.payload == self.payload
        # Returning NotImplemented (instead of False) keeps ``event == x``
        # False for ordinary objects while letting ``x`` have a say if it
        # knows how to compare itself to a TriggerEvent.
        return NotImplemented