# Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/providers/databricks/operators/test_databricks_repos.py: 0% of 92 statements
# Report generated by coverage.py v7.2.7 at 2023-06-07 06:35 +0000
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest import mock

import pytest

from airflow import AirflowException
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposDeleteOperator,
    DatabricksReposUpdateOperator,
)

TASK_ID = "databricks-operator"
DEFAULT_CONN_ID = "databricks_default"
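
# Illustrative usage sketch (not part of the original test suite): how the
# operators under test are typically wired into a DAG. The DAG id, schedule,
# and repo path below are assumptions for demonstration only.
#
#     from datetime import datetime
#     from airflow import DAG
#
#     with DAG("databricks_repos_demo", start_date=datetime(2023, 1, 1), schedule=None):
#         DatabricksReposUpdateOperator(
#             task_id="checkout-releases",
#             repo_path="/Repos/user@domain.com/test-repo",
#             branch="releases",
#         )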


class TestDatabricksReposUpdateOperator:
    @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook")
    def test_update_with_id(self, db_mock_class):
        """
        Test the execute function using Repo ID.
        """
        op = DatabricksReposUpdateOperator(task_id=TASK_ID, branch="releases", repo_id="123")
        db_mock = db_mock_class.return_value
        db_mock.update_repo.return_value = {"head_commit_id": "123456"}

        op.execute(None)

        db_mock_class.assert_called_once_with(
            DEFAULT_CONN_ID,
            retry_limit=op.databricks_retry_limit,
            retry_delay=op.databricks_retry_delay,
            caller="DatabricksReposUpdateOperator",
        )

        db_mock.update_repo.assert_called_once_with("123", {"branch": "releases"})
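
    # These tests call execute(None) directly: the operators do not use the Airflow
    # task context, and DatabricksHook is patched above, so no Databricks API call
    # is ever made.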

    @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook")
    def test_update_with_path(self, db_mock_class):
        """
        Test the execute function using Repo path.
        """
        op = DatabricksReposUpdateOperator(
            task_id=TASK_ID, tag="v1.0.0", repo_path="/Repos/user@domain.com/test-repo"
        )
        db_mock = db_mock_class.return_value
        db_mock.get_repo_by_path.return_value = "123"
        db_mock.update_repo.return_value = {"head_commit_id": "123456"}

        op.execute(None)

        db_mock_class.assert_called_once_with(
            DEFAULT_CONN_ID,
            retry_limit=op.databricks_retry_limit,
            retry_delay=op.databricks_retry_delay,
            caller="DatabricksReposUpdateOperator",
        )

        db_mock.update_repo.assert_called_once_with("123", {"tag": "v1.0.0"})
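
    # When repo_path is given instead of repo_id, the operator first resolves the
    # path to a repo ID via DatabricksHook.get_repo_by_path (mocked to "123" above)
    # and then performs the update by ID.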

    def test_init_exception(self):
        """
        Tests handling of incorrect parameters passed to ``__init__``
        """
        with pytest.raises(
            AirflowException, match="Only one of repo_id or repo_path should be provided, but not both"
        ):
            DatabricksReposUpdateOperator(task_id=TASK_ID, repo_id="abc", repo_path="path", branch="abc")

        with pytest.raises(AirflowException, match="One of repo_id or repo_path should be provided"):
            DatabricksReposUpdateOperator(task_id=TASK_ID, branch="abc")

        with pytest.raises(
            AirflowException, match="Only one of branch or tag should be provided, but not both"
        ):
            DatabricksReposUpdateOperator(task_id=TASK_ID, repo_id="123", branch="123", tag="123")

        with pytest.raises(AirflowException, match="One of branch or tag should be provided"):
            DatabricksReposUpdateOperator(task_id=TASK_ID, repo_id="123")


class TestDatabricksReposDeleteOperator:
    @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook")
    def test_delete_with_id(self, db_mock_class):
        """
        Test the execute function using Repo ID.
        """
        op = DatabricksReposDeleteOperator(task_id=TASK_ID, repo_id="123")
        db_mock = db_mock_class.return_value
        db_mock.delete_repo.return_value = None

        op.execute(None)

        db_mock_class.assert_called_once_with(
            DEFAULT_CONN_ID,
            retry_limit=op.databricks_retry_limit,
            retry_delay=op.databricks_retry_delay,
            caller="DatabricksReposDeleteOperator",
        )

        db_mock.delete_repo.assert_called_once_with("123")

    @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook")
    def test_delete_with_path(self, db_mock_class):
        """
        Test the execute function using Repo path.
        """
        op = DatabricksReposDeleteOperator(task_id=TASK_ID, repo_path="/Repos/user@domain.com/test-repo")
        db_mock = db_mock_class.return_value
        db_mock.get_repo_by_path.return_value = "123"
        db_mock.delete_repo.return_value = None

        op.execute(None)

        db_mock_class.assert_called_once_with(
            DEFAULT_CONN_ID,
            retry_limit=op.databricks_retry_limit,
            retry_delay=op.databricks_retry_delay,
            caller="DatabricksReposDeleteOperator",
        )

        db_mock.delete_repo.assert_called_once_with("123")

    def test_init_exception(self):
        """
        Tests handling of incorrect parameters passed to ``__init__``
        """
        with pytest.raises(
            AirflowException, match="Only one of repo_id or repo_path should be provided, but not both"
        ):
            DatabricksReposDeleteOperator(task_id=TASK_ID, repo_id="abc", repo_path="path")

        with pytest.raises(AirflowException, match="One of repo_id repo_path tag should be provided"):
            DatabricksReposDeleteOperator(task_id=TASK_ID)
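
        # Note: the unusual wording of the match string above ("repo_id repo_path
        # tag") appears to mirror the error message the operator actually raises,
        # so it is asserted as-is rather than "corrected" here.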


class TestDatabricksReposCreateOperator:
    @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook")
    def test_create_plus_checkout(self, db_mock_class):
        """
        Test the execute function creating new Repo.
        """
        git_url = "https://github.com/test/test"
        repo_path = "/Repos/Project1/test-repo"
        op = DatabricksReposCreateOperator(
            task_id=TASK_ID, git_url=git_url, repo_path=repo_path, branch="releases"
        )
        db_mock = db_mock_class.return_value
        db_mock.update_repo.return_value = {"head_commit_id": "123456"}
        db_mock.create_repo.return_value = {"id": "123", "branch": "main"}
        db_mock.get_repo_by_path.return_value = None

        op.execute(None)

        db_mock_class.assert_called_once_with(
            DEFAULT_CONN_ID,
            retry_limit=op.databricks_retry_limit,
            retry_delay=op.databricks_retry_delay,
            caller="DatabricksReposCreateOperator",
        )

        db_mock.create_repo.assert_called_once_with({"url": git_url, "provider": "gitHub", "path": repo_path})
        db_mock.update_repo.assert_called_once_with("123", {"branch": "releases"})
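
    # Flow exercised above: get_repo_by_path returns None (no existing repo), so
    # the operator creates the repo and then checks out the requested branch with
    # a follow-up update_repo call.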

    @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook")
    def test_create_ignore_existing_plus_checkout(self, db_mock_class):
        """
        Test the execute function when the Repo already exists and ignore_existing_repo=True.
        """
        git_url = "https://github.com/test/test"
        repo_path = "/Repos/Project1/test-repo"
        op = DatabricksReposCreateOperator(
            task_id=TASK_ID,
            git_url=git_url,
            repo_path=repo_path,
            branch="releases",
            ignore_existing_repo=True,
        )
        db_mock = db_mock_class.return_value
        db_mock.update_repo.return_value = {"head_commit_id": "123456"}
        db_mock.get_repo_by_path.return_value = "123"

        op.execute(None)

        db_mock_class.assert_called_once_with(
            DEFAULT_CONN_ID,
            retry_limit=op.databricks_retry_limit,
            retry_delay=op.databricks_retry_delay,
            caller="DatabricksReposCreateOperator",
        )

        db_mock.get_repo_by_path.assert_called_once_with(repo_path)
        db_mock.update_repo.assert_called_once_with("123", {"branch": "releases"})
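
    # With ignore_existing_repo=True and get_repo_by_path returning an existing
    # ID, creation is skipped; only the branch checkout (update_repo) runs.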

    def test_init_exception(self):
        """
        Tests handling of incorrect parameters passed to ``__init__``
        """
        git_url = "https://github.com/test/test"
        repo_path = "/Repos/test-repo"
        exception_message = (
            f"repo_path should have form of /Repos/{{folder}}/{{repo-name}}, got '{repo_path}'"
        )

        with pytest.raises(AirflowException, match=exception_message):
            op = DatabricksReposCreateOperator(task_id=TASK_ID, git_url=git_url, repo_path=repo_path)
            op.execute(None)

        with pytest.raises(
            AirflowException, match="Only one of branch or tag should be provided, but not both"
        ):
            DatabricksReposCreateOperator(task_id=TASK_ID, git_url=git_url, branch="123", tag="123")
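
# A possible consolidation (sketch only, not part of the original tests): the
# repeated ``__init__`` validation cases above could be table-driven with
# pytest.mark.parametrize, e.g. for the update operator:
#
#     @pytest.mark.parametrize(
#         ("kwargs", "message"),
#         [
#             (
#                 {"repo_id": "abc", "repo_path": "path", "branch": "abc"},
#                 "Only one of repo_id or repo_path should be provided, but not both",
#             ),
#             ({"branch": "abc"}, "One of repo_id or repo_path should be provided"),
#             (
#                 {"repo_id": "123", "branch": "123", "tag": "123"},
#                 "Only one of branch or tag should be provided, but not both",
#             ),
#             ({"repo_id": "123"}, "One of branch or tag should be provided"),
#         ],
#     )
#     def test_update_init_exception_parametrized(kwargs, message):
#         with pytest.raises(AirflowException, match=message):
#             DatabricksReposUpdateOperator(task_id=TASK_ID, **kwargs)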