Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/waiter.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

185 statements  

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import logging 

14import time 

15from functools import partial 

16 

17import jmespath 

18 

19from botocore.context import with_current_context 

20from botocore.docs.docstring import WaiterDocstring 

21from botocore.useragent import register_feature_id 

22from botocore.utils import get_service_module_name 

23 

24from . import xform_name 

25from .exceptions import ClientError, WaiterConfigError, WaiterError 

26 

27logger = logging.getLogger(__name__) 

28 

29 

30def create_waiter_with_client(waiter_name, waiter_model, client): 

31 """ 

32 

33 :type waiter_name: str 

34 :param waiter_name: The name of the waiter. The name should match 

35 the name (including the casing) of the key name in the waiter 

36 model file (typically this is CamelCasing). 

37 

38 :type waiter_model: botocore.waiter.WaiterModel 

39 :param waiter_model: The model for the waiter configuration. 

40 

41 :type client: botocore.client.BaseClient 

42 :param client: The botocore client associated with the service. 

43 

44 :rtype: botocore.waiter.Waiter 

45 :return: The waiter object. 

46 

47 """ 

48 single_waiter_config = waiter_model.get_waiter(waiter_name) 

49 operation_name = xform_name(single_waiter_config.operation) 

50 operation_method = NormalizedOperationMethod( 

51 getattr(client, operation_name) 

52 ) 

53 

54 # Create a new wait method that will serve as a proxy to the underlying 

55 # Waiter.wait method. This is needed to attach a docstring to the 

56 # method. 

57 def wait(self, **kwargs): 

58 Waiter.wait(self, **kwargs) 

59 

60 wait.__doc__ = WaiterDocstring( 

61 waiter_name=waiter_name, 

62 event_emitter=client.meta.events, 

63 service_model=client.meta.service_model, 

64 service_waiter_model=waiter_model, 

65 include_signature=False, 

66 ) 

67 

68 # Rename the waiter class based on the type of waiter. 

69 waiter_class_name = str( 

70 f'{get_service_module_name(client.meta.service_model)}.Waiter.{waiter_name}' 

71 ) 

72 

73 # Create the new waiter class 

74 documented_waiter_cls = type(waiter_class_name, (Waiter,), {'wait': wait}) 

75 

76 # Return an instance of the new waiter class. 

77 return documented_waiter_cls( 

78 waiter_name, single_waiter_config, operation_method 

79 ) 

80 

81 

82def is_valid_waiter_error(response): 

83 error = response.get('Error') 

84 if isinstance(error, dict) and 'Code' in error: 

85 return True 

86 return False 

87 

88 

89class NormalizedOperationMethod: 

90 def __init__(self, client_method): 

91 self._client_method = client_method 

92 

93 def __call__(self, **kwargs): 

94 try: 

95 return self._client_method(**kwargs) 

96 except ClientError as e: 

97 return e.response 

98 

99 

100class WaiterModel: 

101 SUPPORTED_VERSION = 2 

102 

103 def __init__(self, waiter_config): 

104 """ 

105 

106 Note that the WaiterModel takes ownership of the waiter_config. 

107 It may or may not mutate the waiter_config. If this is a concern, 

108 it is best to make a copy of the waiter config before passing it to 

109 the WaiterModel. 

110 

111 :type waiter_config: dict 

112 :param waiter_config: The loaded waiter config 

113 from the <service>*.waiters.json file. This can be 

114 obtained from a botocore Loader object as well. 

115 

116 """ 

117 self._waiter_config = waiter_config['waiters'] 

118 

119 # These are part of the public API. Changing these 

120 # will result in having to update the consuming code, 

121 # so don't change unless you really need to. 

122 version = waiter_config.get('version', 'unknown') 

123 self._verify_supported_version(version) 

124 self.version = version 

125 self.waiter_names = list(sorted(waiter_config['waiters'].keys())) 

126 

127 def _verify_supported_version(self, version): 

128 if version != self.SUPPORTED_VERSION: 

129 raise WaiterConfigError( 

130 error_msg=( 

131 "Unsupported waiter version, supported version " 

132 f"must be: {self.SUPPORTED_VERSION}, but version " 

133 f"of waiter config is: {version}" 

134 ) 

135 ) 

136 

137 def get_waiter(self, waiter_name): 

138 try: 

139 single_waiter_config = self._waiter_config[waiter_name] 

140 except KeyError: 

