Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/helm_tests/other/test_redis.py: 17%
Shortcuts on this page
r, m, x: toggle line displays
j, k: next/prev highlighted chunk
0 (zero): top of page
1 (one): first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import re
20from base64 import b64decode
21from subprocess import CalledProcessError
23import jmespath
24import pytest
26from tests.charts.helm_template_generator import prepare_k8s_lookup_dict, render_chart
# Helm release name used by the tests that render the whole chart.
RELEASE_NAME_REDIS = "test-redis"

# (kind, name) pairs of the redis-related objects the tests look for; the pairs
# are used as keys into the lookup dict built by ``prepare_k8s_lookup_dict``.
REDIS_OBJECTS = {
    "NETWORK_POLICY": ("NetworkPolicy", f"{RELEASE_NAME_REDIS}-redis-policy"),
    "SERVICE": ("Service", f"{RELEASE_NAME_REDIS}-redis"),
    "STATEFUL_SET": ("StatefulSet", f"{RELEASE_NAME_REDIS}-redis"),
    "SECRET_PASSWORD": ("Secret", f"{RELEASE_NAME_REDIS}-redis-password"),
    "SECRET_BROKER_URL": ("Secret", f"{RELEASE_NAME_REDIS}-broker-url"),
}

# Every key above as a set, so each test can intersect it with the rendered
# objects and compare against the subset it expects.
SET_POSSIBLE_REDIS_OBJECT_KEYS = {*REDIS_OBJECTS.values()}

