Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/providers/redis/hooks/redis.py: 0%
29 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""RedisHook module."""
19from __future__ import annotations
21from redis import Redis
23from airflow.hooks.base import BaseHook
class RedisHook(BaseHook):
    """
    Wrapper for connection to interact with Redis in-memory data structure store.

    You can set your db in the extra field of your connection as ``{"db": 3}``.
    Also you can set ssl parameters as:
    ``{"ssl": true, "ssl_cert_reqs": "require", "ssl_certfile": "/path/to/cert.pem", etc}``.

    The legacy spelling ``ssl_cert_file`` is still accepted in the extra field; it is
    forwarded to the client as ``ssl_certfile`` unless ``ssl_certfile`` is also set.
    """

    conn_name_attr = "redis_conn_id"
    default_conn_name = "redis_default"
    conn_type = "redis"
    hook_name = "Redis"

    # Keys of the connection's extra JSON that are forwarded verbatim to ``Redis(...)``.
    _SSL_ARG_NAMES = (
        "ssl",
        "ssl_cert_reqs",
        "ssl_ca_certs",
        "ssl_keyfile",
        "ssl_certfile",
        "ssl_check_hostname",
    )

    def __init__(self, redis_conn_id: str = default_conn_name) -> None:
        """
        Prepares hook to connect to a Redis database.

        :param redis_conn_id: the name of the connection that has the parameters
            we need to connect to Redis.
        """
        super().__init__()
        self.redis_conn_id = redis_conn_id
        # Populated by get_conn(); the client is created once and then reused.
        self.redis: Redis | None = None
        self.host: str | None = None
        self.port: int | None = None
        self.password: str | None = None
        self.db: int | None = None

    def get_conn(self) -> Redis:
        """
        Returns a Redis connection.

        The connection settings (host, port, password, db) are re-read from the
        Airflow connection on every call, but the ``Redis`` client itself is only
        built on the first call and cached on ``self.redis`` afterwards.

        :return: the cached ``Redis`` client for ``redis_conn_id``.
        """
        conn = self.get_connection(self.redis_conn_id)
        self.host = conn.host
        self.port = conn.port
        # A password stored as the text "None"/"False" (any case) or as an empty
        # value is treated as "no password".
        self.password = None if str(conn.password).lower() in ["none", "false", ""] else conn.password

        extra = conn.extra_dejson
        self.db = extra.get("db")

        if not self.redis:
            # check for ssl parameters in conn.extra
            ssl_args = {name: val for name, val in extra.items() if name in self._SSL_ARG_NAMES}
            # redis-py names this keyword ``ssl_certfile``; keep honouring the older
            # ``ssl_cert_file`` spelling so existing connections continue to work.
            if "ssl_cert_file" in extra and "ssl_certfile" not in extra:
                ssl_args["ssl_certfile"] = extra["ssl_cert_file"]

            self.log.debug(
                'Initializing redis object for conn_id "%s" on %s:%s:%s',
                self.redis_conn_id,
                self.host,
                self.port,
                self.db,
            )
            self.redis = Redis(host=self.host, port=self.port, password=self.password, db=self.db, **ssl_args)

        return self.redis