Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/providers/celery/sensors/celery_queue.py: 17%

30 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING 

21 

22from celery.app import control 

23 

24from airflow.sensors.base import BaseSensorOperator 

25 

26if TYPE_CHECKING: 

27 from airflow.utils.context import Context 

28 

29 

class CeleryQueueSensor(BaseSensorOperator):
    """
    Waits for a Celery queue to be empty.

    By default, in order to be considered empty, the queue must not have any
    tasks in the ``reserved``, ``scheduled`` or ``active`` states. When
    ``target_task_id`` is set, the sensor instead pulls the value that task
    pushed to XCom and waits until its ``ready()`` method returns a truthy
    value.

    :param celery_queue: The name of the Celery queue to wait for.
    :param target_task_id: Task id for checking
    """

    def __init__(self, *, celery_queue: str, target_task_id: str | None = None, **kwargs) -> None:
        super().__init__(**kwargs)
        self.celery_queue = celery_queue
        self.target_task_id = target_task_id

    def _check_task_id(self, context: Context) -> bool:
        """
        Report whether the Celery result of the target Airflow task is ready.

        The result object is pulled from XCom using ``target_task_id``.

        :param context: Airflow's execution context
        :return: True if the Celery result has finished execution, otherwise False
        """
        ti = context["ti"]
        celery_result = ti.xcom_pull(task_ids=self.target_task_id)
        # NOTE(review): if the target task has not pushed a value yet, the
        # pulled object is presumably None and ``.ready()`` would raise
        # AttributeError -- confirm whether that case should poke again instead.
        return celery_result.ready()

    def poke(self, context: Context) -> bool:
        """
        Check whether the sensor's condition is currently met.

        If ``target_task_id`` is set, delegates to :meth:`_check_task_id`.
        Otherwise the ``reserved``, ``scheduled`` and ``active`` inspect replies
        are looked up under ``celery_queue`` and the queue is considered empty
        when all three entries have length zero.

        :param context: Airflow's execution context
        :return: True if the condition is met, otherwise False
        :raises KeyError: if ``celery_queue`` is missing from any inspect reply,
            or if an inspect call produced no reply at all
        """
        if self.target_task_id:
            return self._check_task_id(context)

        inspect_result = control.Inspect()
        # All three inspect calls are issued up front, in the original order,
        # before any of the replies is examined.
        replies = (
            inspect_result.reserved(),
            inspect_result.scheduled(),
            inspect_result.active(),
        )

        # NOTE(review): Celery inspect replies are usually keyed by worker node
        # name -- confirm that indexing them by queue name is what is intended.
        task_counts = []
        for reply in replies:
            # An inspect call yields None when no worker answered; treat that
            # like a missing key so callers see one consistent KeyError instead
            # of a TypeError from subscripting None.
            if reply is None or self.celery_queue not in reply:
                raise KeyError(f"Could not locate Celery queue {self.celery_queue}")
            task_counts.append(len(reply[self.celery_queue]))

        self.log.info("Checking if celery queue %s is empty.", self.celery_queue)

        return not any(task_counts)