Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/operators/branch.py: 65%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Branching operators."""
20from __future__ import annotations
22from typing import TYPE_CHECKING, Iterable
24from airflow.models.baseoperator import BaseOperator
25from airflow.models.skipmixin import SkipMixin
27if TYPE_CHECKING:
28 from airflow.utils.context import Context
class BranchMixIn(SkipMixin):
    """Mixin that wraps the log-then-skip steps of a branching decision in a single call."""

    def do_branch(self, context: Context, branches_to_execute: str | Iterable[str]) -> str | Iterable[str]:
        """
        Log the chosen branch(es) and delegate the skipping to ``skip_all_except``.

        :param context: Task context; its ``"ti"`` entry is passed on to ``skip_all_except``.
        :param branches_to_execute: A single task_id or an iterable of task_ids to follow.
        :return: ``branches_to_execute``, exactly as it was passed in.
        """
        self.log.info("Branch into %s", branches_to_execute)
        # Looked up after logging so the log line is emitted even if "ti" is missing.
        task_instance = context["ti"]
        self.skip_all_except(task_instance, branches_to_execute)
        return branches_to_execute
class BaseBranchOperator(BaseOperator, BranchMixIn):
    """
    Base class for operators with branching functionality, similar to BranchPythonOperator.

    To use it, derive a subclass and implement ``choose_branch(self, context)``.
    That method runs whatever business logic is required to pick the branch
    and returns either a single task_id (as a str) or a list of task_ids.

    Execution continues with the returned task_id(s); every other task
    directly downstream of this operator is skipped.
    """

    def choose_branch(self, context: Context) -> str | Iterable[str]:
        """
        Decide which branch to run; must be overridden by subclasses.

        An implementation runs whatever logic it needs to select a branch
        and returns a task_id or a list of task_ids.

        :param context: Context dictionary as passed to execute()
        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def execute(self, context: Context):
        """
        Pick the branch via ``choose_branch`` and apply it through ``do_branch``.

        :param context: Context dictionary forwarded to both calls.
        :return: Whatever ``do_branch`` returns for the chosen branch(es).
        """
        selected = self.choose_branch(context)
        return self.do_branch(context, selected)