Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/tests/charts/test_redis.py: 12%

112 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import re 

20from base64 import b64decode 

21from subprocess import CalledProcessError 

22 

23import jmespath 

24import pytest 

25 

26from tests.charts.helm_template_generator import prepare_k8s_lookup_dict, render_chart 

27 

28RELEASE_NAME_REDIS = "test-redis" 

29 

30REDIS_OBJECTS = { 

31 "NETWORK_POLICY": ("NetworkPolicy", f"{RELEASE_NAME_REDIS}-redis-policy"), 

32 "SERVICE": ("Service", f"{RELEASE_NAME_REDIS}-redis"), 

33 "STATEFUL_SET": ("StatefulSet", f"{RELEASE_NAME_REDIS}-redis"), 

34 "SECRET_PASSWORD": ("Secret", f"{RELEASE_NAME_REDIS}-redis-password"), 

35 "SECRET_BROKER_URL": ("Secret", f"{RELEASE_NAME_REDIS}-broker-url"), 

36} 

37SET_POSSIBLE_REDIS_OBJECT_KEYS = set(REDIS_OBJECTS.values()) 

38 

39CELERY_EXECUTORS_PARAMS = ["CeleryExecutor", "CeleryKubernetesExecutor"] 

40 

41 

42class TestRedis: 

43 @staticmethod 

44 def get_broker_url_in_broker_url_secret(k8s_obj_by_key): 

45 broker_url_in_obj = b64decode( 

46 k8s_obj_by_key[REDIS_OBJECTS["SECRET_BROKER_URL"]]["data"]["connection"] 

47 ).decode("utf-8") 

48 return broker_url_in_obj 

49 

50 @staticmethod 

51 def get_redis_password_in_password_secret(k8s_obj_by_key): 

52 password_in_obj = b64decode( 

53 k8s_obj_by_key[REDIS_OBJECTS["SECRET_PASSWORD"]]["data"]["password"] 

54 ).decode("utf-8") 

55 return password_in_obj 

56 

57 @staticmethod 

58 def get_broker_url_secret_in_deployment(k8s_obj_by_key, kind: str, name: str) -> str: 

59 deployment_obj = k8s_obj_by_key[(kind, f"{RELEASE_NAME_REDIS}-{name}")] 

60 containers = deployment_obj["spec"]["template"]["spec"]["containers"] 

61 container = next(obj for obj in containers if obj["name"] == name) 

62 

63 envs = container["env"] 

64 env = next(obj for obj in envs if obj["name"] == "AIRFLOW__CELERY__BROKER_URL") 

65 return env["valueFrom"]["secretKeyRef"]["name"] 

66 

67 def assert_password_and_broker_url_secrets( 

68 self, k8s_obj_by_key, expected_password_match: str | None, expected_broker_url_match: str | None 

69 ): 

70 if expected_password_match is not None: 

71 redis_password_in_password_secret = self.get_redis_password_in_password_secret(k8s_obj_by_key) 

72 assert re.search(expected_password_match, redis_password_in_password_secret) 

73 else: 

74 assert REDIS_OBJECTS["SECRET_PASSWORD"] not in k8s_obj_by_key.keys() 

75 

76 if expected_broker_url_match is not None: 

77 # assert redis broker url in secret 

78 broker_url_in_broker_url_secret = self.get_broker_url_in_broker_url_secret(k8s_obj_by_key) 

79 assert re.search(expected_broker_url_match, broker_url_in_broker_url_secret) 

80 else: 

81 assert REDIS_OBJECTS["SECRET_BROKER_URL"] not in k8s_obj_by_key.keys() 

82 

83 def assert_broker_url_env( 

84 self, k8s_obj_by_key, expected_broker_url_secret_name=REDIS_OBJECTS["SECRET_BROKER_URL"][1] 

85 ): 

86 broker_url_secret_in_scheduler = self.get_broker_url_secret_in_deployment( 

87 k8s_obj_by_key, "StatefulSet", "worker" 

88 ) 

89 assert broker_url_secret_in_scheduler == expected_broker_url_secret_name 

90 broker_url_secret_in_worker = self.get_broker_url_secret_in_deployment( 

91 k8s_obj_by_key, "Deployment", "scheduler" 

92 ) 

93 assert broker_url_secret_in_worker == expected_broker_url_secret_name 

94 

95 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS) 

96 def test_redis_by_chart_default(self, executor): 

97 k8s_objects = render_chart( 

98 RELEASE_NAME_REDIS, 

99 { 

100 "executor": executor, 

101 "networkPolicies": {"enabled": True}, 

102 "redis": {"enabled": True}, 

103 }, 

104 ) 

