#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
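"""Unit tests for DatabricksCopyIntoOperator SQL generation and parameter validation."""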

from __future__ import annotations

import pytest

from airflow import AirflowException
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

DATE = "2017-04-20"
TASK_ID = "databricks-sql-operator"
DEFAULT_CONN_ID = "databricks_default"
COPY_FILE_LOCATION = "s3://my-bucket/jsonData"


def test_copy_with_files():
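    """COPY INTO with an explicit FILES list and custom FORMAT_OPTIONS."""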

    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="JSON",
        table_name="test",
        files=["file1", "file2", "file3"],
        format_options={"dateFormat": "yyyy-MM-dd"},
        task_id=TASK_ID,
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}'
FILEFORMAT = JSON
FILES = ('file1','file2','file3')
FORMAT_OPTIONS ('dateFormat' = 'yyyy-MM-dd')
""".strip()
    )


def test_copy_with_expression():
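    """COPY INTO from a SELECT expression with PATTERN, header option, and forced copy."""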

    expression = "col1, col2"
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        pattern="folder1/file_[a-g].csv",
        expression_list=expression,
        format_options={"header": "true"},
        force_copy=True,
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM (SELECT {expression} FROM '{COPY_FILE_LOCATION}')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
FORMAT_OPTIONS ('header' = 'true')
COPY_OPTIONS ('force' = 'true')
""".strip()
    )


def test_copy_with_credential():
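    """Source credential is embedded in the FROM clause via WITH (CREDENTIAL ...)."""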

    expression = "col1, col2"
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        expression_list=expression,
        credential={"AZURE_SAS_TOKEN": "abc"},
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM (SELECT {expression} FROM '{COPY_FILE_LOCATION}' WITH (CREDENTIAL (AZURE_SAS_TOKEN = 'abc') ))
FILEFORMAT = CSV
""".strip()
    )


def test_copy_with_target_credential():
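    """storage_credential adds a WITH (CREDENTIAL ...) clause on the target table."""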

    expression = "col1, col2"
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        expression_list=expression,
        storage_credential="abc",
        credential={"AZURE_SAS_TOKEN": "abc"},
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test WITH (CREDENTIAL abc)
FROM (SELECT {expression} FROM '{COPY_FILE_LOCATION}' WITH (CREDENTIAL (AZURE_SAS_TOKEN = 'abc') ))
FILEFORMAT = CSV
""".strip()
    )


def test_copy_with_encryption():
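    """Encryption settings are rendered as a WITH (ENCRYPTION ...) clause on the source."""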

    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        encryption={"TYPE": "AWS_SSE_C", "MASTER_KEY": "abc"},
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}' WITH ( ENCRYPTION (TYPE = 'AWS_SSE_C', MASTER_KEY = 'abc'))
FILEFORMAT = CSV
""".strip()
    )


def test_copy_with_encryption_and_credential():
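    """CREDENTIAL and ENCRYPTION clauses are combined in a single WITH block."""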

    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        encryption={"TYPE": "AWS_SSE_C", "MASTER_KEY": "abc"},
        credential={"AZURE_SAS_TOKEN": "abc"},
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}' WITH (CREDENTIAL (AZURE_SAS_TOKEN = 'abc') """
        """ENCRYPTION (TYPE = 'AWS_SSE_C', MASTER_KEY = 'abc'))
FILEFORMAT = CSV
""".strip()
    )


def test_copy_with_validate_all():
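    """validate=True appends VALIDATE ALL."""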

    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="JSON",
        table_name="test",
        task_id=TASK_ID,
        validate=True,
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}'
FILEFORMAT = JSON
VALIDATE ALL
""".strip()
    )


def test_copy_with_validate_N_rows():
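    """An integer validate value appends VALIDATE <n> ROWS."""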

    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="JSON",
        table_name="test",
        task_id=TASK_ID,
        validate=10,
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}'
FILEFORMAT = JSON
VALIDATE 10 ROWS
""".strip()
    )


def test_incorrect_params_files_patterns():
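    """Passing both 'files' and 'pattern' raises AirflowException."""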

    exception_message = "Only one of 'pattern' or 'files' should be specified"
    with pytest.raises(AirflowException, match=exception_message):
        DatabricksCopyIntoOperator(
            task_id=TASK_ID,
            file_location=COPY_FILE_LOCATION,
            file_format="JSON",
            table_name="test",
            files=["file1", "file2", "file3"],
            pattern="abc",
        )


def test_incorrect_params_empty_table():
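    """An empty table_name raises AirflowException."""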

    exception_message = "table_name shouldn't be empty"
    with pytest.raises(AirflowException, match=exception_message):
        DatabricksCopyIntoOperator(
            task_id=TASK_ID,
            file_location=COPY_FILE_LOCATION,
            file_format="JSON",
            table_name="",
        )


def test_incorrect_params_empty_location():
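    """An empty file_location raises AirflowException."""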

    exception_message = "file_location shouldn't be empty"
    with pytest.raises(AirflowException, match=exception_message):
        DatabricksCopyIntoOperator(
            task_id=TASK_ID,
            file_location="",
            file_format="JSON",
            table_name="abc",
        )


def test_incorrect_params_wrong_format():
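    """An unsupported file_format raises AirflowException."""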

    file_format = "JSONL"
    exception_message = f"file_format '{file_format}' isn't supported"
    with pytest.raises(AirflowException, match=exception_message):
        DatabricksCopyIntoOperator(
            task_id=TASK_ID,
            file_location=COPY_FILE_LOCATION,
            file_format=file_format,
            table_name="abc",
        )