# Executor values the full-chart tests are parametrized over.
CELERY_EXECUTORS_PARAMS = ["CeleryExecutor", "CeleryKubernetesExecutor"]
class TestRedis:
    """Tests redis."""

    @staticmethod
    def get_broker_url_in_broker_url_secret(k8s_obj_by_key):
        """Return the base64-decoded ``connection`` entry of the broker-url Secret as text."""
        secret = k8s_obj_by_key[REDIS_OBJECTS["SECRET_BROKER_URL"]]
        return b64decode(secret["data"]["connection"]).decode("utf-8")

    @staticmethod
    def get_redis_password_in_password_secret(k8s_obj_by_key):
        """Return the base64-decoded ``password`` entry of the redis password Secret as text."""
        secret = k8s_obj_by_key[REDIS_OBJECTS["SECRET_PASSWORD"]]
        return b64decode(secret["data"]["password"]).decode("utf-8")

    @staticmethod
    def get_broker_url_secret_in_deployment(k8s_obj_by_key, kind: str, name: str) -> str:
        """
        Return the Secret name that ``AIRFLOW__CELERY__BROKER_URL`` is read from in one workload.

        :param k8s_obj_by_key: rendered objects keyed by ``(kind, name)``
        :param kind: kind of the workload object, e.g. ``"Deployment"`` or ``"StatefulSet"``
        :param name: component name; it is both the suffix of the object name
            (``<release>-<name>``) and the name of the container that is inspected
        :raises StopIteration: if the container or the env var is not present
        """
        workload = k8s_obj_by_key[(kind, f"{RELEASE_NAME_REDIS}-{name}")]
        containers = workload["spec"]["template"]["spec"]["containers"]
        container = next(c for c in containers if c["name"] == name)

        broker_url_env = next(e for e in container["env"] if e["name"] == "AIRFLOW__CELERY__BROKER_URL")
        return broker_url_env["valueFrom"]["secretKeyRef"]["name"]

    def assert_password_and_broker_url_secrets(
        self, k8s_obj_by_key, expected_password_match: str | None, expected_broker_url_match: str | None
    ):
        """
        Check the content, or the absence, of the password and broker-url Secrets.

        For each of the two secrets: when the expected pattern is ``None`` the Secret
        must not be among the rendered objects, otherwise its decoded value must
        satisfy ``re.search`` with the given pattern.
        """
        if expected_password_match is None:
            assert REDIS_OBJECTS["SECRET_PASSWORD"] not in k8s_obj_by_key
        else:
            redis_password = self.get_redis_password_in_password_secret(k8s_obj_by_key)
            assert re.search(expected_password_match, redis_password)

        if expected_broker_url_match is None:
            assert REDIS_OBJECTS["SECRET_BROKER_URL"] not in k8s_obj_by_key
        else:
            # assert redis broker url in secret
            broker_url = self.get_broker_url_in_broker_url_secret(k8s_obj_by_key)
            assert re.search(expected_broker_url_match, broker_url)

    def assert_broker_url_env(
        self, k8s_obj_by_key, expected_broker_url_secret_name=REDIS_OBJECTS["SECRET_BROKER_URL"][1]
    ):
        """
        Check that worker and scheduler read the broker url from the expected Secret.

        :param k8s_obj_by_key: rendered objects keyed by ``(kind, name)``
        :param expected_broker_url_secret_name: Secret name the env var must reference;
            defaults to the name of the chart-created broker-url Secret
        """
        # Locals are named after the component they actually inspect so a failing
        # assertion points at the right workload.
        broker_url_secret_in_worker = self.get_broker_url_secret_in_deployment(
            k8s_obj_by_key, "StatefulSet", "worker"
        )
        assert broker_url_secret_in_worker == expected_broker_url_secret_name

        broker_url_secret_in_scheduler = self.get_broker_url_secret_in_deployment(
            k8s_obj_by_key, "Deployment", "scheduler"
        )
        assert broker_url_secret_in_scheduler == expected_broker_url_secret_name

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_redis_by_chart_default(self, executor):
        """With redis enabled and no further settings, every redis object is rendered."""
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "redis": {"enabled": True},
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key,
            expected_password_match=r"\w+",
            expected_broker_url_match=rf"redis://:.+@{RELEASE_NAME_REDIS}-redis:6379/0",
        )

        self.assert_broker_url_env(k8s_obj_by_key)

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_redis_by_chart_password(self, executor):
        """An explicit password ends up in the password Secret and, escaped, in the broker url."""
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "redis": {"enabled": True, "password": "test-redis-password!@#$%^&*()_+"},
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key,
            expected_password_match="test-redis-password",
            # The expected url contains regex metacharacters, so match it literally.
            expected_broker_url_match=re.escape(
                "redis://:test-redis-password%21%40%23$%25%5E&%2A%28%29_+@test-redis-redis:6379/0"
            ),
        )

        self.assert_broker_url_env(k8s_obj_by_key)

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_redis_by_chart_password_secret_name_missing_broker_url_secret_name(self, executor):
        """Rendering fails when ``passwordSecretName`` is given without ``brokerUrlSecretName``."""
        with pytest.raises(CalledProcessError):
            render_chart(
                RELEASE_NAME_REDIS,
                {
                    "executor": executor,
                    "redis": {
                        "enabled": True,
                        "passwordSecretName": "test-redis-password-secret-name",
                    },
                },
            )

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_redis_by_chart_password_secret_name(self, executor):
        """With both secret names supplied, neither Secret is rendered by the chart."""
        expected_broker_url_secret_name = "test-redis-broker-url-secret-name"
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "data": {"brokerUrlSecretName": expected_broker_url_secret_name},
                "redis": {
                    "enabled": True,
                    "passwordSecretName": "test-redis-password-secret-name",
                },
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS - {
            REDIS_OBJECTS["SECRET_PASSWORD"],
            REDIS_OBJECTS["SECRET_BROKER_URL"],
        }

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key, expected_password_match=None, expected_broker_url_match=None
        )

        self.assert_broker_url_env(k8s_obj_by_key, expected_broker_url_secret_name)

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_external_redis_broker_url(self, executor):
        """With redis disabled and ``data.brokerUrl`` set, only the broker-url Secret is rendered."""
        broker_url = "redis://redis-user:password@redis-host:6379/0"
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "data": {
                    "brokerUrl": broker_url,
                },
                "redis": {"enabled": False},
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == {REDIS_OBJECTS["SECRET_BROKER_URL"]}

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key,
            expected_password_match=None,
            expected_broker_url_match=broker_url,
        )

        self.assert_broker_url_env(k8s_obj_by_key)

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_external_redis_broker_url_secret_name(self, executor):
        """With redis disabled and ``data.brokerUrlSecretName`` set, no redis object is rendered."""
        expected_broker_url_secret_name = "redis-broker-url-secret-name"
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "data": {"brokerUrlSecretName": expected_broker_url_secret_name},
                "redis": {"enabled": False},
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == set()

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key, expected_password_match=None, expected_broker_url_match=None
        )

        self.assert_broker_url_env(k8s_obj_by_key, expected_broker_url_secret_name)

    def test_default_redis_secrets_created_with_non_celery_executor(self):
        # We want to make sure default redis secrets (if needed) are still
        # created during install, as they are marked "pre-install".
        # See note in templates/secrets/redis-secrets.yaml for more.
        docs = render_chart(
            values={"executor": "KubernetesExecutor"}, show_only=["templates/secrets/redis-secrets.yaml"]
        )
        assert len(docs) == 2

    def test_scheduler_name(self):
        """``schedulerName`` from values is set on the redis StatefulSet pod spec."""
        docs = render_chart(
            values={"schedulerName": "airflow-scheduler"},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )

        assert "airflow-scheduler" == jmespath.search(
            "spec.template.spec.schedulerName",
            docs[0],
        )

    def test_should_create_valid_affinity_tolerations_and_node_selector(self):
        """Affinity, tolerations and nodeSelector from ``redis`` values reach the pod spec."""
        docs = render_chart(
            values={
                "executor": "CeleryExecutor",
                "redis": {
                    "affinity": {
                        "nodeAffinity": {
                            "requiredDuringSchedulingIgnoredDuringExecution": {
                                "nodeSelectorTerms": [
                                    {
                                        "matchExpressions": [
                                            {"key": "foo", "operator": "In", "values": ["true"]},
                                        ]
                                    }
                                ]
                            }
                        }
                    },
                    "tolerations": [
                        {"key": "dynamic-pods", "operator": "Equal", "value": "true", "effect": "NoSchedule"}
                    ],
                    "nodeSelector": {"diskType": "ssd"},
                },
            },
            show_only=["templates/redis/redis-statefulset.yaml"],
        )

        assert "StatefulSet" == jmespath.search("kind", docs[0])
        assert "foo" == jmespath.search(
            "spec.template.spec.affinity.nodeAffinity."
            "requiredDuringSchedulingIgnoredDuringExecution."
            "nodeSelectorTerms[0]."
            "matchExpressions[0]."
            "key",
            docs[0],
        )
        assert "ssd" == jmespath.search(
            "spec.template.spec.nodeSelector.diskType",
            docs[0],
        )
        assert "dynamic-pods" == jmespath.search(
            "spec.template.spec.tolerations[0].key",
            docs[0],
        )

    def test_redis_resources_are_configurable(self):
        """Every configured resource limit and request shows up on the first container."""
        docs = render_chart(
            values={
                "redis": {
                    "resources": {
                        "limits": {"cpu": "200m", "memory": "128Mi"},
                        "requests": {"cpu": "300m", "memory": "169Mi"},
                    }
                },
            },
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert "128Mi" == jmespath.search("spec.template.spec.containers[0].resources.limits.memory", docs[0])
        # limits.cpu is configured above as well, so verify it like the other three values.
        assert "200m" == jmespath.search("spec.template.spec.containers[0].resources.limits.cpu", docs[0])
        assert "169Mi" == jmespath.search(
            "spec.template.spec.containers[0].resources.requests.memory", docs[0]
        )
        assert "300m" == jmespath.search("spec.template.spec.containers[0].resources.requests.cpu", docs[0])

    def test_redis_resources_are_not_added_by_default(self):
        """Without any values the first container's ``resources`` is an empty mapping."""
        docs = render_chart(
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert jmespath.search("spec.template.spec.containers[0].resources", docs[0]) == {}

    def test_should_set_correct_helm_hooks_weight(self):
        """The first rendered redis secret carries ``helm.sh/hook-weight`` of ``"0"``."""
        docs = render_chart(
            values={
                "executor": "CeleryExecutor",
            },
            show_only=["templates/secrets/redis-secrets.yaml"],
        )
        annotations = jmespath.search("metadata.annotations", docs[0])
        assert annotations["helm.sh/hook-weight"] == "0"

    def test_persistence_volume_annotations(self):
        """``redis.persistence.annotations`` is applied to the first volume claim template."""
        docs = render_chart(
            values={"redis": {"persistence": {"annotations": {"foo": "bar"}}}},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert {"foo": "bar"} == jmespath.search("spec.volumeClaimTemplates[0].metadata.annotations", docs[0])

    @pytest.mark.parametrize(
        "redis_values, expected",
        [
            ({"persistence": {"enabled": False}}, {"emptyDir": {}}),
            (
                {"persistence": {"enabled": False}, "emptyDirConfig": {"sizeLimit": "10Gi"}},
                {"emptyDir": {"sizeLimit": "10Gi"}},
            ),
        ],
    )
    def test_should_use_empty_dir_on_persistence_disabled(self, redis_values, expected):
        """With persistence disabled, ``redis-db`` is an emptyDir volume honouring ``emptyDirConfig``."""
        docs = render_chart(
            values={"redis": redis_values},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert {"name": "redis-db", **expected} in jmespath.search("spec.template.spec.volumes", docs[0])

    def test_priority_class_name(self):
        """``redis.priorityClassName`` is set on the StatefulSet pod spec."""
        docs = render_chart(
            values={"redis": {"priorityClassName": "airflow-priority-class-name"}},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )

        assert "airflow-priority-class-name" == jmespath.search(
            "spec.template.spec.priorityClassName",
            docs[0],
        )

    def test_redis_template_storage_class_name(self):
        """A templated ``storageClassName`` is rendered with the release name substituted."""
        docs = render_chart(
            values={"redis": {"persistence": {"storageClassName": "{{ .Release.Name }}-storage-class"}}},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert "release-name-storage-class" == jmespath.search(
            "spec.volumeClaimTemplates[0].spec.storageClassName", docs[0]
        )
class TestRedisServiceAccount:
    """Tests redis service account."""

    def test_default_automount_service_account_token(self):
        """A created service account has ``automountServiceAccountToken`` set to ``True``."""
        chart_values = {"redis": {"serviceAccount": {"create": True}}}
        docs = render_chart(
            values=chart_values,
            show_only=["templates/redis/redis-serviceaccount.yaml"],
        )
        automount = jmespath.search("automountServiceAccountToken", docs[0])
        assert automount is True

    def test_overridden_automount_service_account_token(self):
        """Setting ``automountServiceAccountToken`` to ``False`` in values is rendered as ``False``."""
        chart_values = {
            "redis": {"serviceAccount": {"create": True, "automountServiceAccountToken": False}},
        }
        docs = render_chart(
            values=chart_values,
            show_only=["templates/redis/redis-serviceaccount.yaml"],
        )
        automount = jmespath.search("automountServiceAccountToken", docs[0])
        assert automount is False