141 raise ValueError(f"Waiter does not exist: {waiter_name}") 

142 return SingleWaiterConfig(single_waiter_config) 

143 

144 

145class SingleWaiterConfig: 

146 """Represents the waiter configuration for a single waiter. 

147 

148 A single waiter is considered the configuration for a single 

149 value associated with a named waiter (i.e TableExists). 

150 

151 """ 

152 

153 def __init__(self, single_waiter_config): 

154 self._config = single_waiter_config 

155 

156 # These attributes are part of the public API. 

157 self.description = single_waiter_config.get('description', '') 

158 # Per the spec, these three fields are required. 

159 self.operation = single_waiter_config['operation'] 

160 self.delay = single_waiter_config['delay'] 

161 self.max_attempts = single_waiter_config['maxAttempts'] 

162 

163 @property 

164 def acceptors(self): 

165 acceptors = [] 

166 for acceptor_config in self._config['acceptors']: 

167 acceptor = AcceptorConfig(acceptor_config) 

168 acceptors.append(acceptor) 

169 return acceptors 

170 

171 

172class AcceptorConfig: 

173 def __init__(self, config): 

174 self.state = config['state'] 

175 self.matcher = config['matcher'] 

176 self.expected = config['expected'] 

177 self.argument = config.get('argument') 

178 self.matcher_func = self._create_matcher_func() 

179 

180 @property 

181 def explanation(self): 

182 if self.matcher == 'path': 

183 return f'For expression "{self.argument}" we matched expected path: "{self.expected}"' 

184 elif self.matcher == 'pathAll': 

185 return ( 

186 f'For expression "{self.argument}" all members matched ' 

187 f'expected path: "{self.expected}"' 

188 ) 

189 elif self.matcher == 'pathAny': 

190 return ( 

191 f'For expression "{self.argument}" we matched expected ' 

192 f'path: "{self.expected}" at least once' 

193 ) 

194 elif self.matcher == 'status': 

195 return f'Matched expected HTTP status code: {self.expected}' 

196 elif self.matcher == 'error': 

197 return f'Matched expected service error code: {self.expected}' 

198 else: 

199 return f'No explanation for unknown waiter type: "{self.matcher}"' 

200 

201 def _create_matcher_func(self): 

202 # An acceptor function is a callable that takes a single value. The 

203 # parsed AWS response. Note that the parsed error response is also 

204 # provided in the case of errors, so it's entirely possible to 

205 # handle all the available matcher capabilities in the future. 

206 # There's only three supported matchers, so for now, this is all 

207 # contained to a single method. If this grows, we can expand this 

208 # out to separate methods or even objects. 

209 

210 if self.matcher == 'path': 

211 return self._create_path_matcher() 

212 elif self.matcher == 'pathAll': 

213 return self._create_path_all_matcher() 

214 elif self.matcher == 'pathAny': 

215 return self._create_path_any_matcher() 

216 elif self.matcher == 'status': 

217 return self._create_status_matcher() 

218 elif self.matcher == 'error': 

219 return self._create_error_matcher() 

220 else: 

221 raise WaiterConfigError( 

222 error_msg=f"Unknown acceptor: {self.matcher}" 

223 ) 

224 

225 def _create_path_matcher(self): 

226 expression = jmespath.compile(self.argument) 

227 expected = self.expected 

228 

229 def acceptor_matches(response): 

230 if is_valid_waiter_error(response): 

231 return 

232 return expression.search(response) == expected 

233 

234 return acceptor_matches 

235 

236 def _create_path_all_matcher(self): 

237 expression = jmespath.compile(self.argument) 

238 expected = self.expected 

239 

240 def acceptor_matches(response): 

241 if is_valid_waiter_error(response): 

242 return 

243 result = expression.search(response) 

244 if not isinstance(result, list) or not result: 

245 # pathAll matcher must result in a list. 

246 # Also we require at least one element in the list, 

247 # that is, an empty list should not result in this 

248 # acceptor match. 

249 return False 

250 for element in result: 

251 if element != expected: 

252 return False 

253 return True 

254 

255 return acceptor_matches 

256 

257 def _create_path_any_matcher(self): 

258 expression = jmespath.compile(self.argument) 

259 expected = self.expected 

260 

261 def acceptor_matches(response): 

262 if is_valid_waiter_error(response): 

263 return 

264 result = expression.search(response) 

265 if not isinstance(result, list) or not result: 

