Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/charts/test_redis.py: 12%
112 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import re
20from base64 import b64decode
21from subprocess import CalledProcessError
23import jmespath
24import pytest
26from tests.charts.helm_template_generator import prepare_k8s_lookup_dict, render_chart
# Release name passed to ``render_chart``; the object names below are derived
# from it.
RELEASE_NAME_REDIS = "test-redis"

# Logical label -> (kind, name) pair for each Redis-related object the tests
# look up in the rendered chart output.
REDIS_OBJECTS = {
    "NETWORK_POLICY": ("NetworkPolicy", f"{RELEASE_NAME_REDIS}-redis-policy"),
    "SERVICE": ("Service", f"{RELEASE_NAME_REDIS}-redis"),
    "STATEFUL_SET": ("StatefulSet", f"{RELEASE_NAME_REDIS}-redis"),
    "SECRET_PASSWORD": ("Secret", f"{RELEASE_NAME_REDIS}-redis-password"),
    "SECRET_BROKER_URL": ("Secret", f"{RELEASE_NAME_REDIS}-broker-url"),
}

# All (kind, name) pairs above as a set, used to intersect with rendered keys.
SET_POSSIBLE_REDIS_OBJECT_KEYS = {*REDIS_OBJECTS.values()}

# Executor values used to parametrize the Celery-based tests.
CELERY_EXECUTORS_PARAMS = ["CeleryExecutor", "CeleryKubernetesExecutor"]
42class TestRedis:
43 @staticmethod
44 def get_broker_url_in_broker_url_secret(k8s_obj_by_key):
45 broker_url_in_obj = b64decode(
46 k8s_obj_by_key[REDIS_OBJECTS["SECRET_BROKER_URL"]]["data"]["connection"]
47 ).decode("utf-8")
48 return broker_url_in_obj
50 @staticmethod
51 def get_redis_password_in_password_secret(k8s_obj_by_key):
52 password_in_obj = b64decode(
53 k8s_obj_by_key[REDIS_OBJECTS["SECRET_PASSWORD"]]["data"]["password"]
54 ).decode("utf-8")
55 return password_in_obj
57 @staticmethod
58 def get_broker_url_secret_in_deployment(k8s_obj_by_key, kind: str, name: str) -> str:
59 deployment_obj = k8s_obj_by_key[(kind, f"{RELEASE_NAME_REDIS}-{name}")]
60 containers = deployment_obj["spec"]["template"]["spec"]["containers"]
61 container = next(obj for obj in containers if obj["name"] == name)
63 envs = container["env"]
64 env = next(obj for obj in envs if obj["name"] == "AIRFLOW__CELERY__BROKER_URL")
65 return env["valueFrom"]["secretKeyRef"]["name"]
67 def assert_password_and_broker_url_secrets(
68 self, k8s_obj_by_key, expected_password_match: str | None, expected_broker_url_match: str | None
69 ):
70 if expected_password_match is not None:
71 redis_password_in_password_secret = self.get_redis_password_in_password_secret(k8s_obj_by_key)
72 assert re.search(expected_password_match, redis_password_in_password_secret)
73 else:
74 assert REDIS_OBJECTS["SECRET_PASSWORD"] not in k8s_obj_by_key.keys()
76 if expected_broker_url_match is not None:
77 # assert redis broker url in secret
78 broker_url_in_broker_url_secret = self.get_broker_url_in_broker_url_secret(k8s_obj_by_key)
79 assert re.search(expected_broker_url_match, broker_url_in_broker_url_secret)
80 else:
81 assert REDIS_OBJECTS["SECRET_BROKER_URL"] not in k8s_obj_by_key.keys()
83 def assert_broker_url_env(
84 self, k8s_obj_by_key, expected_broker_url_secret_name=REDIS_OBJECTS["SECRET_BROKER_URL"][1]
85 ):
86 broker_url_secret_in_scheduler = self.get_broker_url_secret_in_deployment(
87 k8s_obj_by_key, "StatefulSet", "worker"
88 )
89 assert broker_url_secret_in_scheduler == expected_broker_url_secret_name
90 broker_url_secret_in_worker = self.get_broker_url_secret_in_deployment(
91 k8s_obj_by_key, "Deployment", "scheduler"
92 )
93 assert broker_url_secret_in_worker == expected_broker_url_secret_name
95 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
96 def test_redis_by_chart_default(self, executor):
97 k8s_objects = render_chart(
98 RELEASE_NAME_REDIS,
99 {
100 "executor": executor,
101 "networkPolicies": {"enabled": True},
102 "redis": {"enabled": True},
103 },
104 )
105 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)
107 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
108 assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS
110 self.assert_password_and_broker_url_secrets(
111 k8s_obj_by_key,
112 expected_password_match=r"\w+",
113 expected_broker_url_match=rf"redis://:.+@{RELEASE_NAME_REDIS}-redis:6379/0",
114 )
116 self.assert_broker_url_env(k8s_obj_by_key)
118 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
119 def test_redis_by_chart_password(self, executor):
120 k8s_objects = render_chart(
121 RELEASE_NAME_REDIS,
122 {
123 "executor": executor,
124 "networkPolicies": {"enabled": True},
125 "redis": {"enabled": True, "password": "test-redis-password!@#$%^&*()_+"},
126 },
127 )
128 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)
130 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
131 assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS
133 self.assert_password_and_broker_url_secrets(
134 k8s_obj_by_key,
135 expected_password_match="test-redis-password",
136 expected_broker_url_match=re.escape(
137 "redis://:test-redis-password%21%40%23$%25%5E&%2A%28%29_+@test-redis-redis:6379/0"
138 ),
139 )
141 self.assert_broker_url_env(k8s_obj_by_key)
143 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
144 def test_redis_by_chart_password_secret_name_missing_broker_url_secret_name(self, executor):
145 with pytest.raises(CalledProcessError):
146 render_chart(
147 RELEASE_NAME_REDIS,
148 {
149 "executor": executor,
150 "redis": {
151 "enabled": True,
152 "passwordSecretName": "test-redis-password-secret-name",
153 },
154 },
155 )
157 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
158 def test_redis_by_chart_password_secret_name(self, executor):
159 expected_broker_url_secret_name = "test-redis-broker-url-secret-name"
160 k8s_objects = render_chart(
161 RELEASE_NAME_REDIS,
162 {
163 "executor": executor,
164 "networkPolicies": {"enabled": True},
165 "data": {"brokerUrlSecretName": expected_broker_url_secret_name},
166 "redis": {
167 "enabled": True,
168 "passwordSecretName": "test-redis-password-secret-name",
169 },
170 },
171 )
172 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)
174 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
175 assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS - {
176 REDIS_OBJECTS["SECRET_PASSWORD"],
177 REDIS_OBJECTS["SECRET_BROKER_URL"],
178 }
180 self.assert_password_and_broker_url_secrets(
181 k8s_obj_by_key, expected_password_match=None, expected_broker_url_match=None
182 )
184 self.assert_broker_url_env(k8s_obj_by_key, expected_broker_url_secret_name)
186 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
187 def test_external_redis_broker_url(self, executor):
188 k8s_objects = render_chart(
189 RELEASE_NAME_REDIS,
190 {
191 "executor": executor,
192 "networkPolicies": {"enabled": True},
193 "data": {
194 "brokerUrl": "redis://redis-user:password@redis-host:6379/0",
195 },
196 "redis": {"enabled": False},
197 },
198 )
199 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)
201 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
202 assert created_redis_objects == {REDIS_OBJECTS["SECRET_BROKER_URL"]}
204 self.assert_password_and_broker_url_secrets(
205 k8s_obj_by_key,
206 expected_password_match=None,
207 expected_broker_url_match="redis://redis-user:password@redis-host:6379/0",
208 )
210 self.assert_broker_url_env(k8s_obj_by_key)
212 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
213 def test_external_redis_broker_url_secret_name(self, executor):
214 expected_broker_url_secret_name = "redis-broker-url-secret-name"
215 k8s_objects = render_chart(
216 RELEASE_NAME_REDIS,
217 {
218 "executor": executor,
219 "networkPolicies": {"enabled": True},
220 "data": {"brokerUrlSecretName": expected_broker_url_secret_name},
221 "redis": {"enabled": False},
222 },
223 )
224 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)
226 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
227 assert created_redis_objects == set()
229 self.assert_password_and_broker_url_secrets(
230 k8s_obj_by_key, expected_password_match=None, expected_broker_url_match=None
231 )
233 self.assert_broker_url_env(k8s_obj_by_key, expected_broker_url_secret_name)
235 def test_default_redis_secrets_created_with_non_celery_executor(self):
236 # We want to make sure default redis secrets (if needed) are still
237 # created during install, as they are marked "pre-install".
238 # See note in templates/secrets/redis-secrets.yaml for more.
239 docs = render_chart(
240 values={"executor": "KubernetesExecutor"}, show_only=["templates/secrets/redis-secrets.yaml"]
241 )
242 assert 2 == len(docs)
244 def test_should_create_valid_affinity_tolerations_and_node_selector(self):
245 docs = render_chart(
246 values={
247 "executor": "CeleryExecutor",
248 "redis": {
249 "affinity": {
250 "nodeAffinity": {
251 "requiredDuringSchedulingIgnoredDuringExecution": {
252 "nodeSelectorTerms": [
253 {
254 "matchExpressions": [
255 {"key": "foo", "operator": "In", "values": ["true"]},
256 ]
257 }
258 ]
259 }
260 }
261 },
262 "tolerations": [
263 {"key": "dynamic-pods", "operator": "Equal", "value": "true", "effect": "NoSchedule"}
264 ],
265 "nodeSelector": {"diskType": "ssd"},
266 },
267 },
268 show_only=["templates/redis/redis-statefulset.yaml"],
269 )
271 assert "StatefulSet" == jmespath.search("kind", docs[0])
272 assert "foo" == jmespath.search(
273 "spec.template.spec.affinity.nodeAffinity."
274 "requiredDuringSchedulingIgnoredDuringExecution."
275 "nodeSelectorTerms[0]."
276 "matchExpressions[0]."
277 "key",
278 docs[0],
279 )
280 assert "ssd" == jmespath.search(
281 "spec.template.spec.nodeSelector.diskType",
282 docs[0],
283 )
284 assert "dynamic-pods" == jmespath.search(
285 "spec.template.spec.tolerations[0].key",
286 docs[0],
287 )
289 def test_redis_resources_are_configurable(self):
290 docs = render_chart(
291 values={
292 "redis": {
293 "resources": {
294 "limits": {"cpu": "200m", "memory": "128Mi"},
295 "requests": {"cpu": "300m", "memory": "169Mi"},
296 }
297 },
298 },
299 show_only=["templates/redis/redis-statefulset.yaml"],
300 )
301 assert "128Mi" == jmespath.search("spec.template.spec.containers[0].resources.limits.memory", docs[0])
302 assert "169Mi" == jmespath.search(
303 "spec.template.spec.containers[0].resources.requests.memory", docs[0]
304 )
305 assert "300m" == jmespath.search("spec.template.spec.containers[0].resources.requests.cpu", docs[0])
307 def test_redis_resources_are_not_added_by_default(self):
308 docs = render_chart(
309 show_only=["templates/redis/redis-statefulset.yaml"],
310 )
311 assert jmespath.search("spec.template.spec.containers[0].resources", docs[0]) == {}
313 def test_should_set_correct_helm_hooks_weight(self):
314 docs = render_chart(
315 values={
316 "executor": "CeleryExecutor",
317 },
318 show_only=["templates/secrets/redis-secrets.yaml"],
319 )
320 annotations = jmespath.search("metadata.annotations", docs[0])
321 assert annotations["helm.sh/hook-weight"] == "0"
323 def test_persistence_volume_annotations(self):
324 docs = render_chart(
325 values={"redis": {"persistence": {"annotations": {"foo": "bar"}}}},
326 show_only=["templates/redis/redis-statefulset.yaml"],
327 )
328 assert {"foo": "bar"} == jmespath.search("spec.volumeClaimTemplates[0].metadata.annotations", docs[0])