Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/providers/databricks/operators/test_databricks_repos.py: 0%

92 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from unittest import mock 

21 

22import pytest 

23 

24from airflow import AirflowException 

25from airflow.providers.databricks.operators.databricks_repos import ( 

26 DatabricksReposCreateOperator, 

27 DatabricksReposDeleteOperator, 

28 DatabricksReposUpdateOperator, 

29) 

30 

31TASK_ID = "databricks-operator" 

32DEFAULT_CONN_ID = "databricks_default" 

33 

34 

35class TestDatabricksReposUpdateOperator: 

36 @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook") 

37 def test_update_with_id(self, db_mock_class): 

38 """ 

39 Test the execute function using Repo ID. 

40 """ 

41 op = DatabricksReposUpdateOperator(task_id=TASK_ID, branch="releases", repo_id="123") 

42 db_mock = db_mock_class.return_value 

43 db_mock.update_repo.return_value = {"head_commit_id": "123456"} 

44 

45 op.execute(None) 

46 

47 db_mock_class.assert_called_once_with( 

48 DEFAULT_CONN_ID, 

49 retry_limit=op.databricks_retry_limit, 

50 retry_delay=op.databricks_retry_delay, 

51 caller="DatabricksReposUpdateOperator", 

52 ) 

53 

54 db_mock.update_repo.assert_called_once_with("123", {"branch": "releases"}) 

55 

56 @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook") 

57 def test_update_with_path(self, db_mock_class): 

58 """ 

59 Test the execute function using Repo path. 

60 """ 

61 op = DatabricksReposUpdateOperator( 

62 task_id=TASK_ID, tag="v1.0.0", repo_path="/Repos/user@domain.com/test-repo" 

63 ) 

64 db_mock = db_mock_class.return_value 

65 db_mock.get_repo_by_path.return_value = "123" 

66 db_mock.update_repo.return_value = {"head_commit_id": "123456"} 

67 

68 op.execute(None) 

69 

70 db_mock_class.assert_called_once_with( 

71 DEFAULT_CONN_ID, 

72 retry_limit=op.databricks_retry_limit, 

73 retry_delay=op.databricks_retry_delay, 

74 caller="DatabricksReposUpdateOperator", 

75 ) 

76 

77 db_mock.update_repo.assert_called_once_with("123", {"tag": "v1.0.0"}) 

78 

79 def test_init_exception(self): 

80 """ 

81 Tests handling of incorrect parameters passed to ``__init__`` 

82 """ 

83 with pytest.raises( 

84 AirflowException, match="Only one of repo_id or repo_path should be provided, but not both" 

85 ): 

86 DatabricksReposUpdateOperator(task_id=TASK_ID, repo_id="abc", repo_path="path", branch="abc") 

87 

88 with pytest.raises(AirflowException, match="One of repo_id or repo_path should be provided"): 

89 DatabricksReposUpdateOperator(task_id=TASK_ID, branch="abc") 

90 

91 with pytest.raises( 

92 AirflowException, match="Only one of branch or tag should be provided, but not both" 

93 ): 

94 DatabricksReposUpdateOperator(task_id=TASK_ID, repo_id="123", branch="123", tag="123") 

95 

96 with pytest.raises(AirflowException, match="One of branch or tag should be provided"): 

97 DatabricksReposUpdateOperator(task_id=TASK_ID, repo_id="123") 

98 

99 

100class TestDatabricksReposDeleteOperator: 

101 @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook") 

102 def test_delete_with_id(self, db_mock_class): 

103 """ 

104 Test the execute function using Repo ID. 

105 """ 

106 op = DatabricksReposDeleteOperator(task_id=TASK_ID, repo_id="123") 

107 db_mock = db_mock_class.return_value 

108 db_mock.delete_repo.return_value = None 

109 

110 op.execute(None) 

111 

112 db_mock_class.assert_called_once_with( 

113 DEFAULT_CONN_ID, 

114 retry_limit=op.databricks_retry_limit, 

115 retry_delay=op.databricks_retry_delay, 

116 caller="DatabricksReposDeleteOperator", 

117 ) 

118 

119 db_mock.delete_repo.assert_called_once_with("123") 

120 

121 @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook") 

122 def test_delete_with_path(self, db_mock_class): 

123 """ 

124 Test the execute function using Repo path. 

125 """ 

126 op = DatabricksReposDeleteOperator(task_id=TASK_ID, repo_path="/Repos/user@domain.com/test-repo") 

127 db_mock = db_mock_class.return_value 