105 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects) 

106 

107 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys()) 

108 assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS 

109 

110 self.assert_password_and_broker_url_secrets( 

111 k8s_obj_by_key, 

112 expected_password_match=r"\w+", 

113 expected_broker_url_match=rf"redis://:.+@{RELEASE_NAME_REDIS}-redis:6379/0", 

114 ) 

115 

116 self.assert_broker_url_env(k8s_obj_by_key) 

117 

118 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS) 

119 def test_redis_by_chart_password(self, executor): 

120 k8s_objects = render_chart( 

121 RELEASE_NAME_REDIS, 

122 { 

123 "executor": executor, 

124 "networkPolicies": {"enabled": True}, 

125 "redis": {"enabled": True, "password": "test-redis-password!@#$%^&*()_+"}, 

126 }, 

127 ) 

128 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects) 

129 

130 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys()) 

131 assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS 

132 

133 self.assert_password_and_broker_url_secrets( 

134 k8s_obj_by_key, 

135 expected_password_match="test-redis-password", 

136 expected_broker_url_match=re.escape( 

137 "redis://:test-redis-password%21%40%23$%25%5E&%2A%28%29_+@test-redis-redis:6379/0" 

138 ), 

139 ) 

140 

141 self.assert_broker_url_env(k8s_obj_by_key) 

142 

143 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS) 

144 def test_redis_by_chart_password_secret_name_missing_broker_url_secret_name(self, executor): 

145 with pytest.raises(CalledProcessError): 

146 render_chart( 

147 RELEASE_NAME_REDIS, 

148 { 

149 "executor": executor, 

150 "redis": { 

151 "enabled": True, 

152 "passwordSecretName": "test-redis-password-secret-name", 

153 }, 

154 }, 

155 ) 

156 

157 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS) 

158 def test_redis_by_chart_password_secret_name(self, executor): 

159 expected_broker_url_secret_name = "test-redis-broker-url-secret-name" 

160 k8s_objects = render_chart( 

161 RELEASE_NAME_REDIS, 

162 { 

163 "executor": executor, 

164 "networkPolicies": {"enabled": True}, 

165 "data": {"brokerUrlSecretName": expected_broker_url_secret_name}, 

166 "redis": { 

167 "enabled": True, 

168 "passwordSecretName": "test-redis-password-secret-name", 

169 }, 

170 }, 

171 ) 

172 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects) 

173 

174 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys()) 

175 assert created_redis_objects == SET_POSSIBLE_REDIS_OBJECT_KEYS - { 

176 REDIS_OBJECTS["SECRET_PASSWORD"], 

177 REDIS_OBJECTS["SECRET_BROKER_URL"], 

178 } 

179 

180 self.assert_password_and_broker_url_secrets( 

181 k8s_obj_by_key, expected_password_match=None, expected_broker_url_match=None 

182 ) 

183 

184 self.assert_broker_url_env(k8s_obj_by_key, expected_broker_url_secret_name) 

185 

186 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS) 

187 def test_external_redis_broker_url(self, executor): 

188 k8s_objects = render_chart( 

189 RELEASE_NAME_REDIS, 

190 { 

191 "executor": executor, 

192 "networkPolicies": {"enabled": True}, 

193 "data": { 

194 "brokerUrl": "redis://redis-user:password@redis-host:6379/0", 

195 }, 

196 "redis": {"enabled": False}, 

197 }, 

198 ) 

199 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects) 

200 

201 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys()) 

202 assert created_redis_objects == {REDIS_OBJECTS["SECRET_BROKER_URL"]} 

203 

204 self.assert_password_and_broker_url_secrets( 

205 k8s_obj_by_key, 

206 expected_password_match=None, 

207 expected_broker_url_match="redis://redis-user:password@redis-host:6379/0", 

208 ) 

209 

210 self.assert_broker_url_env(k8s_obj_by_key) 

211 

212 @pytest.mark.parametrize("executor", CELERY_EXECUTORS_PARAMS) 

213 def test_external_redis_broker_url_secret_name(self, executor): 

214 expected_broker_url_secret_name = "redis-broker-url-secret-name" 

215 k8s_objects = render_chart( 

216 RELEASE_NAME_REDIS, 

217 { 

218 "executor": executor, 

219 "networkPolicies": {"enabled": True}, 

220 "data": {"brokerUrlSecretName": expected_broker_url_secret_name}, 

221 "redis": {"enabled": False}, 

222 }, 

223 ) 

