Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/helm_tests/other/test_redis.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

132 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import re 

20from base64 import b64decode 

21from subprocess import CalledProcessError 

22 

23import jmespath 

24import pytest 

25 

26from tests.charts.helm_template_generator import prepare_k8s_lookup_dict, render_chart 

27 

# Helm release name used when rendering the chart in the tests below.
RELEASE_NAME_REDIS = "test-redis"

# (kind, name) keys of every Kubernetes object related to redis that the tests
# look up in the rendered chart output.
REDIS_OBJECTS = {
    "NETWORK_POLICY": ("NetworkPolicy", f"{RELEASE_NAME_REDIS}-redis-policy"),
    "SERVICE": ("Service", f"{RELEASE_NAME_REDIS}-redis"),
    "STATEFUL_SET": ("StatefulSet", f"{RELEASE_NAME_REDIS}-redis"),
    "SECRET_PASSWORD": ("Secret", f"{RELEASE_NAME_REDIS}-redis-password"),
    "SECRET_BROKER_URL": ("Secret", f"{RELEASE_NAME_REDIS}-broker-url"),
}
# Set form of the keys above, used to intersect with the rendered object keys.
SET_POSSIBLE_REDIS_OBJECT_KEYS = set(REDIS_OBJECTS.values())

# Executors the parametrized tests are run against.
CELERY_EXECUTORS_PARAMS = ["CeleryExecutor", "CeleryKubernetesExecutor"]

40 

41 

