Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/providers/microsoft/azure/operators/test_azure_cosmos.py: 0%

23 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import json 

21import uuid 

22from unittest import mock 

23 

24from airflow.models import Connection 

25from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator 

26from airflow.utils import db 

27 

28 

class TestAzureCosmosDbHook:
    """Exercise ``AzureCosmosInsertDocumentOperator`` against a mocked ``CosmosClient``."""

    # Set up an environment to test with
    def setup_method(self):
        """Define the fixture values and register the connection used by the test."""
        # set up some test variables
        self.test_end_point = "https://test_endpoint:443"
        self.test_master_key = "magic_test_key"
        self.test_database_name = "test_database_name"
        self.test_collection_name = "test_collection_name"

        # The database and collection names travel in the connection's JSON extra.
        connection_extra = json.dumps(
            {"database_name": self.test_database_name, "collection_name": self.test_collection_name}
        )
        db.merge_conn(
            Connection(
                conn_id="azure_cosmos_test_key_id",
                conn_type="azure_cosmos",
                login=self.test_end_point,
                password=self.test_master_key,
                extra=connection_extra,
            )
        )

    @mock.patch("airflow.providers.microsoft.azure.hooks.cosmos.CosmosClient")
    def test_insert_document(self, cosmos_mock):
        """Run the operator and check the client construction and the upsert call chain."""
        test_id = str(uuid.uuid4())

        # Stub the same call chain that is asserted below
        # (client -> database -> container -> upsert_item), so the mocked
        # return value sits on the method the test actually expects to be called.
        container_mock = (
            cosmos_mock.return_value.get_database_client.return_value.get_container_client.return_value
        )
        container_mock.upsert_item.return_value = {"id": test_id}

        op = AzureCosmosInsertDocumentOperator(
            database_name=self.test_database_name,
            collection_name=self.test_collection_name,
            document={"id": test_id, "data": "sometestdata"},
            azure_cosmos_conn_id="azure_cosmos_test_key_id",
            task_id="azure_cosmos_sensor",
        )

        # The expected document is a separate dict literal on purpose: comparing
        # against the very object handed to the operator would hide any
        # in-place modification of it.
        expected_calls = [
            mock.call()
            .get_database_client(self.test_database_name)
            .get_container_client(self.test_collection_name)
            .upsert_item({"data": "sometestdata", "id": test_id})
        ]

        op.execute(None)

        cosmos_mock.assert_any_call(self.test_end_point, {"masterKey": self.test_master_key})
        cosmos_mock.assert_has_calls(expected_calls)

71 cosmos_mock.assert_has_calls(expected_calls)