266 # pathAny matcher must result in a list. 

267 # Also we require at least one element in the list, 

268 # that is, an empty list should not result in this 

269 # acceptor match. 

270 return False 

271 for element in result: 

272 if element == expected: 

273 return True 

274 return False 

275 

276 return acceptor_matches 

277 

278 def _create_status_matcher(self): 

279 expected = self.expected 

280 

281 def acceptor_matches(response): 

282 # We don't have any requirements on the expected incoming data 

283 # other than it is a dict, so we don't assume there's 

284 # a ResponseMetadata.HTTPStatusCode. 

285 status_code = response.get('ResponseMetadata', {}).get( 

286 'HTTPStatusCode' 

287 ) 

288 return status_code == expected 

289 

290 return acceptor_matches 

291 

292 def _create_error_matcher(self): 

293 expected = self.expected 

294 

295 def acceptor_matches(response): 

296 # When the client encounters an error, it will normally raise 

297 # an exception. However, the waiter implementation will catch 

298 # this exception, and instead send us the parsed error 

299 # response. So response is still a dictionary, and in the case 

300 # of an error response will contain the "Error" and 

301 # "ResponseMetadata" key. 

302 # When expected is True, accept any error code. 

303 # When expected is False, check if any errors were encountered. 

304 # Otherwise, check for a specific AWS error code. 

305 if expected is True: 

306 return "Error" in response and "Code" in response["Error"] 

307 elif expected is False: 

308 return "Error" not in response 

309 else: 

310 return response.get("Error", {}).get("Code", "") == expected 

311 

312 return acceptor_matches 

313 

314 

315class Waiter: 

316 def __init__(self, name, config, operation_method): 

317 """ 

318 

319 :type name: string 

320 :param name: The name of the waiter 

321 

322 :type config: botocore.waiter.SingleWaiterConfig 

323 :param config: The configuration for the waiter. 

324 

325 :type operation_method: callable 

326 :param operation_method: A callable that accepts **kwargs 

327 and returns a response. For example, this can be 

328 a method from a botocore client. 

329 

330 """ 

331 self._operation_method = operation_method 

332 # The two attributes are exposed to allow for introspection 

333 # and documentation. 

334 self.name = name 

335 self.config = config 

336 

337 @with_current_context(partial(register_feature_id, 'WAITER')) 

338 def wait(self, **kwargs): 

339 acceptors = list(self.config.acceptors) 

340 current_state = 'waiting' 

341 # pop the invocation specific config 

342 config = kwargs.pop('WaiterConfig', {}) 

343 sleep_amount = config.get('Delay', self.config.delay) 

344 max_attempts = config.get('MaxAttempts', self.config.max_attempts) 

345 last_matched_acceptor = None 

346 num_attempts = 0 

347 

348 while True: 

349 response = self._operation_method(**kwargs) 

350 num_attempts += 1 

351 for acceptor in acceptors: 

352 if acceptor.matcher_func(response): 

353 last_matched_acceptor = acceptor 

354 current_state = acceptor.state 

355 break 

356 else: 

357 # If none of the acceptors matched, we should 

358 # transition to the failure state if an error 

359 # response was received. 

360 if is_valid_waiter_error(response): 

361 # Transition to a failure state, which we 

362 # can just handle here by raising an exception. 

363 raise WaiterError( 

364 name=self.name, 

365 reason='An error occurred ({}): {}'.format( 

366 response['Error'].get('Code', 'Unknown'), 

367 response['Error'].get('Message', 'Unknown'), 

368 ), 

369 last_response=response, 

370 ) 

371 if current_state == 'success': 

372 logger.debug( 

373 "Waiting complete, waiter matched the success state." 

374 ) 

375 return 

376 if current_state == 'failure': 

377 reason = f'Waiter encountered a terminal failure state: {acceptor.explanation}' 

378 raise WaiterError( 

379 name=self.name, 

380 reason=reason, 

381 last_response=response, 

382 ) 

383 if num_attempts >= max_attempts: 

384 if last_matched_acceptor is None: 

385 reason = 'Max attempts exceeded' 

386 else: 

387 reason = ( 

388 f'Max attempts exceeded. Previously accepted state: ' 

389 f'{acceptor.explanation}' 

390 ) 

391 raise WaiterError( 

392 name=self.name, 

393 reason=reason, 

394 last_response=response, 

395 ) 

396 time.sleep(sleep_amount)