Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/system/providers/databricks/example_databricks_repos.py: 0%
24 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import os
20from datetime import datetime
22from airflow import DAG
23from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
24from airflow.providers.databricks.operators.databricks_repos import (
25 DatabricksReposCreateOperator,
26 DatabricksReposDeleteOperator,
27 DatabricksReposUpdateOperator,
28)
# Keyword arguments handed to the DAG below as its ``default_args``.
default_args = dict(owner="airflow", databricks_conn_id="databricks")

# Value of the SYSTEM_TESTS_ENV_ID environment variable; None when it is unset.
ENV_ID = os.getenv("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_databricks_repos_operator"
# Example DAG: create a Databricks repo, switch it to a branch, run a notebook
# stored in that repo, and finally delete the repo again.
with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule="@daily",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    # [START howto_operator_databricks_repo_create]
    # Example of creating a Databricks Repo
    repo_path = "/Repos/user@domain.com/demo-repo"
    git_url = "https://github.com/test/test"
    create_repo = DatabricksReposCreateOperator(
        task_id="create_repo",
        repo_path=repo_path,
        git_url=git_url,
    )
    # [END howto_operator_databricks_repo_create]

    # [START howto_operator_databricks_repo_update]
    # Example of updating a Databricks Repo to the latest code
    repo_path = "/Repos/user@domain.com/demo-repo"
    update_repo = DatabricksReposUpdateOperator(
        task_id="update_repo",
        repo_path=repo_path,
        branch="releases",
    )
    # [END howto_operator_databricks_repo_update]

    # Run payload for the notebook task: a new cluster specification plus the
    # notebook to execute, addressed relative to the repo path used above.
    notebook_task_params = {
        "new_cluster": {
            "spark_version": "9.1.x-scala2.12",
            "node_type_id": "r3.xlarge",
            "aws_attributes": {"availability": "ON_DEMAND"},
            "num_workers": 8,
        },
        "notebook_task": {
            "notebook_path": repo_path + "/PrepareData",
        },
    }

    notebook_task = DatabricksSubmitRunOperator(
        task_id="notebook_task",
        json=notebook_task_params,
    )

    # [START howto_operator_databricks_repo_delete]
    # Example of deleting a Databricks Repo
    repo_path = "/Repos/user@domain.com/demo-repo"
    delete_repo = DatabricksReposDeleteOperator(
        task_id="delete_repo",
        repo_path=repo_path,
    )
    # [END howto_operator_databricks_repo_delete]

    # Linear pipeline: create -> update -> run notebook -> delete.
    create_repo >> update_repo
    update_repo >> notebook_task
    notebook_task >> delete_repo

    from tests.system.utils.watcher import watcher

    # The watcher is placed downstream of every task defined so far; the test
    # needs it in order to properly mark success/failure when a "tearDown"
    # task with a trigger rule is part of the DAG.
    # The task list (left operand) is evaluated before watcher() is called.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Required so the example DAG can be run with pytest
# (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)