Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/providers/databricks/operators/test_databricks_copy.py: 29% (52 statements)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
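"""Tests for the SQL query text generated by DatabricksCopyIntoOperator."""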
from __future__ import annotations

import pytest

from airflow import AirflowException
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

DATE = "2017-04-20"
TASK_ID = "databricks-sql-operator"
DEFAULT_CONN_ID = "databricks_default"
COPY_FILE_LOCATION = "s3://my-bucket/jsonData"
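
# An explicit file list plus format options should render FILES and FORMAT_OPTIONS clauses.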
def test_copy_with_files():
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="JSON",
        table_name="test",
        files=["file1", "file2", "file3"],
        format_options={"dateFormat": "yyyy-MM-dd"},
        task_id=TASK_ID,
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}'
FILEFORMAT = JSON
FILES = ('file1','file2','file3')
FORMAT_OPTIONS ('dateFormat' = 'yyyy-MM-dd')
""".strip()
    )
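
# A column expression list turns the FROM clause into a subselect; pattern and
# force_copy add PATTERN and COPY_OPTIONS clauses.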
def test_copy_with_expression():
    expression = "col1, col2"
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        pattern="folder1/file_[a-g].csv",
        expression_list=expression,
        format_options={"header": "true"},
        force_copy=True,
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM (SELECT {expression} FROM '{COPY_FILE_LOCATION}')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
FORMAT_OPTIONS ('header' = 'true')
COPY_OPTIONS ('force' = 'true')
""".strip()
    )
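
# A source credential is rendered as a WITH (CREDENTIAL ...) clause on the file location.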
def test_copy_with_credential():
    expression = "col1, col2"
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        expression_list=expression,
        credential={"AZURE_SAS_TOKEN": "abc"},
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM (SELECT {expression} FROM '{COPY_FILE_LOCATION}' WITH (CREDENTIAL (AZURE_SAS_TOKEN = 'abc') ))
FILEFORMAT = CSV
""".strip()
    )
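
# storage_credential is attached to the target table (WITH (CREDENTIAL abc)),
# while credential is attached to the source file location.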
def test_copy_with_target_credential():
    expression = "col1, col2"
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        expression_list=expression,
        storage_credential="abc",
        credential={"AZURE_SAS_TOKEN": "abc"},
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test WITH (CREDENTIAL abc)
FROM (SELECT {expression} FROM '{COPY_FILE_LOCATION}' WITH (CREDENTIAL (AZURE_SAS_TOKEN = 'abc') ))
FILEFORMAT = CSV
""".strip()
    )
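
# Encryption settings are rendered as a WITH ( ENCRYPTION (...)) clause on the file location.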
def test_copy_with_encryption():
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        encryption={"TYPE": "AWS_SSE_C", "MASTER_KEY": "abc"},
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}' WITH ( ENCRYPTION (TYPE = 'AWS_SSE_C', MASTER_KEY = 'abc'))
FILEFORMAT = CSV
""".strip()
    )
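
# Credential and encryption can be combined inside a single WITH (...) clause on the source.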
def test_copy_with_encryption_and_credential():
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="CSV",
        table_name="test",
        task_id=TASK_ID,
        encryption={"TYPE": "AWS_SSE_C", "MASTER_KEY": "abc"},
        credential={"AZURE_SAS_TOKEN": "abc"},
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}' WITH (CREDENTIAL (AZURE_SAS_TOKEN = 'abc') """
        """ENCRYPTION (TYPE = 'AWS_SSE_C', MASTER_KEY = 'abc'))
FILEFORMAT = CSV
""".strip()
    )
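
# validate=True emits a VALIDATE ALL clause.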
def test_copy_with_validate_all():
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="JSON",
        table_name="test",
        task_id=TASK_ID,
        validate=True,
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}'
FILEFORMAT = JSON
VALIDATE ALL
""".strip()
    )
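
# An integer validate value emits VALIDATE <n> ROWS.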
def test_copy_with_validate_N_rows():
    op = DatabricksCopyIntoOperator(
        file_location=COPY_FILE_LOCATION,
        file_format="JSON",
        table_name="test",
        task_id=TASK_ID,
        validate=10,
    )
    assert (
        op._create_sql_query()
        == f"""COPY INTO test
FROM '{COPY_FILE_LOCATION}'
FILEFORMAT = JSON
VALIDATE 10 ROWS
""".strip()
    )
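
# 'files' and 'pattern' are mutually exclusive, so passing both should raise at construction time.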
def test_incorrect_params_files_patterns():
    exception_message = "Only one of 'pattern' or 'files' should be specified"
    with pytest.raises(AirflowException, match=exception_message):
        DatabricksCopyIntoOperator(
            task_id=TASK_ID,
            file_location=COPY_FILE_LOCATION,
            file_format="JSON",
            table_name="test",
            files=["file1", "file2", "file3"],
            pattern="abc",
        )
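
# An empty table_name should be rejected at construction time.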
def test_incorrect_params_empty_table():
    exception_message = "table_name shouldn't be empty"
    with pytest.raises(AirflowException, match=exception_message):
        DatabricksCopyIntoOperator(
            task_id=TASK_ID,
            file_location=COPY_FILE_LOCATION,
            file_format="JSON",
            table_name="",
        )
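
# An empty file_location should be rejected at construction time.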
def test_incorrect_params_empty_location():
    exception_message = "file_location shouldn't be empty"
    with pytest.raises(AirflowException, match=exception_message):
        DatabricksCopyIntoOperator(
            task_id=TASK_ID,
            file_location="",
            file_format="JSON",
            table_name="abc",
        )
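
# An unsupported file_format should be rejected at construction time.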
def test_incorrect_params_wrong_format():
    file_format = "JSONL"
    exception_message = f"file_format '{file_format}' isn't supported"
    with pytest.raises(AirflowException, match=exception_message):
        DatabricksCopyIntoOperator(
            task_id=TASK_ID,
            file_location=COPY_FILE_LOCATION,
            file_format=file_format,
            table_name="abc",
        )