class TestRedis:
    """Tests redis."""

    @staticmethod
    def get_broker_url_in_broker_url_secret(k8s_obj_by_key):
        """Return the base64-decoded ``connection`` value of the broker-url Secret.

        :param k8s_obj_by_key: rendered objects keyed by ``(kind, name)``.
        :raises KeyError: if the broker-url Secret is not among the objects.
        """
        encoded = k8s_obj_by_key[REDIS_OBJECTS["SECRET_BROKER_URL"]]["data"]["connection"]
        return b64decode(encoded).decode("utf-8")

    @staticmethod
    def get_redis_password_in_password_secret(k8s_obj_by_key):
        """Return the base64-decoded ``password`` value of the redis password Secret.

        :param k8s_obj_by_key: rendered objects keyed by ``(kind, name)``.
        :raises KeyError: if the password Secret is not among the objects.
        """
        encoded = k8s_obj_by_key[REDIS_OBJECTS["SECRET_PASSWORD"]]["data"]["password"]
        return b64decode(encoded).decode("utf-8")

    @staticmethod
    def get_broker_url_secret_in_deployment(k8s_obj_by_key, kind: str, name: str) -> str:
        """Return the Secret name referenced by ``AIRFLOW__CELERY__BROKER_URL``.

        Looks up the object ``(kind, "<release>-<name>")``, picks the container
        called ``name`` and reads the ``secretKeyRef`` of its broker URL env var.

        :param k8s_obj_by_key: rendered objects keyed by ``(kind, name)``.
        :param kind: Kubernetes kind of the workload, e.g. ``"Deployment"``.
        :param name: component name; used for both the object name suffix and
            the container name.
        :raises KeyError: if the workload object is not among the objects.
        :raises StopIteration: if the container or the env var is not found.
        """
        workload = k8s_obj_by_key[(kind, f"{RELEASE_NAME_REDIS}-{name}")]
        containers = workload["spec"]["template"]["spec"]["containers"]
        container = next(obj for obj in containers if obj["name"] == name)

        env = next(obj for obj in container["env"] if obj["name"] == "AIRFLOW__CELERY__BROKER_URL")
        return env["valueFrom"]["secretKeyRef"]["name"]

    def assert_password_and_broker_url_secrets(
        self, k8s_obj_by_key, expected_password_match: str | None, expected_broker_url_match: str | None
    ):
        """Assert the contents (or absence) of the password and broker-url Secrets.

        Each ``expected_*_match`` is a regular expression searched for in the
        decoded secret value; ``None`` means the Secret must not be rendered.
        """
        if expected_password_match is not None:
            redis_password_in_password_secret = self.get_redis_password_in_password_secret(k8s_obj_by_key)
            assert re.search(expected_password_match, redis_password_in_password_secret)
        else:
            assert REDIS_OBJECTS["SECRET_PASSWORD"] not in k8s_obj_by_key

        if expected_broker_url_match is not None:
            # assert redis broker url in secret
            broker_url_in_broker_url_secret = self.get_broker_url_in_broker_url_secret(k8s_obj_by_key)
            assert re.search(expected_broker_url_match, broker_url_in_broker_url_secret)
        else:
            assert REDIS_OBJECTS["SECRET_BROKER_URL"] not in k8s_obj_by_key

    def assert_broker_url_env(
        self, k8s_obj_by_key, expected_broker_url_secret_name=REDIS_OBJECTS["SECRET_BROKER_URL"][1]
    ):
        """Assert worker and scheduler read the broker URL from the expected Secret.

        :param k8s_obj_by_key: rendered objects keyed by ``(kind, name)``.
        :param expected_broker_url_secret_name: Secret name both workloads must
            reference; defaults to the chart-managed broker-url Secret.
        """
        # The worker is looked up as a StatefulSet, the scheduler as a Deployment.
        broker_url_secret_in_worker = self.get_broker_url_secret_in_deployment(
            k8s_obj_by_key, "StatefulSet", "worker"
        )
        assert broker_url_secret_in_worker == expected_broker_url_secret_name
        broker_url_secret_in_scheduler = self.get_broker_url_secret_in_deployment(
            k8s_obj_by_key, "Deployment", "scheduler"
        )
        assert broker_url_secret_in_scheduler == expected_broker_url_secret_name

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_redis_by_chart_default(self, executor):
        """All redis objects are rendered when redis is enabled with no password given."""
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "redis": {"enabled": True},
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key,
            expected_password_match=r"\w+",
            expected_broker_url_match=rf"redis://:.+@{RELEASE_NAME_REDIS}-redis:6379/0",
        )

        self.assert_broker_url_env(k8s_obj_by_key)

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_redis_by_chart_password(self, executor):
        """An explicit redis password ends up in both Secrets (escaped in the broker URL)."""
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "redis": {"enabled": True, "password": "test-redis-password!@#$%^&*()_+"},
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key,
            expected_password_match="test-redis-password",
            # The expected URL is matched literally, hence re.escape.
            expected_broker_url_match=re.escape(
                "redis://:test-redis-password%21%40%23$%25%5E&%2A%28%29_+@test-redis-redis:6379/0"
            ),
        )

        self.assert_broker_url_env(k8s_obj_by_key)

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_redis_by_chart_password_secret_name_missing_broker_url_secret_name(self, executor):
        """Rendering fails when ``passwordSecretName`` is set without ``brokerUrlSecretName``."""
        with pytest.raises(CalledProcessError):
            render_chart(
                RELEASE_NAME_REDIS,
                {
                    "executor": executor,
                    "redis": {
                        "enabled": True,
                        "passwordSecretName": "test-redis-password-secret-name",
                    },
                },
            )

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_redis_by_chart_password_secret_name(self, executor):
        """With user-provided secret names neither redis Secret is rendered by the chart."""
        expected_broker_url_secret_name = "test-redis-broker-url-secret-name"
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "data": {"brokerUrlSecretName": expected_broker_url_secret_name},
                "redis": {
                    "enabled": True,
                    "passwordSecretName": "test-redis-password-secret-name",
                },
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS - {
            REDIS_OBJECTS["SECRET_PASSWORD"],
            REDIS_OBJECTS["SECRET_BROKER_URL"],
        }

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key, expected_password_match=None, expected_broker_url_match=None
        )

        self.assert_broker_url_env(k8s_obj_by_key, expected_broker_url_secret_name)

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_external_redis_broker_url(self, executor):
        """With redis disabled and ``data.brokerUrl`` set only the broker-url Secret is rendered."""
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "data": {
                    "brokerUrl": "redis://redis-user:password@redis-host:6379/0",
                },
                "redis": {"enabled": False},
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == {REDIS_OBJECTS["SECRET_BROKER_URL"]}

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key,
            expected_password_match=None,
            expected_broker_url_match="redis://redis-user:password@redis-host:6379/0",
        )

        self.assert_broker_url_env(k8s_obj_by_key)

    @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS)
    def test_external_redis_broker_url_secret_name(self, executor):
        """With redis disabled and ``data.brokerUrlSecretName`` set no redis object is rendered."""
        expected_broker_url_secret_name = "redis-broker-url-secret-name"
        k8s_objects = render_chart(
            RELEASE_NAME_REDIS,
            {
                "executor": executor,
                "networkPolicies": {"enabled": True},
                "data": {"brokerUrlSecretName": expected_broker_url_secret_name},
                "redis": {"enabled": False},
            },
        )
        k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects)

        created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys())
        assert created_redis_objects == set()

        self.assert_password_and_broker_url_secrets(
            k8s_obj_by_key, expected_password_match=None, expected_broker_url_match=None
        )

        self.assert_broker_url_env(k8s_obj_by_key, expected_broker_url_secret_name)

    def test_default_redis_secrets_created_with_non_celery_executor(self):
        # We want to make sure default redis secrets (if needed) are still
        # created during install, as they are marked "pre-install".
        # See note in templates/secrets/redis-secrets.yaml for more.
        docs = render_chart(
            values={"executor": "KubernetesExecutor"}, show_only=["templates/secrets/redis-secrets.yaml"]
        )
        assert len(docs) == 2

    def test_scheduler_name(self):
        """``schedulerName`` from values is set on the redis StatefulSet pod spec."""
        docs = render_chart(
            values={"schedulerName": "airflow-scheduler"},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )

        assert jmespath.search("spec.template.spec.schedulerName", docs[0]) == "airflow-scheduler"

    def test_should_create_valid_affinity_tolerations_and_node_selector(self):
        """Affinity, tolerations and nodeSelector from ``redis`` values reach the pod spec."""
        docs = render_chart(
            values={
                "executor": "CeleryExecutor",
                "redis": {
                    "affinity": {
                        "nodeAffinity": {
                            "requiredDuringSchedulingIgnoredDuringExecution": {
                                "nodeSelectorTerms": [
                                    {
                                        "matchExpressions": [
                                            {"key": "foo", "operator": "In", "values": ["true"]},
                                        ]
                                    }
                                ]
                            }
                        }
                    },
                    "tolerations": [
                        {"key": "dynamic-pods", "operator": "Equal", "value": "true", "effect": "NoSchedule"}
                    ],
                    "nodeSelector": {"diskType": "ssd"},
                },
            },
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        statefulset = docs[0]

        assert jmespath.search("kind", statefulset) == "StatefulSet"
        assert (
            jmespath.search(
                "spec.template.spec.affinity.nodeAffinity."
                "requiredDuringSchedulingIgnoredDuringExecution."
                "nodeSelectorTerms[0]."
                "matchExpressions[0]."
                "key",
                statefulset,
            )
            == "foo"
        )
        assert jmespath.search("spec.template.spec.nodeSelector.diskType", statefulset) == "ssd"
        assert jmespath.search("spec.template.spec.tolerations[0].key", statefulset) == "dynamic-pods"

    def test_redis_resources_are_configurable(self):
        """Resource limits and requests from ``redis.resources`` are set on the container."""
        docs = render_chart(
            values={
                "redis": {
                    "resources": {
                        "limits": {"cpu": "200m", "memory": "128Mi"},
                        "requests": {"cpu": "300m", "memory": "169Mi"},
                    }
                },
            },
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        resources_path = "spec.template.spec.containers[0].resources"
        assert jmespath.search(f"{resources_path}.limits.memory", docs[0]) == "128Mi"
        # The CPU limit is part of the input values, so verify it is rendered too.
        assert jmespath.search(f"{resources_path}.limits.cpu", docs[0]) == "200m"
        assert jmespath.search(f"{resources_path}.requests.memory", docs[0]) == "169Mi"
        assert jmespath.search(f"{resources_path}.requests.cpu", docs[0]) == "300m"

    def test_redis_resources_are_not_added_by_default(self):
        """Without ``redis.resources`` the container's resources are an empty mapping."""
        docs = render_chart(
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert jmespath.search("spec.template.spec.containers[0].resources", docs[0]) == {}

    def test_should_set_correct_helm_hooks_weight(self):
        """The first rendered redis secret carries ``helm.sh/hook-weight`` of ``"0"``."""
        docs = render_chart(
            values={
                "executor": "CeleryExecutor",
            },
            show_only=["templates/secrets/redis-secrets.yaml"],
        )
        annotations = jmespath.search("metadata.annotations", docs[0])
        assert annotations["helm.sh/hook-weight"] == "0"

    def test_persistence_volume_annotations(self):
        """``redis.persistence.annotations`` are copied to the volume claim template."""
        docs = render_chart(
            values={"redis": {"persistence": {"annotations": {"foo": "bar"}}}},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert jmespath.search("spec.volumeClaimTemplates[0].metadata.annotations", docs[0]) == {"foo": "bar"}

    @pytest.mark.parametrize(
        "redis_values, expected",
        [
            ({"persistence": {"enabled": False}}, {"emptyDir": {}}),
            (
                {"persistence": {"enabled": False}, "emptyDirConfig": {"sizeLimit": "10Gi"}},
                {"emptyDir": {"sizeLimit": "10Gi"}},
            ),
        ],
    )
    def test_should_use_empty_dir_on_persistence_disabled(self, redis_values, expected):
        """With persistence disabled the ``redis-db`` volume is the expected ``emptyDir``."""
        docs = render_chart(
            values={"redis": redis_values},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert {"name": "redis-db", **expected} in jmespath.search("spec.template.spec.volumes", docs[0])

    def test_priority_class_name(self):
        """``redis.priorityClassName`` is set on the StatefulSet pod spec."""
        docs = render_chart(
            values={"redis": {"priorityClassName": "airflow-priority-class-name"}},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )

        assert jmespath.search("spec.template.spec.priorityClassName", docs[0]) == "airflow-priority-class-name"

    def test_redis_template_storage_class_name(self):
        """A templated ``storageClassName`` is rendered with the release name substituted."""
        docs = render_chart(
            values={"redis": {"persistence": {"storageClassName": "{{ .Release.Name }}-storage-class"}}},
            show_only=["templates/redis/redis-statefulset.yaml"],
        )
        assert (
            jmespath.search("spec.volumeClaimTemplates[0].spec.storageClassName", docs[0])
            == "release-name-storage-class"
        )

379 

380 

class TestRedisServiceAccount:
    """Tests redis service account."""

    def test_default_automount_service_account_token(self):
        """``automountServiceAccountToken`` is ``True`` when not overridden."""
        docs = render_chart(
            values={
                "redis": {
                    "serviceAccount": {"create": True},
                },
            },
            show_only=["templates/redis/redis-serviceaccount.yaml"],
        )
        assert jmespath.search("automountServiceAccountToken", docs[0]) is True

    def test_overridden_automount_service_account_token(self):
        """``automountServiceAccountToken`` follows the value given in ``redis.serviceAccount``."""
        docs = render_chart(
            values={
                "redis": {
                    "serviceAccount": {"create": True, "automountServiceAccountToken": False},
                },
            },
            show_only=["templates/redis/redis-serviceaccount.yaml"],
        )
        assert jmespath.search("automountServiceAccountToken", docs[0]) is False