Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/paginate.py: 17%

365 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

13 

import base64
import json
import logging
from itertools import tee

import jmespath

from botocore.exceptions import PaginationError
from botocore.utils import merge_dicts, set_value_from_jmespath

log = logging.getLogger(__name__)

25 

26 

class TokenEncoder:
    """Encodes dictionaries into opaque strings.

    This for the most part json dumps + base64 encoding, but also supports
    having bytes in the dictionary in addition to the types that json can
    handle by default.

    This is intended for use in encoding pagination tokens, which in some
    cases can be complex structures and / or contain bytes.
    """

    def encode(self, token):
        """Encodes a dictionary to an opaque string.

        :type token: dict
        :param token: A dictionary containing pagination information,
            particularly the service pagination token(s) but also other boto
            metadata.

        :rtype: str
        :returns: An opaque string
        """
        try:
            # The fast path: most tokens are plain JSON-serializable dicts,
            # so avoid walking the structure unless we have to.
            json_string = json.dumps(token)
        except (TypeError, UnicodeDecodeError):
            # The token contains bytes somewhere; walk it and base64
            # encode every bytes value, remembering where each one lives.
            converted, byte_paths = self._encode(token, [])
            # Record the paths of all encoded values so the decoder can
            # restore them. No service is expected to use this key itself.
            converted['boto_encoded_keys'] = byte_paths
            json_string = json.dumps(converted)
        # Wrap the JSON in base64 so callers see a single opaque string.
        return base64.b64encode(json_string.encode('utf-8')).decode('utf-8')

    def _encode(self, data, path):
        """Encode bytes in given data, keeping track of the path traversed."""
        if isinstance(data, bytes):
            return self._encode_bytes(data, path)
        if isinstance(data, list):
            return self._encode_list(data, path)
        if isinstance(data, dict):
            return self._encode_dict(data, path)
        # Anything else is JSON-serializable as-is; nothing was encoded.
        return data, []

    def _encode_list(self, data, path):
        """Encode any bytes in a list, noting the index of what is encoded."""
        converted = []
        byte_paths = []
        for index, item in enumerate(data):
            new_item, item_paths = self._encode(item, path + [index])
            converted.append(new_item)
            byte_paths += item_paths
        return converted, byte_paths

    def _encode_dict(self, data, path):
        """Encode any bytes in a dict, noting the key path of what is encoded."""
        converted = {}
        byte_paths = []
        for key, item in data.items():
            new_item, item_paths = self._encode(item, path + [key])
            converted[key] = new_item
            byte_paths += item_paths
        return converted, byte_paths

    def _encode_bytes(self, data, path):
        """Base64 encode a byte string, reporting its location."""
        return base64.b64encode(data).decode('utf-8'), [path]

103 

104 

class TokenDecoder:
    """Decodes token strings back into dictionaries.

    This performs the inverse operation to the TokenEncoder, accepting
    opaque strings and decoding them into a useable form.
    """

    def decode(self, token):
        """Decodes an opaque string to a dictionary.

        :type token: str
        :param token: A token string given by the botocore pagination
            interface.

        :rtype: dict
        :returns: A dictionary containing pagination information,
            particularly the service pagination token(s) but also other boto
            metadata.
        """
        json_string = base64.b64decode(token.encode('utf-8')).decode('utf-8')
        decoded = json.loads(json_string)

        # The encoder stashes the paths of any base64-encoded bytes under
        # this key; pop it so it never leaks back to the caller.
        encoded_keys = decoded.pop('boto_encoded_keys', None)
        if encoded_keys is not None:
            return self._decode(decoded, encoded_keys)
        return decoded

    def _decode(self, token, encoded_keys):
        """Find each encoded value and decode it."""
        for path in encoded_keys:
            encoded_value = self._path_get(token, path)
            self._path_set(
                token, path, base64.b64decode(encoded_value.encode('utf-8'))
            )
        return token

    def _path_get(self, data, path):
        """Return the nested data at the given path.

        For instance:
            data = {'foo': ['bar', 'baz']}
            path = ['foo', 0]
            ==> 'bar'
        """
        # jmespath isn't used here because it would be difficult to actually
        # create the jmespath query when taking all of the unknowns of key
        # structure into account. Gross though this is, it is simple and not
        # very error prone.
        current = data
        for step in path:
            current = current[step]
        return current

    def _path_set(self, data, path, value):
        """Set the value of a key in the given data.

        Example:
            data = {'foo': ['bar', 'baz']}
            path = ['foo', 1]
            value = 'bin'
            ==> data = {'foo': ['bar', 'bin']}
        """
        # Walk to the parent container, then assign the final step.
        parent = self._path_get(data, path[:-1])
        parent[path[-1]] = value