128 db_mock.get_repo_by_path.return_value = "123" 

129 db_mock.delete_repo.return_value = None 

130 

131 op.execute(None) 

132 

133 db_mock_class.assert_called_once_with( 

134 DEFAULT_CONN_ID, 

135 retry_limit=op.databricks_retry_limit, 

136 retry_delay=op.databricks_retry_delay, 

137 caller="DatabricksReposDeleteOperator", 

138 ) 

139 

140 db_mock.delete_repo.assert_called_once_with("123") 

141 

142 def test_init_exception(self): 

143 """ 

144 Tests handling of incorrect parameters passed to ``__init__`` 

145 """ 

146 with pytest.raises( 

147 AirflowException, match="Only one of repo_id or repo_path should be provided, but not both" 

148 ): 

149 DatabricksReposDeleteOperator(task_id=TASK_ID, repo_id="abc", repo_path="path") 

150 

151 with pytest.raises(AirflowException, match="One of repo_id repo_path tag should be provided"): 

152 DatabricksReposDeleteOperator(task_id=TASK_ID) 

153 

154 

155class TestDatabricksReposCreateOperator: 

156 @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook") 

157 def test_create_plus_checkout(self, db_mock_class): 

158 """ 

159 Test the execute function creating new Repo. 

160 """ 

161 git_url = "https://github.com/test/test" 

162 repo_path = "/Repos/Project1/test-repo" 

163 op = DatabricksReposCreateOperator( 

164 task_id=TASK_ID, git_url=git_url, repo_path=repo_path, branch="releases" 

165 ) 

166 db_mock = db_mock_class.return_value 

167 db_mock.update_repo.return_value = {"head_commit_id": "123456"} 

168 db_mock.create_repo.return_value = {"id": "123", "branch": "main"} 

169 db_mock.get_repo_by_path.return_value = None 

170 

171 op.execute(None) 

172 

173 db_mock_class.assert_called_once_with( 

174 DEFAULT_CONN_ID, 

175 retry_limit=op.databricks_retry_limit, 

176 retry_delay=op.databricks_retry_delay, 

177 caller="DatabricksReposCreateOperator", 

178 ) 

179 

180 db_mock.create_repo.assert_called_once_with({"url": git_url, "provider": "gitHub", "path": repo_path}) 

181 db_mock.update_repo.assert_called_once_with("123", {"branch": "releases"}) 

182 

183 @mock.patch("airflow.providers.databricks.operators.databricks_repos.DatabricksHook") 

184 def test_create_ignore_existing_plus_checkout(self, db_mock_class): 

185 """ 

186 Test the execute function creating new Repo. 

187 """ 

188 git_url = "https://github.com/test/test" 

189 repo_path = "/Repos/Project1/test-repo" 

190 op = DatabricksReposCreateOperator( 

191 task_id=TASK_ID, 

192 git_url=git_url, 

193 repo_path=repo_path, 

194 branch="releases", 

195 ignore_existing_repo=True, 

196 ) 

197 db_mock = db_mock_class.return_value 

198 db_mock.update_repo.return_value = {"head_commit_id": "123456"} 

199 db_mock.get_repo_by_path.return_value = "123" 

200 

201 op.execute(None) 

202 

203 db_mock_class.assert_called_once_with( 

204 DEFAULT_CONN_ID, 

205 retry_limit=op.databricks_retry_limit, 

206 retry_delay=op.databricks_retry_delay, 

207 caller="DatabricksReposCreateOperator", 

208 ) 

209 

210 db_mock.get_repo_by_path.assert_called_once_with(repo_path) 

211 db_mock.update_repo.assert_called_once_with("123", {"branch": "releases"}) 

212 

213 def test_init_exception(self): 

214 """ 

215 Tests handling of incorrect parameters passed to ``__init__`` 

216 """ 

217 git_url = "https://github.com/test/test" 

218 repo_path = "/Repos/test-repo" 

219 exception_message = ( 

220 f"repo_path should have form of /Repos/{{folder}}/{{repo-name}}, got '{repo_path}'" 

221 ) 

222 

223 with pytest.raises(AirflowException, match=exception_message): 

224 op = DatabricksReposCreateOperator(task_id=TASK_ID, git_url=git_url, repo_path=repo_path) 

225 op.execute(None) 

226 

227 with pytest.raises( 

228 AirflowException, match="Only one of branch or tag should be provided, but not both" 

229 ): 

230 DatabricksReposCreateOperator(task_id=TASK_ID, git_url=git_url, branch="123", tag="123")