1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import uuid
20from unittest import mock
21
22from airflow.providers.openlineage.conf import namespace
23from airflow.providers.openlineage.plugins.macros import (
24 lineage_job_name,
25 lineage_job_namespace,
26 lineage_parent_id,
27 lineage_run_id,
28)
29
# Namespace value obtained once at import time from the OpenLineage provider's
# ``conf.namespace()``; the tests below compare macro output against it.
_DAG_NAMESPACE = namespace()
31
32
def test_lineage_job_namespace():
    """Check that ``lineage_job_namespace`` returns the same value as ``conf.namespace()``."""
    result = lineage_job_namespace()
    assert result == _DAG_NAMESPACE
35
36
def test_lineage_job_name():
    """Check that ``lineage_job_name`` yields ``"<dag_id>.<task_id>"`` for a task instance."""
    # Attributes are handed to MagicMock as keyword arguments, so they become
    # plain attributes on the mocked task instance.
    ti_attrs = {
        "dag_id": "dag_id",
        "task_id": "task_id",
        "execution_date": "execution_date",
        "try_number": 1,
    }
    ti = mock.MagicMock(**ti_attrs)

    job_name = lineage_job_name(ti)

    assert job_name == "dag_id.task_id"
45
46
def test_lineage_run_id():
    """Check ``lineage_run_id`` against a UUID3 built from the task instance fields.

    The expected value is the string form of a UUID3 in the URL namespace whose
    name is ``<namespace>.<dag_id>.<task_id>.<execution_date>.<try_number>``.
    """
    ti_attrs = {
        "dag_id": "dag_id",
        "task_id": "task_id",
        "execution_date": "execution_date",
        "try_number": 1,
    }
    ti = mock.MagicMock(**ti_attrs)

    # Build the reference value independently of the macro under test.
    seed = f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1"
    expected_run_id = str(uuid.uuid3(uuid.NAMESPACE_URL, seed))

    assert lineage_run_id(ti) == expected_run_id
62
63
@mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id")
def test_lineage_parent_id(mock_run_id):
    """Check that ``lineage_parent_id`` yields ``"<namespace>/<dag_id>.<task_id>/<run_id>"``.

    ``lineage_run_id`` is patched in the macros module so the run-id part of
    the result is the fixed string ``"run_id"``.
    """
    mock_run_id.return_value = "run_id"

    ti_attrs = {
        "dag_id": "dag_id",
        "task_id": "task_id",
        "execution_date": "execution_date",
        "try_number": 1,
    }
    ti = mock.MagicMock(**ti_attrs)

    parent_id = lineage_parent_id(ti)

    assert parent_id == f"{_DAG_NAMESPACE}/dag_id.task_id/run_id"