Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/system/providers/databricks/example_databricks_repos.py: 0%

24 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposDeleteOperator,
    DatabricksReposUpdateOperator,
)

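# These default_args are merged into every task in this DAG, so each
# Databricks operator below uses the "databricks" connection ID without
# having it passed explicitly.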

default_args = {
    "owner": "airflow",
    "databricks_conn_id": "databricks",
}

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")  # read from the environment but not referenced below
DAG_ID = "example_databricks_repos_operator"

with DAG(
    dag_id=DAG_ID,
    schedule="@daily",
    start_date=datetime(2021, 1, 1),
    default_args=default_args,
    tags=["example"],
    catchup=False,
) as dag:
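    # A note on the create step below: DatabricksReposCreateOperator uses the
    # Databricks Repos REST API to clone the Git repository into the given
    # workspace path. For well-known hosts such as GitHub the Git provider is
    # detected from the URL; for other hosts, git_provider must be supplied.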

    # [START howto_operator_databricks_repo_create]
    # Example of creating a Databricks Repo
    repo_path = "/Repos/user@domain.com/demo-repo"
    git_url = "https://github.com/test/test"
    create_repo = DatabricksReposCreateOperator(task_id="create_repo", repo_path=repo_path, git_url=git_url)
    # [END howto_operator_databricks_repo_create]

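    # DatabricksReposUpdateOperator below checks out the named branch (or a
    # tag) at its latest commit; branch and tag are mutually exclusive.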

    # [START howto_operator_databricks_repo_update]
    # Example of updating a Databricks Repo to the latest code
    repo_path = "/Repos/user@domain.com/demo-repo"
    update_repo = DatabricksReposUpdateOperator(task_id="update_repo", repo_path=repo_path, branch="releases")
    # [END howto_operator_databricks_repo_update]

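    # The notebook path below points inside the repo checkout, so the run
    # executes the code that update_repo just pulled. DatabricksSubmitRunOperator
    # sends this JSON to the Databricks Jobs "runs submit" endpoint as a
    # one-time run on a freshly created cluster.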

    notebook_task_params = {
        "new_cluster": {
            "spark_version": "9.1.x-scala2.12",
            "node_type_id": "r3.xlarge",
            "aws_attributes": {"availability": "ON_DEMAND"},
            "num_workers": 8,
        },
        "notebook_task": {
            "notebook_path": f"{repo_path}/PrepareData",
        },
    }

    notebook_task = DatabricksSubmitRunOperator(task_id="notebook_task", json=notebook_task_params)

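    # Deleting the repo removes the checkout from the workspace again, acting
    # as the teardown step of this system test.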

    # [START howto_operator_databricks_repo_delete]
    # Example of deleting a Databricks Repo
    repo_path = "/Repos/user@domain.com/demo-repo"
    delete_repo = DatabricksReposDeleteOperator(task_id="delete_repo", repo_path=repo_path)
    # [END howto_operator_databricks_repo_delete]

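    # Chain the tasks strictly in order: create the repo, update it, run the
    # notebook from it, then clean up.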

    (create_repo >> update_repo >> notebook_task >> delete_repo)

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when a "tearDown" task with a trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)