Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/providers/redis/hooks/redis.py: 2%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""RedisHook module."""
20from __future__ import annotations
22import warnings
23from typing import Any
25from redis import Redis
27from airflow.exceptions import AirflowProviderDeprecationWarning
28from airflow.hooks.base import BaseHook
30DEFAULT_SSL_CERT_REQS = "required"
31ALLOWED_SSL_CERT_REQS = [DEFAULT_SSL_CERT_REQS, "optional", "none"]
34class RedisHook(BaseHook):
35 """
36 Wrapper for connection to interact with Redis in-memory data structure store.
38 You can set your db in the extra field of your connection as ``{"db": 3}``.
39 Also you can set ssl parameters as:
40 ``{"ssl": true, "ssl_cert_reqs": "require", "ssl_cert_file": "/path/to/cert.pem", etc}``.
41 """
43 conn_name_attr = "redis_conn_id"
44 default_conn_name = "redis_default"
45 conn_type = "redis"
46 hook_name = "Redis"
48 def __init__(self, redis_conn_id: str = default_conn_name) -> None:
49 """
50 Prepare hook to connect to a Redis database.
52 :param conn_id: the name of the connection that has the parameters
53 we need to connect to Redis.
54 """
55 super().__init__()
56 self.redis_conn_id = redis_conn_id
57 self.redis = None
58 self.host = None
59 self.port = None
60 self.username = None
61 self.password = None
62 self.db = None
64 def get_conn(self):
65 """Return a Redis connection."""
66 conn = self.get_connection(self.redis_conn_id)
67 self.host = conn.host
68 self.port = conn.port
69 self.username = conn.login
70 self.password = None if str(conn.password).lower() in ["none", "false", ""] else conn.password
71 self.db = conn.extra_dejson.get("db")
73 # check for ssl parameters in conn.extra
74 ssl_arg_names = [
75 "ssl",
76 "ssl_cert_reqs",
77 "ssl_ca_certs",
78 "ssl_keyfile",
79 "ssl_certfile",
80 "ssl_check_hostname",
81 ]
82 ssl_args = {name: val for name, val in conn.extra_dejson.items() if name in ssl_arg_names}
84 # This logic is for backward compatibility only
85 if "ssl_cert_file" in conn.extra_dejson and "ssl_certfile" not in conn.extra_dejson:
86 warnings.warn(
87 "Extra parameter `ssl_cert_file` deprecated and will be removed "
88 "in a future release. Please use `ssl_certfile` instead.",
89 AirflowProviderDeprecationWarning,
90 stacklevel=2,
91 )
92 ssl_args["ssl_certfile"] = conn.extra_dejson.get("ssl_cert_file")
94 if not self.redis:
95 self.log.debug(
96 'Initializing redis object for conn_id "%s" on %s:%s:%s',
97 self.redis_conn_id,
98 self.host,
99 self.port,
100 self.db,
101 )
102 self.redis = Redis(
103 host=self.host,
104 port=self.port,
105 username=self.username,
106 password=self.password,
107 db=self.db,
108 **ssl_args,
109 )
111 return self.redis
113 @classmethod
114 def get_ui_field_behaviour(cls) -> dict[str, Any]:
115 """Return custom UI field behaviour for Redis connection."""
116 return {
117 "hidden_fields": ["schema", "extra"],
118 "relabeling": {},
119 }
121 @classmethod
122 def get_connection_form_widgets(cls) -> dict[str, Any]:
123 """Return connection widgets to add to Redis connection form."""
124 from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
125 from flask_babel import lazy_gettext
126 from wtforms import BooleanField, IntegerField, StringField
127 from wtforms.validators import Optional, any_of
129 return {
130 "db": IntegerField(lazy_gettext("DB"), widget=BS3TextFieldWidget(), default=0),
131 "ssl": BooleanField(lazy_gettext("Enable SSL"), default=False),
132 "ssl_cert_reqs": StringField(
133 lazy_gettext("SSL verify mode"),
134 validators=[any_of(ALLOWED_SSL_CERT_REQS)],
135 widget=BS3TextFieldWidget(),
136 description=f"Must be one of: {', '.join(ALLOWED_SSL_CERT_REQS)}.",
137 default=DEFAULT_SSL_CERT_REQS,
138 ),
139 "ssl_ca_certs": StringField(
140 lazy_gettext("CA certificate path"),
141 widget=BS3TextFieldWidget(),
142 validators=[Optional()],
143 default=None,
144 ),
145 "ssl_keyfile": StringField(
146 lazy_gettext("Private key path"),
147 widget=BS3TextFieldWidget(),
148 validators=[Optional()],
149 default=None,
150 ),
151 "ssl_certfile": StringField(
152 lazy_gettext("Certificate path"),
153 widget=BS3TextFieldWidget(),
154 validators=[Optional()],
155 default=None,
156 ),
157 "ssl_check_hostname": BooleanField(lazy_gettext("Enable hostname check"), default=False),
158 }