171 

172 

class PaginatorModel:
    """Exposes the per-operation pagination configuration of a service."""

    def __init__(self, paginator_config):
        # Only the 'pagination' section of the loaded config is relevant.
        self._paginator_config = paginator_config['pagination']

    def get_paginator(self, operation_name):
        """Return the pagination config for a single operation.

        :raises ValueError: If no paginator is defined for the operation.
        """
        if operation_name not in self._paginator_config:
            raise ValueError(
                "Paginator for operation does not exist: %s" % operation_name
            )
        return self._paginator_config[operation_name]

185 

186 

class PageIterator:
    """An iterable object to paginate API results.

    Please note it is NOT a python iterator.
    Use ``iter`` to wrap this as a generator.
    """

    def __init__(
        self,
        method,
        input_token,
        output_token,
        more_results,
        result_keys,
        non_aggregate_keys,
        limit_key,
        max_items,
        starting_token,
        page_size,
        op_kwargs,
    ):
        # The callable that performs one API request per page.
        self._method = method
        # Names of request parameters that carry the pagination token(s).
        self._input_token = input_token
        # JMESPath expressions locating the next token(s) in each response.
        self._output_token = output_token
        # Optional JMESPath expression signalling whether more pages exist.
        self._more_results = more_results
        # JMESPath expressions for the values aggregated across pages.
        self._result_keys = result_keys
        # Cap on the total number of aggregated items, or None for no cap.
        self._max_items = max_items
        # Request parameter name used to limit the per-page item count.
        self._limit_key = limit_key
        # Opaque token to resume a previous pagination, or None.
        self._starting_token = starting_token
        # Per-page size to request via the limit key, or None for default.
        self._page_size = page_size
        # Base keyword arguments applied to every request.
        self._op_kwargs = op_kwargs
        # Set to an encoded token when iteration stops before exhaustion.
        self._resume_token = None
        # Expressions for response values captured once, not aggregated.
        self._non_aggregate_key_exprs = non_aggregate_keys
        self._non_aggregate_part = {}
        self._token_encoder = TokenEncoder()
        self._token_decoder = TokenDecoder()

    @property
    def result_keys(self):
        return self._result_keys

    @property
    def resume_token(self):
        """Token to specify to resume pagination."""
        return self._resume_token

    @resume_token.setter
    def resume_token(self, value):
        # A valid resume token is a dict whose keys are exactly the input
        # token names, optionally plus 'boto_truncate_amount'.
        if not isinstance(value, dict):
            raise ValueError("Bad starting token: %s" % value)

        if 'boto_truncate_amount' in value:
            token_keys = sorted(self._input_token + ['boto_truncate_amount'])
        else:
            token_keys = sorted(self._input_token)
        dict_keys = sorted(value.keys())

        if token_keys == dict_keys:
            # Store the token in its opaque, encoded form.
            self._resume_token = self._token_encoder.encode(value)
        else:
            raise ValueError("Bad starting token: %s" % value)

    @property
    def non_aggregate_part(self):
        # Values recorded from the first response that are not aggregated
        # across pages (see _record_non_aggregate_key_values).
        return self._non_aggregate_part

    def __iter__(self):
        # Drives the request/yield loop: resumes from a starting token if
        # given, truncates the final page to honor max_items, and guards
        # against the service returning the same token twice in a row.
        current_kwargs = self._op_kwargs
        previous_next_token = None
        next_token = {key: None for key in self._input_token}
        if self._starting_token is not None:
            # If the starting token exists, populate the next_token with the
            # values inside it. This ensures that we have the service's
            # pagination token on hand if we need to truncate after the
            # first response.
            next_token = self._parse_starting_token()[0]
        # The number of items from result_key we've seen so far.
        total_items = 0
        first_request = True
        primary_result_key = self.result_keys[0]
        starting_truncation = 0
        self._inject_starting_params(current_kwargs)
        while True:
            response = self._make_request(current_kwargs)
            parsed = self._extract_parsed_response(response)
            if first_request:
                # The first request is handled differently. We could
                # possibly have a resume/starting token that tells us where
                # to index into the retrieved page.
                if self._starting_token is not None:
                    starting_truncation = self._handle_first_request(
                        parsed, primary_result_key, starting_truncation
                    )
                first_request = False
                self._record_non_aggregate_key_values(parsed)
            else:
                # If this isn't the first request, we have already sliced into
                # the first request and had to make additional requests after.
                # We no longer need to add this to truncation.
                starting_truncation = 0
            current_response = primary_result_key.search(parsed)
            if current_response is None:
                current_response = []
            num_current_response = len(current_response)
            truncate_amount = 0
            if self._max_items is not None:
                # How many items past max_items this page would carry us.
                truncate_amount = (
                    total_items + num_current_response - self._max_items
                )
            if truncate_amount > 0:
                # This page exceeds max_items: cut it down, record a resume
                # token, yield the truncated page, and stop.
                self._truncate_response(
                    parsed,
                    primary_result_key,
                    truncate_amount,
                    starting_truncation,
                    next_token,
                )
                yield response
                break
            else:
                yield response
                total_items += num_current_response
                next_token = self._get_next_token(parsed)
                if all(t is None for t in next_token.values()):
                    # The service reported no further pages.
                    break
                if (
                    self._max_items is not None
                    and total_items == self._max_items
                ):
                    # We're on a page boundary so we can set the current
                    # next token to be the resume token.
                    self.resume_token = next_token
                    break
                if (
                    previous_next_token is not None
                    and previous_next_token == next_token
                ):
                    # A repeated token would loop forever; fail loudly.
                    message = (
                        f"The same next token was received "
                        f"twice: {next_token}"
                    )
                    raise PaginationError(message=message)
                self._inject_token_into_kwargs(current_kwargs, next_token)
                previous_next_token = next_token

    def search(self, expression):
        """Applies a JMESPath expression to a paginator

        Each page of results is searched using the provided JMESPath
        expression. If the result is not a list, it is yielded
        directly. If the result is a list, each element in the result
        is yielded individually (essentially implementing a flatmap in
        which the JMESPath search is the mapping function).

        :type expression: str
        :param expression: JMESPath expression to apply to each page.

        :return: Returns an iterator that yields the individual
            elements of applying a JMESPath expression to each page of
            results.
        """
        compiled = jmespath.compile(expression)
        for page in self:
            results = compiled.search(page)
            if isinstance(results, list):
                yield from results
            else:
                # Yield result directly if it is not a list.
                yield results

    def _make_request(self, current_kwargs):
        # Perform one API call with the current request parameters.
        return self._method(**current_kwargs)

    def _extract_parsed_response(self, response):
        # Identity here; returns the response unchanged. Subclasses can
        # redefine this to unwrap a different response shape.
        return response

    def _record_non_aggregate_key_values(self, response):
        # Capture the non-aggregated values from this response into
        # self._non_aggregate_part, preserving their JMESPath structure.
        non_aggregate_keys = {}
        for expression in self._non_aggregate_key_exprs:
            result = expression.search(response)
            set_value_from_jmespath(
                non_aggregate_keys, expression.expression, result
            )
        self._non_aggregate_part = non_aggregate_keys

    def _inject_starting_params(self, op_kwargs):
        # If the user has specified a starting token we need to
        # inject that into the operation's kwargs.
        if self._starting_token is not None:
            # Don't need to do anything special if there is no starting
            # token specified.
            next_token = self._parse_starting_token()[0]
            self._inject_token_into_kwargs(op_kwargs, next_token)
        if self._page_size is not None:
            # Pass the page size as the parameter name for limiting
            # page size, also known as the limit_key.
            op_kwargs[self._limit_key] = self._page_size

    def _inject_token_into_kwargs(self, op_kwargs, next_token):
        # Copy each usable token value into the request kwargs; drop any
        # stale token parameter whose new value is absent. The string
        # 'None' is treated as absent for old-style token compatibility.
        for name, token in next_token.items():
            if (token is not None) and (token != 'None'):
                op_kwargs[name] = token
            elif name in op_kwargs:
                del op_kwargs[name]

    def _handle_first_request(
        self, parsed, primary_result_key, starting_truncation
    ):
        # If the payload is an array or string, we need to slice into it
        # and only return the truncated amount.
        starting_truncation = self._parse_starting_token()[1]
        all_data = primary_result_key.search(parsed)
        if isinstance(all_data, (list, str)):
            data = all_data[starting_truncation:]
        else:
            data = None
        set_value_from_jmespath(parsed, primary_result_key.expression, data)
        # We also need to truncate any secondary result keys
        # because they were not truncated in the previous last
        # response.
        for token in self.result_keys:
            if token == primary_result_key:
                continue
            sample = token.search(parsed)
            # Replace the secondary value with an empty value of the
            # matching type so aggregation stays type-consistent.
            if isinstance(sample, list):
                empty_value = []
            elif isinstance(sample, str):
                empty_value = ''
            elif isinstance(sample, (int, float)):
                empty_value = 0
            else:
                empty_value = None
            set_value_from_jmespath(parsed, token.expression, empty_value)
        return starting_truncation

    def _truncate_response(
        self,
        parsed,
        primary_result_key,
        truncate_amount,
        starting_truncation,
        next_token,
    ):
        # Remove truncate_amount items from the tail of the primary result
        # and record where a subsequent pagination should resume.
        original = primary_result_key.search(parsed)
        if original is None:
            original = []
        amount_to_keep = len(original) - truncate_amount
        truncated = original[:amount_to_keep]
        set_value_from_jmespath(
            parsed, primary_result_key.expression, truncated
        )
        # The issue here is that even though we know how much we've truncated
        # we need to account for this globally including any starting
        # left truncation. For example:
        # Raw response: [0,1,2,3]
        # Starting index: 1
        # Max items: 1
        # Starting left truncation: [1, 2, 3]
        # End right truncation for max items: [1]
        # However, even though we only kept 1, this is post
        # left truncation so the next starting index should be 2, not 1
        # (left_truncation + amount_to_keep).
        next_token['boto_truncate_amount'] = (
            amount_to_keep + starting_truncation
        )
        self.resume_token = next_token

    def _get_next_token(self, parsed):
        # Build the {input_token_name: value} mapping for the next request.
        if self._more_results is not None:
            if not self._more_results.search(parsed):
                return {}
        next_tokens = {}
        for output_token, input_key in zip(
            self._output_token, self._input_token
        ):
            next_token = output_token.search(parsed)
            # We do not want to include any empty strings as actual tokens.
            # Treat them as None.
            if next_token:
                next_tokens[input_key] = next_token
            else:
                next_tokens[input_key] = None
        return next_tokens

    def result_key_iters(self):
        # One independent iterator per result key; tee() lets each consume
        # the page stream without re-issuing the requests.
        teed_results = tee(self, len(self.result_keys))
        return [
            ResultKeyIterator(i, result_key)
            for i, result_key in zip(teed_results, self.result_keys)
        ]

    def build_full_result(self):
        # Aggregate every page into a single response dict, merging list,
        # numeric, and string result values per result key.
        complete_result = {}
        for response in self:
            page = response
            # We want to try to catch operation object pagination
            # and format correctly for those. They come in the form
            # of a tuple of two elements: (http_response, parsed_responsed).
            # We want the parsed_response as that is what the page iterator
            # uses. We can remove it though once operation objects are removed.
            if isinstance(response, tuple) and len(response) == 2:
                page = response[1]
            # We're incrementally building the full response page
            # by page. For each page in the response we need to
            # inject the necessary components from the page
            # into the complete_result.
            for result_expression in self.result_keys:
                # In order to incrementally update a result key
                # we need to search the existing value from complete_result,
                # then we need to search the _current_ page for the
                # current result key value. Then we append the current
                # value onto the existing value, and re-set that value
                # as the new value.
                result_value = result_expression.search(page)
                if result_value is None:
                    continue
                existing_value = result_expression.search(complete_result)
                if existing_value is None:
                    # Set the initial result
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        result_value,
                    )
                    continue
                # Now both result_value and existing_value contain something
                if isinstance(result_value, list):
                    existing_value.extend(result_value)
                elif isinstance(result_value, (int, float, str)):
                    # Modify the existing result with the sum or concatenation
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        existing_value + result_value,
                    )
        merge_dicts(complete_result, self.non_aggregate_part)
        if self.resume_token is not None:
            complete_result['NextToken'] = self.resume_token
        return complete_result

    def _parse_starting_token(self):
        # Returns (token_dict, truncation_index) parsed from the starting
        # token, or None when no starting token was supplied.
        if self._starting_token is None:
            return None

        # The starting token is a dict passed as a base64 encoded string.
        next_token = self._starting_token
        try:
            next_token = self._token_decoder.decode(next_token)
            index = 0
            if 'boto_truncate_amount' in next_token:
                index = next_token.get('boto_truncate_amount')
                del next_token['boto_truncate_amount']
        except (ValueError, TypeError):
            # Not a new-style encoded token; try the legacy format.
            next_token, index = self._parse_starting_token_deprecated()
        return next_token, index

    def _parse_starting_token_deprecated(self):
        """
        This handles parsing of old style starting tokens, and attempts to
        coerce them into the new style.
        """
        log.debug(
            "Attempting to fall back to old starting token parser. For "
            "token: %s" % self._starting_token
        )
        if self._starting_token is None:
            return None

        # Old-style tokens are '___'-joined values, optionally ending with
        # an integer truncation index.
        parts = self._starting_token.split('___')
        next_token = []
        index = 0
        if len(parts) == len(self._input_token) + 1:
            try:
                index = int(parts.pop())
            except ValueError:
                # This doesn't look like a valid old-style token, so we're
                # passing it along as an opaque service token.
                parts = [self._starting_token]

        for part in parts:
            if part == 'None':
                next_token.append(None)
            else:
                next_token.append(part)
        return self._convert_deprecated_starting_token(next_token), index

    def _convert_deprecated_starting_token(self, deprecated_token):
        """
        This attempts to convert a deprecated starting token into the new
        style.
        """
        len_deprecated_token = len(deprecated_token)
        len_input_token = len(self._input_token)
        if len_deprecated_token > len_input_token:
            raise ValueError("Bad starting token: %s" % self._starting_token)
        elif len_deprecated_token < len_input_token:
            log.debug(
                "Old format starting token does not contain all input "
                "tokens. Setting the rest, in order, as None."
            )
            for i in range(len_input_token - len_deprecated_token):
                deprecated_token.append(None)
        return dict(zip(self._input_token, deprecated_token))

