Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/execution_time/sentry/noop.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

18 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18 

19from __future__ import annotations 

20 

21from typing import TYPE_CHECKING, Protocol 

22 

23if TYPE_CHECKING: 

24 from structlog.typing import FilteringBoundLogger as Logger 

25 

26 from airflow.sdk import Context 

27 from airflow.sdk.execution_time.comms import ToSupervisor 

28 from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance 

29 from airflow.sdk.types import DagRunProtocol, RuntimeTaskInstanceProtocol, TaskInstanceState 

30 

31 RunReturn = tuple[TaskInstanceState, ToSupervisor | None, BaseException | None] 

32 

33 class Run(Protocol): 

34 def __call__(self, ti: RuntimeTaskInstance, context: Context, log: Logger) -> RunReturn: ... 

35 

36 

37class NoopSentry: 

38 """Blank class for Sentry.""" 

39 

40 def add_tagging(self, dag_run: DagRunProtocol, task_instance: RuntimeTaskInstanceProtocol) -> None: 

41 """Blank function for tagging.""" 

42 

43 def add_breadcrumbs(self, task_instance: RuntimeTaskInstanceProtocol) -> None: 

44 """Blank function for breadcrumbs.""" 

45 

46 def enrich_errors(self, run: Run) -> Run: 

47 """Blank function for formatting a TaskInstance._run_raw_task.""" 

48 return run 

49 

50 def flush(self) -> None: 

51 """Blank function for flushing errors."""