Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/providers/openlineage/plugins/macros.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

15 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING 

20 

21from airflow.providers.openlineage import conf 

22from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter 

23from airflow.providers.openlineage.utils.utils import get_job_name 

24 

25if TYPE_CHECKING: 

26 from airflow.models import TaskInstance 

27 

28 

def lineage_job_namespace():
    """
    Return the OpenLineage namespace configured for this Airflow deployment.

    The value is obtained from the provider configuration via ``conf.namespace()``
    and is exposed as a template macro.

    .. seealso::
        For more information take a look at the guide:
        :ref:`howto/macros:openlineage`
    """
    configured_namespace = conf.namespace()
    return configured_namespace

38 

39 

def lineage_job_name(task_instance: TaskInstance):
    """
    Return the OpenLineage job name (`<dag_id>.<task_id>`) for the given task instance.

    The name is produced by delegating to ``get_job_name``; this function only
    exposes that helper as a template macro.

    :param task_instance: task instance whose job name is requested.

    .. seealso::
        For more information take a look at the guide:
        :ref:`howto/macros:openlineage`
    """
    job_name = get_job_name(task_instance)
    return job_name

49 

50 

def lineage_run_id(task_instance: TaskInstance):
    """
    Return the generated run id (UUID) that identifies the given task instance.

    Passing this value on to a child run lets that run refer back to its parent,
    so the job hierarchy is preserved.

    :param task_instance: task instance whose ``dag_id``, ``task_id``,
        ``execution_date`` and ``try_number`` are used to build the run id.

    .. seealso::
        For more information take a look at the guide:
        :ref:`howto/macros:openlineage`
    """
    # Resolve the builder first, then read the identifying attributes in a fixed order.
    build_run_id = OpenLineageAdapter.build_task_instance_run_id
    identity = {
        "dag_id": task_instance.dag_id,
        "task_id": task_instance.task_id,
        "execution_date": task_instance.execution_date,
        "try_number": task_instance.try_number,
    }
    return build_run_id(**identity)

67 

68 

def lineage_parent_id(task_instance: TaskInstance):
    """
    Return a unique identifier of the given task, suitable for building a ParentRunFacet.

    The identifier has the form '{namespace}/{job_name}/{run_id}', where the three
    parts come from ``lineage_job_namespace``, ``lineage_job_name`` and
    ``lineage_run_id`` respectively. Forwarding it to a child run preserves the job
    hierarchy, since the child can create its ParentRunFacet from this information.

    :param task_instance: task instance to identify.

    .. seealso::
        For more information take a look at the guide:
        :ref:`howto/macros:openlineage`
    """
    # Evaluate the parts in the same order they appear in the identifier.
    namespace = lineage_job_namespace()
    job_name = lineage_job_name(task_instance)
    run_id = lineage_run_id(task_instance)
    return "/".join([namespace, job_name, run_id])