224 k8s_obj_by_key = prepare_k8s_lookup_dict(k8s_objects) 

225 

226 created_redis_objects = SET_POSSIBLE_REDIS_OBJECT_KEYS & set(k8s_obj_by_key.keys()) 

227 assert created_redis_objects == set() 

228 

229 self.assert_password_and_broker_url_secrets( 

230 k8s_obj_by_key, expected_password_match=None, expected_broker_url_match=None 

231 ) 

232 

233 self.assert_broker_url_env(k8s_obj_by_key, expected_broker_url_secret_name) 

234 

235 def test_default_redis_secrets_created_with_non_celery_executor(self): 

236 # We want to make sure default redis secrets (if needed) are still 

237 # created during install, as they are marked "pre-install". 

238 # See note in templates/secrets/redis-secrets.yaml for more. 

239 docs = render_chart( 

240 values={"executor": "KubernetesExecutor"}, show_only=["templates/secrets/redis-secrets.yaml"] 

241 ) 

242 assert 2 == len(docs) 

243 

244 def test_should_create_valid_affinity_tolerations_and_node_selector(self): 

245 docs = render_chart( 

246 values={ 

247 "executor": "CeleryExecutor", 

248 "redis": { 

249 "affinity": { 

250 "nodeAffinity": { 

251 "requiredDuringSchedulingIgnoredDuringExecution": { 

252 "nodeSelectorTerms": [ 

253 { 

254 "matchExpressions": [ 

255 {"key": "foo", "operator": "In", "values": ["true"]}, 

256 ] 

257 } 

258 ] 

259 } 

260 } 

261 }, 

262 "tolerations": [ 

263 {"key": "dynamic-pods", "operator": "Equal", "value": "true", "effect": "NoSchedule"} 

264 ], 

265 "nodeSelector": {"diskType": "ssd"}, 

266 }, 

267 }, 

268 show_only=["templates/redis/redis-statefulset.yaml"], 

269 ) 

270 

271 assert "StatefulSet" == jmespath.search("kind", docs[0]) 

272 assert "foo" == jmespath.search( 

273 "spec.template.spec.affinity.nodeAffinity." 

274 "requiredDuringSchedulingIgnoredDuringExecution." 

275 "nodeSelectorTerms[0]." 

276 "matchExpressions[0]." 

277 "key", 

278 docs[0], 

279 ) 

280 assert "ssd" == jmespath.search( 

281 "spec.template.spec.nodeSelector.diskType", 

282 docs[0], 

283 ) 

284 assert "dynamic-pods" == jmespath.search( 

285 "spec.template.spec.tolerations[0].key", 

286 docs[0], 

287 ) 

288 

289 def test_redis_resources_are_configurable(self): 

290 docs = render_chart( 

291 values={ 

292 "redis": { 

293 "resources": { 

294 "limits": {"cpu": "200m", "memory": "128Mi"}, 

295 "requests": {"cpu": "300m", "memory": "169Mi"}, 

296 } 

297 }, 

298 }, 

299 show_only=["templates/redis/redis-statefulset.yaml"], 

300 ) 

301 assert "128Mi" == jmespath.search("spec.template.spec.containers[0].resources.limits.memory", docs[0]) 

302 assert "169Mi" == jmespath.search( 

303 "spec.template.spec.containers[0].resources.requests.memory", docs[0] 

304 ) 

305 assert "300m" == jmespath.search("spec.template.spec.containers[0].resources.requests.cpu", docs[0]) 

306 

307 def test_redis_resources_are_not_added_by_default(self): 

308 docs = render_chart( 

309 show_only=["templates/redis/redis-statefulset.yaml"], 

310 ) 

311 assert jmespath.search("spec.template.spec.containers[0].resources", docs[0]) == {} 

312 

313 def test_should_set_correct_helm_hooks_weight(self): 

314 docs = render_chart( 

315 values={ 

316 "executor": "CeleryExecutor", 

317 }, 

318 show_only=["templates/secrets/redis-secrets.yaml"], 

319 ) 

320 annotations = jmespath.search("metadata.annotations", docs[0]) 

321 assert annotations["helm.sh/hook-weight"] == "0" 

322 

323 def test_persistence_volume_annotations(self): 

324 docs = render_chart( 

325 values={"redis": {"persistence": {"annotations": {"foo": "bar"}}}}, 

326 show_only=["templates/redis/redis-statefulset.yaml"], 

327 ) 

328 assert {"foo": "bar"} == jmespath.search("spec.volumeClaimTemplates[0].metadata.annotations", docs[0])