589 

590 

class Paginator:
    """Builds ``PageIterator`` objects for a single paginated operation."""

    PAGE_ITERATOR_CLS = PageIterator

    def __init__(self, method, pagination_config, model):
        self._model = model
        self._method = method
        self._pagination_cfg = pagination_config
        # Pre-compile everything the page iterator needs from the config.
        self._output_token = self._get_output_tokens(self._pagination_cfg)
        self._input_token = self._get_input_tokens(self._pagination_cfg)
        self._more_results = self._get_more_results_token(self._pagination_cfg)
        self._non_aggregate_keys = self._get_non_aggregate_keys(
            self._pagination_cfg
        )
        self._result_keys = self._get_result_keys(self._pagination_cfg)
        self._limit_key = self._get_limit_key(self._pagination_cfg)

    @property
    def result_keys(self):
        return self._result_keys

    def _get_non_aggregate_keys(self, config):
        # Compile each non-aggregate key into a JMESPath expression.
        return [
            jmespath.compile(key)
            for key in config.get('non_aggregate_keys', [])
        ]

    def _get_output_tokens(self, config):
        # The config may declare a single output token or a list of them.
        output_token = config['output_token']
        if not isinstance(output_token, list):
            output_token = [output_token]
        return [jmespath.compile(token) for token in output_token]

    def _get_input_tokens(self, config):
        # Always hand back a list, even for a single input token.
        input_token = self._pagination_cfg['input_token']
        if isinstance(input_token, list):
            return input_token
        return [input_token]

    def _get_more_results_token(self, config):
        # Compile the 'more results' flag expression when one is declared.
        more_results = config.get('more_results')
        if more_results is None:
            return None
        return jmespath.compile(more_results)

    def _get_result_keys(self, config):
        # Compile the result key(s), normalizing a scalar into a list.
        result_key = config.get('result_key')
        if result_key is None:
            return None
        if not isinstance(result_key, list):
            result_key = [result_key]
        return [jmespath.compile(rk) for rk in result_key]

    def _get_limit_key(self, config):
        return config.get('limit_key')

    def paginate(self, **kwargs):
        """Create paginator object for an operation.

        This returns an iterable object. Iterating over
        this object will yield a single page of a response
        at a time.

        """
        page_params = self._extract_paging_params(kwargs)
        return self.PAGE_ITERATOR_CLS(
            self._method,
            self._input_token,
            self._output_token,
            self._more_results,
            self._result_keys,
            self._non_aggregate_keys,
            self._limit_key,
            page_params['MaxItems'],
            page_params['StartingToken'],
            page_params['PageSize'],
            kwargs,
        )

    def _extract_paging_params(self, kwargs):
        # Pull the pagination controls out of the operation kwargs; the
        # remaining kwargs are forwarded to the service untouched.
        pagination_config = kwargs.pop('PaginationConfig', {})
        max_items = pagination_config.get('MaxItems', None)
        if max_items is not None:
            max_items = int(max_items)
        page_size = self._normalize_page_size(
            pagination_config.get('PageSize', None)
        )
        return {
            'MaxItems': max_items,
            'StartingToken': pagination_config.get('StartingToken', None),
            'PageSize': page_size,
        }

    def _normalize_page_size(self, page_size):
        # Coerce the requested page size to the type of the limit key's
        # shape in the operation's input model.
        if page_size is None:
            return None
        if self._limit_key is None:
            raise PaginationError(
                message="PageSize parameter is not supported for the "
                "pagination interface for this operation."
            )
        input_members = self._model.input_shape.members
        limit_key_shape = input_members.get(self._limit_key)
        if limit_key_shape.type_name == 'string':
            if isinstance(page_size, str):
                return page_size
            return str(page_size)
        return int(page_size)

695 

696 

class ResultKeyIterator:
    """Iterates over the results of paginated responses.

    Each iterator is associated with a single result key.
    Iterating over this object will give you each element in
    the result key list.

    :param pages_iterator: An iterator that will give you
        pages of results (a ``PageIterator`` class).
    :param result_key: The JMESPath expression representing
        the result key.

    """

    def __init__(self, pages_iterator, result_key):
        self._pages_iterator = pages_iterator
        self.result_key = result_key

    def __iter__(self):
        # Flatten the per-page result lists into a single element stream.
        for page in self._pages_iterator:
            matches = self.result_key.search(page)
            if matches is None:
                # Pages with no value for the key contribute nothing.
                matches = []
            yield from matches