Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/api_internal/internal_api_call.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

78 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import inspect 

21import json 

22import logging 

23from functools import wraps 

24from typing import Callable, TypeVar 

25 

26import requests 

27import tenacity 

28from urllib3.exceptions import NewConnectionError 

29 

30from airflow.configuration import conf 

31from airflow.exceptions import AirflowConfigException, AirflowException 

32from airflow.settings import _ENABLE_AIP_44 

33from airflow.typing_compat import ParamSpec 

34 

# Parameter specification used in the signature of internal_api_call to describe the wrapped callable.
35PS = ParamSpec("PS") 

# Return type variable used in the signature of internal_api_call.
36RT = TypeVar("RT") 

37 

# Module-level logger; also handed to tenacity to log before each retry sleep.
38logger = logging.getLogger(__name__) 

39 

40 

class InternalApiConfig:
    """
    Stores and caches configuration for Internal API.

    Values are loaded lazily from the Airflow configuration the first time one of the
    getters is called and are then kept in class attributes. Calling
    ``force_database_direct_access`` marks the configuration as initialized without
    reading it, so the getters never consult the configuration afterwards.
    """

    # True once the cached values below have been populated.
    _initialized = False
    # Whether methods decorated with internal_api_call are executed via RPC.
    _use_internal_api = False
    # Full URL of the RPC endpoint; stays empty when the Internal API is not used.
    _internal_api_endpoint = ""
    # Path appended to [core]internal_api_url to form the RPC endpoint.
    _RPC_API_PATH = "/internal_api/v1/rpcapi"
    # Prefixes accepted for [core]internal_api_url.
    _ALLOWED_URL_PREFIXES = ("http://", "https://")

    @staticmethod
    def force_database_direct_access() -> None:
        """
        Block current component from using Internal API.

        All methods decorated with internal_api_call will always be executed locally.
        This mode is needed for "trusted" components like Scheduler, Webserver or Internal Api server.
        """
        InternalApiConfig._initialized = True
        InternalApiConfig._use_internal_api = False

    @staticmethod
    def get_use_internal_api() -> bool:
        """Return whether the Internal API is used, loading the configuration on first use."""
        if not InternalApiConfig._initialized:
            InternalApiConfig._init_values()
        return InternalApiConfig._use_internal_api

    @staticmethod
    def get_internal_api_endpoint() -> str:
        """Return the RPC endpoint URL, loading the configuration on first use."""
        if not InternalApiConfig._initialized:
            InternalApiConfig._init_values()
        return InternalApiConfig._internal_api_endpoint

    @staticmethod
    def _build_endpoint(internal_api_url: str) -> str:
        """
        Build the RPC endpoint URL from the configured base URL.

        :param internal_api_url: value of [core]internal_api_url
        :return: the base URL with the RPC API path appended
        :raises AirflowConfigException: if the URL starts with neither http:// nor https://
        """
        if not internal_api_url.startswith(InternalApiConfig._ALLOWED_URL_PREFIXES):
            raise AirflowConfigException("[core]internal_api_url must start with http:// or https://")
        # Strip trailing slashes so that "http://host/" does not yield "http://host//internal_api/...".
        return internal_api_url.rstrip("/") + InternalApiConfig._RPC_API_PATH

    @staticmethod
    def _init_values() -> None:
        """
        Read the Internal API settings from the configuration and cache them.

        :raises RuntimeError: if database isolation is requested while AIP-44 is disabled
        :raises AirflowConfigException: if [core]internal_api_url has an unsupported scheme
        """
        use_internal_api = conf.getboolean("core", "database_access_isolation", fallback=False)
        if use_internal_api and not _ENABLE_AIP_44:
            raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
        internal_api_endpoint = ""
        if use_internal_api:
            internal_api_endpoint = InternalApiConfig._build_endpoint(conf.get("core", "internal_api_url"))

        # Only mark as initialized after every value was computed, so a failure above
        # is raised again on the next access instead of leaving partial state behind.
        InternalApiConfig._initialized = True
        InternalApiConfig._use_internal_api = use_internal_api
        InternalApiConfig._internal_api_endpoint = internal_api_endpoint

86 

87 

def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
    """
    Allow methods to be executed in database isolation mode.

    If [core]database_access_isolation is true then such methods are not executed locally;
    instead an RPC call is made to the Database API (aka Internal API). This decouples some
    components from direct Airflow database access.
    Each decorated method must be present in METHODS list in airflow.api_internal.endpoints.rpc_api_endpoint.
    Only static methods can be decorated. This decorator must be before "provide_session".

    See [AIP-44](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API)
    for more information.
    """
    headers = {"Content-Type": "application/json"}
    from requests.exceptions import ConnectionError

    # Retry policy for connection failures: up to 10 attempts with exponential wait,
    # logging a warning before each sleep.
    retry_on_connection_error = tenacity.retry(
        stop=tenacity.stop_after_attempt(10),
        wait=tenacity.wait_exponential(min=1),
        retry=tenacity.retry_if_exception_type((NewConnectionError, ConnectionError)),
        before_sleep=tenacity.before_log(logger, logging.WARNING),
    )

    @retry_on_connection_error
    def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
        """POST a JSON-RPC 2.0 request to the Internal API and return the raw response body."""
        payload = {"jsonrpc": "2.0", "method": method_name, "params": params_json}
        endpoint = InternalApiConfig.get_internal_api_endpoint()
        response = requests.post(url=endpoint, data=json.dumps(payload), headers=headers)
        if response.status_code == 200:
            return response.content
        raise AirflowException(
            f"Got {response.status_code}:{response.reason} when sending "
            f"the internal api request: {response.text}"
        )

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Without database isolation the decorated function simply runs in-process.
        if not InternalApiConfig.get_use_internal_api():
            return func(*args, **kwargs)

        from airflow.serialization.serialized_objects import BaseSerialization  # avoid circular import

        call_arguments = dict(inspect.signature(func).bind(*args, **kwargs).arguments)
        # "session" and "cls" (the latter used by @classmethod) are not sent over the wire.
        for local_only_name in ("session", "cls"):
            call_arguments.pop(local_only_name, None)

        serialized_arguments = BaseSerialization.serialize(call_arguments, use_pydantic_models=True)
        qualified_name = f"{func.__module__}.{func.__qualname__}"
        raw_result = make_jsonrpc_request(qualified_name, serialized_arguments)
        if raw_result is not None and raw_result != b"":
            return BaseSerialization.deserialize(json.loads(raw_result), use_pydantic_models=True)
        # An absent or empty response body means there is nothing to deserialize.
        return None

    return wrapper