Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/providers/redis/hooks/redis.py: 2%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

49 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""RedisHook module.""" 

19 

20from __future__ import annotations 

21 

22import warnings 

23from typing import Any 

24 

25from redis import Redis 

26 

27from airflow.exceptions import AirflowProviderDeprecationWarning 

28from airflow.hooks.base import BaseHook 

29 

# Certificate verification mode pre-filled in the connection form's
# "SSL verify mode" field.
DEFAULT_SSL_CERT_REQS = "required"
# Values the connection form accepts for ``ssl_cert_reqs``; the default is
# listed first so it leads the "Must be one of" hint shown to the user.
ALLOWED_SSL_CERT_REQS = [DEFAULT_SSL_CERT_REQS, "optional", "none"]

32 

33 

class RedisHook(BaseHook):
    """
    Wrapper for connection to interact with Redis in-memory data structure store.

    You can set your db in the extra field of your connection as ``{"db": 3}``.
    Also you can set ssl parameters as:
    ``{"ssl": true, "ssl_cert_reqs": "required", "ssl_certfile": "/path/to/cert.pem", etc}``.

    The legacy extra key ``ssl_cert_file`` is still honoured when
    ``ssl_certfile`` is absent, but it triggers a deprecation warning.
    """

    conn_name_attr = "redis_conn_id"
    default_conn_name = "redis_default"
    conn_type = "redis"
    hook_name = "Redis"

    def __init__(self, redis_conn_id: str = default_conn_name) -> None:
        """
        Prepare hook to connect to a Redis database.

        :param redis_conn_id: the name of the connection that has the parameters
            we need to connect to Redis.
        """
        super().__init__()
        self.redis_conn_id = redis_conn_id
        # Everything below is populated by ``get_conn``.
        self.redis = None
        self.host = None
        self.port = None
        self.username = None
        self.password = None
        self.db = None

    def get_conn(self) -> Redis:
        """
        Return a Redis connection.

        Each call looks up the Airflow connection and copies its host, port,
        login, password and ``db`` extra onto the hook. The ``Redis`` client is
        only built while ``self.redis`` is falsy; otherwise the stored client is
        returned unchanged.

        :return: the ``Redis`` client held in ``self.redis``.
        """
        conn = self.get_connection(self.redis_conn_id)
        self.host = conn.host
        self.port = conn.port
        self.username = conn.login
        # A password whose lowercased string form is "none", "false" or empty
        # is treated as "no password".
        self.password = None if str(conn.password).lower() in ["none", "false", ""] else conn.password
        self.db = conn.extra_dejson.get("db")

        # check for ssl parameters in conn.extra
        ssl_arg_names = (
            "ssl",
            "ssl_cert_reqs",
            "ssl_ca_certs",
            "ssl_keyfile",
            "ssl_certfile",
            "ssl_check_hostname",
        )
        ssl_args = {name: val for name, val in conn.extra_dejson.items() if name in ssl_arg_names}

        # This logic is for backward compatibility only
        if "ssl_cert_file" in conn.extra_dejson and "ssl_certfile" not in conn.extra_dejson:
            warnings.warn(
                "Extra parameter `ssl_cert_file` deprecated and will be removed "
                "in a future release. Please use `ssl_certfile` instead.",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            ssl_args["ssl_certfile"] = conn.extra_dejson.get("ssl_cert_file")

        if not self.redis:
            self.log.debug(
                'Initializing redis object for conn_id "%s" on %s:%s:%s',
                self.redis_conn_id,
                self.host,
                self.port,
                self.db,
            )
            self.redis = Redis(
                host=self.host,
                port=self.port,
                username=self.username,
                password=self.password,
                db=self.db,
                **ssl_args,
            )

        return self.redis

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """
        Return custom UI field behaviour for Redis connection.

        The ``schema`` and ``extra`` fields are hidden and no field is relabelled.
        """
        return {
            "hidden_fields": ["schema", "extra"],
            "relabeling": {},
        }

    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """
        Return connection widgets to add to Redis connection form.

        The returned keys match the extra names ``get_conn`` reads (``db`` and
        the ``ssl*`` options).
        """
        # Imported here, not at module level, as in the original code.
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import BooleanField, IntegerField, StringField
        from wtforms.validators import Optional, any_of

        return {
            "db": IntegerField(lazy_gettext("DB"), widget=BS3TextFieldWidget(), default=0),
            "ssl": BooleanField(lazy_gettext("Enable SSL"), default=False),
            "ssl_cert_reqs": StringField(
                lazy_gettext("SSL verify mode"),
                validators=[any_of(ALLOWED_SSL_CERT_REQS)],
                widget=BS3TextFieldWidget(),
                description=f"Must be one of: {', '.join(ALLOWED_SSL_CERT_REQS)}.",
                default=DEFAULT_SSL_CERT_REQS,
            ),
            "ssl_ca_certs": StringField(
                lazy_gettext("CA certificate path"),
                widget=BS3TextFieldWidget(),
                validators=[Optional()],
                default=None,
            ),
            "ssl_keyfile": StringField(
                lazy_gettext("Private key path"),
                widget=BS3TextFieldWidget(),
                validators=[Optional()],
                default=None,
            ),
            "ssl_certfile": StringField(
                lazy_gettext("Certificate path"),
                widget=BS3TextFieldWidget(),
                validators=[Optional()],
                default=None,
            ),
            "ssl_check_hostname": BooleanField(lazy_gettext("Enable hostname check"), default=False),
        }

152 lazy_gettext("Certificate path"), 

153 widget=BS3TextFieldWidget(), 

154 validators=[Optional()], 

155 default=None, 

156 ), 

157 "ssl_check_hostname": BooleanField(lazy_gettext("Enable hostname check"), default=False), 

158 }