Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/system/providers/google/cloud/tasks/example_queue.py: 26%

39 statements  

coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that creates, gets, lists, updates, purges, pauses, resumes
and deletes Queues in the Google Cloud Tasks service on Google Cloud.
"""

from __future__ import annotations

import os
from datetime import datetime

from google.api_core.retry import Retry
from google.cloud.tasks_v2.types import Queue
from google.protobuf.field_mask_pb2 import FieldMask

from airflow import models
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.tasks import (
    CloudTasksQueueCreateOperator,
    CloudTasksQueueDeleteOperator,
    CloudTasksQueueGetOperator,
    CloudTasksQueuePauseOperator,
    CloudTasksQueuePurgeOperator,
    CloudTasksQueueResumeOperator,
    CloudTasksQueuesListOperator,
    CloudTasksQueueUpdateOperator,
)
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "cloud_tasks_queue"

LOCATION = "europe-west2"
QUEUE_ID = f"queue-{ENV_ID}-{DAG_ID.replace('_', '-')}"


with models.DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "tasks"],
) as dag:

    @task(task_id="random_string")
    def generate_random_string():
        """
        Generate a random string for queue and task names.
        A queue name cannot be reused within the preceding 7 days,
        and a task name cannot be reused within the last 1 hour.
        """
        import random
        import string

        return "".join(random.choices(string.ascii_uppercase + string.digits, k=8))

    random_string = generate_random_string()
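    # The TaskFlow task above pushes its return value to XCom; the queue_name
    # templates below pull it via task_instance.xcom_pull(task_ids='random_string').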

    # [START create_queue]
    create_queue = CloudTasksQueueCreateOperator(
        location=LOCATION,
        task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        retry=Retry(maximum=10.0),
        timeout=5,
        task_id="create_queue",
    )
    # [END create_queue]
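    # Illustrative only (not part of the original example): the Queue resource can
    # also carry dispatch and retry settings, for instance something like
    #   Queue(
    #       rate_limits=dict(max_dispatches_per_second=5, max_concurrent_dispatches=10),
    #       retry_config=dict(max_attempts=3),
    #   )
    # Field names follow google.cloud.tasks_v2.types.Queue; treat this as a sketch.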

    # [START delete_queue]
    delete_queue = CloudTasksQueueDeleteOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="delete_queue",
    )
    # [END delete_queue]
    delete_queue.trigger_rule = TriggerRule.ALL_DONE
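    # ALL_DONE lets this teardown task run even if an upstream task failed,
    # so the test queue is cleaned up regardless of the outcome.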

    # [START resume_queue]
    resume_queue = CloudTasksQueueResumeOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="resume_queue",
    )
    # [END resume_queue]
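    # Resuming returns a paused queue to the RUNNING state so task dispatch restarts.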

    # [START pause_queue]
    pause_queue = CloudTasksQueuePauseOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="pause_queue",
    )
    # [END pause_queue]
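    # Pausing stops task dispatch; tasks can still be added to a paused queue
    # but are not executed until the queue is resumed.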

    # [START purge_queue]
    purge_queue = CloudTasksQueuePurgeOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="purge_queue",
    )
    # [END purge_queue]
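    # Purging deletes all tasks currently in the queue; the queue itself remains.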

    # [START get_queue]
    get_queue = CloudTasksQueueGetOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="get_queue",
    )

    get_queue_result = BashOperator(
        task_id="get_queue_result",
        bash_command=f"echo {get_queue.output}",
    )
    # [END get_queue]
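    # get_queue.output is an XComArg for the operator's return value; inside the
    # f-string it becomes an xcom_pull template that is rendered at runtime, so the
    # echoed text is the fetched queue resource.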

    # [START update_queue]
    update_queue = CloudTasksQueueUpdateOperator(
        task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
        task_id="update_queue",
    )
    # [END update_queue]
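    # The FieldMask restricts the update to the listed paths, so only the sampling
    # ratio changes and the other queue settings are left untouched. To update rate
    # limits instead, a mask such as
    #   FieldMask(paths=["rate_limits.max_dispatches_per_second"])
    # could be used (illustrative path, not taken from the original example).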

    # [START list_queue]
    list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
    # [END list_queue]

    chain(
        random_string,
        create_queue,
        update_queue,
        pause_queue,
        resume_queue,
        purge_queue,
        get_queue,
        get_queue_result,
        list_queue,
        delete_queue,
    )
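    # chain() wires the tasks into one linear dependency, equivalent to
    # random_string >> create_queue >> ... >> delete_queue.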

    from tests.system.utils.watcher import watcher

    # This test needs a watcher task in order to properly mark success/failure
    # when the "tearDown" task with a trigger rule is part of the DAG.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)