Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/system/providers/google/cloud/tasks/example_queue.py: 26%
39 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""
19Example Airflow DAG that creates, gets, lists, updates, purges, pauses, resumes
20and deletes Queues in the Google Cloud Tasks service in the Google Cloud.
21"""
22from __future__ import annotations
24import os
25from datetime import datetime
27from google.api_core.retry import Retry
28from google.cloud.tasks_v2.types import Queue
29from google.protobuf.field_mask_pb2 import FieldMask
31from airflow import models
32from airflow.decorators import task
33from airflow.models.baseoperator import chain
34from airflow.operators.bash import BashOperator
35from airflow.providers.google.cloud.operators.tasks import (
36 CloudTasksQueueCreateOperator,
37 CloudTasksQueueDeleteOperator,
38 CloudTasksQueueGetOperator,
39 CloudTasksQueuePauseOperator,
40 CloudTasksQueuePurgeOperator,
41 CloudTasksQueueResumeOperator,
42 CloudTasksQueuesListOperator,
43 CloudTasksQueueUpdateOperator,
44)
45from airflow.utils.trigger_rule import TriggerRule
# Identifier of the system-test environment; None when the variable is unset.
ENV_ID = os.getenv("SYSTEM_TESTS_ENV_ID")
DAG_ID = "cloud_tasks_queue"

LOCATION = "europe-west2"
# Base queue name: "queue-<ENV_ID>-<DAG_ID with underscores turned into hyphens>".
# str() keeps the same "None" text an f-string would produce for an unset ENV_ID.
QUEUE_ID = "-".join(("queue", str(ENV_ID), DAG_ID.replace("_", "-")))
# Example DAG scheduled "@once" with catchup disabled. It creates a Cloud Tasks
# queue, updates, pauses, resumes, purges, gets and lists it, then deletes it.
with models.DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "tasks"],
) as dag:

    @task(task_id="random_string")
    def generate_random_string() -> str:
        """
        Return a random 8-character string of uppercase ASCII letters and digits.

        The value is used as a suffix for queue and task names: a queue name
        cannot be reused within 7 days of its previous use, and a task name
        cannot be reused within 1 hour.
        """
        import random
        import string

        return "".join(random.choices(string.ascii_uppercase + string.digits, k=8))

    random_string = generate_random_string()

    # Every queue operator below builds its queue name from QUEUE_ID followed by
    # a template expression that pulls the value returned by the "random_string"
    # task from XCom.

    # [START create_queue]
    create_queue = CloudTasksQueueCreateOperator(
        location=LOCATION,
        task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        retry=Retry(maximum=10.0),
        timeout=5,
        task_id="create_queue",
    )
    # [END create_queue]

    # [START delete_queue]
    delete_queue = CloudTasksQueueDeleteOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="delete_queue",
    )
    # [END delete_queue]
    # delete_queue is the last task in the chain below (the "tearDown" task the
    # watcher comment refers to). ALL_DONE is assigned after the END marker, so
    # it is not part of the marked region.
    delete_queue.trigger_rule = TriggerRule.ALL_DONE

    # [START resume_queue]
    resume_queue = CloudTasksQueueResumeOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="resume_queue",
    )
    # [END resume_queue]

    # [START pause_queue]
    pause_queue = CloudTasksQueuePauseOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="pause_queue",
    )
    # [END pause_queue]

    # [START purge_queue]
    purge_queue = CloudTasksQueuePurgeOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="purge_queue",
    )
    # [END purge_queue]

    # The marked get_queue region covers two tasks: the operator that fetches
    # the queue and a Bash task that echoes the operator's output.
    # [START get_queue]
    get_queue = CloudTasksQueueGetOperator(
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        task_id="get_queue",
    )

    get_queue_result = BashOperator(
        task_id="get_queue_result",
        bash_command=f"echo {get_queue.output}",
    )
    # [END get_queue]

    # The update sets sampling_ratio to 1 (the queue was created with 0.5); the
    # field mask names only that path.
    # [START update_queue]
    update_queue = CloudTasksQueueUpdateOperator(
        task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
        location=LOCATION,
        queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
        update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
        task_id="update_queue",
    )
    # [END update_queue]

    # [START list_queue]
    list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
    # [END list_queue]

    # Task dependencies come from this chain, not from the order in which the
    # operators are defined above.
    chain(
        random_string,
        create_queue,
        update_queue,
        pause_queue,
        resume_queue,
        purge_queue,
        get_queue,
        get_queue_result,
        list_queue,
        delete_queue,
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()
167from tests.system.utils import get_test_run # noqa: E402
# Module-level `test_run` is needed to run this example DAG with pytest
# (see: tests/system/README.md#run_via_pytest).
test_run = get_test_run(dag)