Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/providers/microsoft/azure/operators/test_cosmos.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

24 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import uuid 

21from unittest import mock 

22 

23import pytest 

24 

25from airflow.models import Connection 

26from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator 

27 

28 

29class TestAzureCosmosDbHook: 

30 # Set up an environment to test with 

31 @pytest.fixture(autouse=True) 

32 def setup_test_cases(self, create_mock_connection): 

33 # set up some test variables 

34 self.test_end_point = "https://test_endpoint:443" 

35 self.test_master_key = "magic_test_key" 

36 self.test_database_name = "test_database_name" 

37 self.test_collection_name = "test_collection_name" 

38 self.test_partition_key = "test_partition_key" 

39 create_mock_connection( 

40 Connection( 

41 conn_id="azure_cosmos_test_key_id", 

42 conn_type="azure_cosmos", 

43 login=self.test_end_point, 

44 password=self.test_master_key, 

45 extra={ 

46 "database_name": self.test_database_name, 

47 "collection_name": self.test_collection_name, 

48 "partition_key": self.test_partition_key, 

49 }, 

50 ) 

51 ) 

52 

53 @mock.patch("airflow.providers.microsoft.azure.hooks.cosmos.CosmosClient") 

54 def test_insert_document(self, cosmos_mock): 

55 test_id = str(uuid.uuid4()) 

56 cosmos_mock.return_value.CreateItem.return_value = {"id": test_id} 

57 op = AzureCosmosInsertDocumentOperator( 

58 database_name=self.test_database_name, 

59 collection_name=self.test_collection_name, 

60 document={"id": test_id, "data": "sometestdata"}, 

61 azure_cosmos_conn_id="azure_cosmos_test_key_id", 

62 task_id="azure_cosmos_sensor", 

63 ) 

64 

65 expected_calls = [ 

66 mock.call() 

67 .get_database_client("test_database_name") 

68 .get_container_client("test_collection_name") 

69 .upsert_item({"data": "sometestdata", "id": test_id}) 

70 ] 

71 

72 op.execute(None) 

73 cosmos_mock.assert_any_call(self.test_end_point, {"masterKey": self.test_master_key}) 

74 cosmos_mock.assert_has_calls(expected_calls)