Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/paginate.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

371 statements  

1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13 

14import base64 

15import json 

16import logging 

17from functools import partial 

18from itertools import tee 

19 

20import jmespath 

21 

22from botocore.context import with_current_context 

23from botocore.exceptions import PaginationError 

24from botocore.useragent import register_feature_id 

25from botocore.utils import merge_dicts, set_value_from_jmespath 

26 

# Module-level logger named after this module; used for the debug messages
# emitted while parsing old-style starting tokens.
log = logging.getLogger(__name__)

28 

29 

class TokenEncoder:
    """Encodes dictionaries into opaque strings.

    This is for the most part json dumps + base64 encoding, but also
    supports having bytes in the dictionary in addition to the types that
    json can handle by default.

    This is intended for use in encoding pagination tokens, which in some
    cases can be complex structures and / or contain bytes.
    """

    def encode(self, token):
        """Encodes a dictionary to an opaque string.

        :type token: dict
        :param token: A dictionary containing pagination information,
            particularly the service pagination token(s) but also other boto
            metadata.

        :rtype: str
        :returns: An opaque string
        """
        try:
            # Try just using json dumps first to avoid having to traverse
            # and encode the dict. In 99.9999% of cases this will work.
            json_string = json.dumps(token)
        except (TypeError, UnicodeDecodeError):
            # If normal dumping failed, go through and base64 encode all
            # bytes. ``_encode`` builds new containers, so the caller's
            # token is not modified.
            encoded_token, encoded_keys = self._encode(token, [])

            # Save the list of all the encoded key paths. We can safely
            # assume that no service will ever use this key.
            encoded_token['boto_encoded_keys'] = encoded_keys

            # Now that the bytes are all encoded, dump the json.
            json_string = json.dumps(encoded_token)

        # base64 encode the json string to produce an opaque token string.
        return base64.b64encode(json_string.encode('utf-8')).decode('utf-8')

    def _encode(self, data, path):
        """Encode bytes in given data, keeping track of the path traversed.

        :param data: The value to inspect; dicts, lists and tuples are
            traversed recursively, bytes are base64 encoded, and anything
            else is returned unchanged.
        :param path: List of keys / indexes leading to ``data``.
        :returns: A ``(new_data, encoded_paths)`` tuple, where
            ``encoded_paths`` lists the path of every value that was base64
            encoded.
        """
        if isinstance(data, dict):
            return self._encode_dict(data, path)
        elif isinstance(data, (list, tuple)):
            # json serializes tuples as arrays, so bytes nested in a tuple
            # must be encoded exactly like bytes nested in a list.
            return self._encode_list(data, path)
        elif isinstance(data, bytes):
            return self._encode_bytes(data, path)
        else:
            return data, []

    def _encode_list(self, data, path):
        """Encode any bytes in a list, noting the index of what is encoded.

        Always returns a new ``list`` (even when given a tuple) together
        with the paths of the encoded values.
        """
        new_data = []
        encoded = []
        for i, value in enumerate(data):
            new_value, new_encoded = self._encode(value, path + [i])
            new_data.append(new_value)
            encoded.extend(new_encoded)
        return new_data, encoded

    def _encode_dict(self, data, path):
        """Encode any bytes in a dict, noting the key of what is encoded.

        Returns a new ``dict`` together with the paths of the encoded
        values.
        """
        new_data = {}
        encoded = []
        for key, value in data.items():
            new_value, new_encoded = self._encode(value, path + [key])
            new_data[key] = new_value
            encoded.extend(new_encoded)
        return new_data, encoded

    def _encode_bytes(self, data, path):
        """Base64 encode a byte string and report its path as encoded."""
        return base64.b64encode(data).decode('utf-8'), [path]

106 

107 

class TokenDecoder:
    """Decodes token strings back into dictionaries.

    This performs the inverse operation to the TokenEncoder, accepting
    opaque strings and decoding them into a useable form.
    """

    def decode(self, token):
        """Decodes an opaque string to a dictionary.

        :type token: str
        :param token: A token string given by the botocore pagination
            interface.

        :rtype: dict
        :returns: A dictionary containing pagination information,
            particularly the service pagination token(s) but also other boto
            metadata.

        :raises ValueError: If the string is not valid base64, UTF-8 or
            JSON (the stdlib decoding errors are ``ValueError`` subclasses).
        :raises TypeError: If the decoded JSON document is not an object.
        """
        json_string = base64.b64decode(token.encode('utf-8')).decode('utf-8')
        decoded_token = json.loads(json_string)

        # A string can be valid base64 + JSON and still not be one of our
        # tokens (e.g. it decodes to a number). Reject it with a well
        # defined error instead of failing on ``.pop`` below.
        if not isinstance(decoded_token, dict):
            raise TypeError(
                "Decoded pagination token must be a dict, got "
                f"{type(decoded_token).__name__}"
            )

        # Remove the encoding metadata as it is read since it will no longer
        # be needed.
        encoded_keys = decoded_token.pop('boto_encoded_keys', None)
        if encoded_keys is None:
            return decoded_token
        return self._decode(decoded_token, encoded_keys)

    def _decode(self, token, encoded_keys):
        """Find each encoded value and decode it.

        ``token`` is modified in place and also returned.
        """
        for key in encoded_keys:
            encoded = self._path_get(token, key)
            decoded = base64.b64decode(encoded.encode('utf-8'))
            self._path_set(token, key, decoded)
        return token

    def _path_get(self, data, path):
        """Return the nested data at the given path.

        For instance:
            data = {'foo': ['bar', 'baz']}
            path = ['foo', 0]
            ==> 'bar'
        """
        # jmespath isn't used here because it would be difficult to actually
        # create the jmespath query when taking all of the unknowns of key
        # structure into account. Gross though this is, it is simple and not
        # very error prone.
        d = data
        for step in path:
            d = d[step]
        return d

    def _path_set(self, data, path, value):
        """Set the value of a key in the given data.

        Example:
            data = {'foo': ['bar', 'baz']}
            path = ['foo', 1]
            value = 'bin'
            ==> data = {'foo': ['bar', 'bin']}
        """
        # Walk to the parent container, then assign into its last step.
        container = self._path_get(data, path[:-1])
        container[path[-1]] = value

174 

175 

class PaginatorModel:
    """Lookup table of per-operation paginator configurations."""

    def __init__(self, paginator_config):
        """Store the ``'pagination'`` section of ``paginator_config``.

        :type paginator_config: dict
        :param paginator_config: A mapping with a ``'pagination'`` key whose
            value maps operation names to their paginator configuration.
        """
        self._paginator_config = paginator_config['pagination']

    def get_paginator(self, operation_name):
        """Return the paginator configuration for ``operation_name``.

        :raises ValueError: If no configuration exists for the operation.
        """
        try:
            return self._paginator_config[operation_name]
        except KeyError as err:
            # Chain explicitly so the traceback shows the failed lookup as
            # the cause rather than as an error raised while handling it.
            raise ValueError(
                f"Paginator for operation does not exist: {operation_name}"
            ) from err

188 

189 

class PageIterator:
    """An iterable object to paginate API results.

    Please note it is NOT a python iterator.
    Use ``iter`` to wrap this as a generator.
    """

    def __init__(
        self,
        method,
        input_token,
        output_token,
        more_results,
        result_keys,
        non_aggregate_keys,
        limit_key,
        max_items,
        starting_token,
        page_size,
        op_kwargs,
    ):
        """Store the pagination settings for one paginated call.

        :param method: Callable invoked as ``method(**kwargs)`` per page.
        :param input_token: List of request parameter names that carry the
            pagination token(s).
        :param output_token: List of compiled expressions (objects with a
            ``search`` method) locating the next token(s) in a response,
            in the same order as ``input_token``.
        :param more_results: Optional compiled expression; when it
            evaluates falsy on a response, pagination stops.
        :param result_keys: List of compiled expressions locating the
            result values; the first one is the primary result key.
        :param non_aggregate_keys: Compiled expressions whose values are
            recorded from the first response only.
        :param limit_key: Request parameter name used for the page size.
        :param max_items: Maximum number of primary-key items to yield, or
            ``None`` for no limit.
        :param starting_token: Token to resume from, or ``None``.
        :param page_size: Value for ``limit_key``, or ``None``.
        :param op_kwargs: Keyword arguments for ``method``.
        """
        self._method = method
        self._input_token = input_token
        self._output_token = output_token
        self._more_results = more_results
        self._result_keys = result_keys
        self._max_items = max_items
        self._limit_key = limit_key
        self._starting_token = starting_token
        self._page_size = page_size
        self._op_kwargs = op_kwargs
        self._resume_token = None
        self._non_aggregate_key_exprs = non_aggregate_keys
        self._non_aggregate_part = {}
        self._token_encoder = TokenEncoder()
        self._token_decoder = TokenDecoder()

    @property
    def result_keys(self):
        """The list of compiled result key expressions."""
        return self._result_keys

    @property
    def resume_token(self):
        """Token to specify to resume pagination."""
        return self._resume_token

    @resume_token.setter
    def resume_token(self, value):
        """Validate ``value`` and store it in its encoded string form.

        :raises ValueError: If ``value`` is not a dict, or its keys are not
            exactly the input token names (optionally plus
            ``boto_truncate_amount``).
        """
        if not isinstance(value, dict):
            raise ValueError(f"Bad starting token: {value}")

        if 'boto_truncate_amount' in value:
            token_keys = sorted(self._input_token + ['boto_truncate_amount'])
        else:
            token_keys = sorted(self._input_token)
        dict_keys = sorted(value.keys())

        if token_keys == dict_keys:
            self._resume_token = self._token_encoder.encode(value)
        else:
            raise ValueError(f"Bad starting token: {value}")

    @property
    def non_aggregate_part(self):
        """Values of the non-aggregate keys recorded from the first page."""
        return self._non_aggregate_part

    def __iter__(self):
        """Yield one response per page until pagination is exhausted.

        Stops when no next token is returned, or when ``max_items`` has
        been reached (in which case ``resume_token`` is set).

        :raises PaginationError: If the same next token is received twice
            in a row.
        """
        # Work on a shallow copy: tokens and the page size are injected into
        # these kwargs as pages are fetched, and writing them into
        # ``self._op_kwargs`` would make a second iteration of this object
        # start from the previous iteration's last token.
        current_kwargs = dict(self._op_kwargs)
        previous_next_token = None
        next_token = {key: None for key in self._input_token}
        if self._starting_token is not None:
            # If the starting token exists, populate the next_token with the
            # values inside it. This ensures that we have the service's
            # pagination token on hand if we need to truncate after the
            # first response.
            next_token = self._parse_starting_token()[0]
        # The number of items from result_key we've seen so far.
        total_items = 0
        first_request = True
        primary_result_key = self.result_keys[0]
        starting_truncation = 0
        self._inject_starting_params(current_kwargs)
        while True:
            response = self._make_request(current_kwargs)
            parsed = self._extract_parsed_response(response)
            if first_request:
                # The first request is handled differently. We could
                # possibly have a resume/starting token that tells us where
                # to index into the retrieved page.
                if self._starting_token is not None:
                    starting_truncation = self._handle_first_request(
                        parsed, primary_result_key, starting_truncation
                    )
                first_request = False
                self._record_non_aggregate_key_values(parsed)
            else:
                # If this isn't the first request, we have already sliced
                # into the first request and had to make additional requests
                # after. We no longer need to add this to truncation.
                starting_truncation = 0
            current_response = primary_result_key.search(parsed)
            if current_response is None:
                current_response = []
            num_current_response = len(current_response)
            truncate_amount = 0
            if self._max_items is not None:
                truncate_amount = (
                    total_items + num_current_response - self._max_items
                )
            if truncate_amount > 0:
                self._truncate_response(
                    parsed,
                    primary_result_key,
                    truncate_amount,
                    starting_truncation,
                    next_token,
                )
                yield response
                break
            else:
                yield response
                total_items += num_current_response
                next_token = self._get_next_token(parsed)
                if all(t is None for t in next_token.values()):
                    break
                if (
                    self._max_items is not None
                    and total_items == self._max_items
                ):
                    # We're on a page boundary so we can set the current
                    # next token to be the resume token.
                    self.resume_token = next_token
                    break
                if (
                    previous_next_token is not None
                    and previous_next_token == next_token
                ):
                    message = (
                        f"The same next token was received twice: {next_token}"
                    )
                    raise PaginationError(message=message)
                self._inject_token_into_kwargs(current_kwargs, next_token)
                previous_next_token = next_token

    def search(self, expression):
        """Applies a JMESPath expression to a paginator

        Each page of results is searched using the provided JMESPath
        expression. If the result is not a list, it is yielded
        directly. If the result is a list, each element in the result
        is yielded individually (essentially implementing a flatmap in
        which the JMESPath search is the mapping function).

        :type expression: str
        :param expression: JMESPath expression to apply to each page.

        :return: Returns an iterator that yields the individual
            elements of applying a JMESPath expression to each page of
            results.
        """
        compiled = jmespath.compile(expression)
        for page in self:
            results = compiled.search(page)
            if isinstance(results, list):
                yield from results
            else:
                # Yield result directly if it is not a list.
                yield results

    # NOTE(review): the decorator presumably registers the 'PAGINATOR'
    # feature id for the duration of each request; confirm against
    # botocore.context / botocore.useragent.
    @with_current_context(partial(register_feature_id, 'PAGINATOR'))
    def _make_request(self, current_kwargs):
        """Call the wrapped method with the current keyword arguments."""
        return self._method(**current_kwargs)

    def _extract_parsed_response(self, response):
        """Return the parsed part of ``response`` (the response itself)."""
        return response

    def _record_non_aggregate_key_values(self, response):
        """Capture the non-aggregate key values found in ``response``."""
        non_aggregate_keys = {}
        for expression in self._non_aggregate_key_exprs:
            result = expression.search(response)
            set_value_from_jmespath(
                non_aggregate_keys, expression.expression, result
            )
        self._non_aggregate_part = non_aggregate_keys

    def _inject_starting_params(self, op_kwargs):
        """Add the starting token and the page size to ``op_kwargs``."""
        # If the user has specified a starting token we need to
        # inject that into the operation's kwargs.
        if self._starting_token is not None:
            # Don't need to do anything special if there is no starting
            # token specified.
            next_token = self._parse_starting_token()[0]
            self._inject_token_into_kwargs(op_kwargs, next_token)
        if self._page_size is not None:
            # Pass the page size as the parameter name for limiting
            # page size, also known as the limit_key.
            op_kwargs[self._limit_key] = self._page_size

    def _inject_token_into_kwargs(self, op_kwargs, next_token):
        """Set each token in ``op_kwargs``; drop parameters with no token.

        A token of ``None`` or the literal string ``'None'`` counts as
        "no token" and removes the parameter if it is present.
        """
        for name, token in next_token.items():
            if (token is not None) and (token != 'None'):
                op_kwargs[name] = token
            elif name in op_kwargs:
                del op_kwargs[name]

    def _handle_first_request(
        self, parsed, primary_result_key, starting_truncation
    ):
        """Trim the first page according to the starting token.

        The passed-in ``starting_truncation`` is ignored; the value from
        the starting token is used and returned.
        """
        # If the payload is an array or string, we need to slice into it
        # and only return the truncated amount.
        starting_truncation = self._parse_starting_token()[1]
        all_data = primary_result_key.search(parsed)
        if isinstance(all_data, (list, str)):
            data = all_data[starting_truncation:]
        else:
            data = None
        set_value_from_jmespath(parsed, primary_result_key.expression, data)
        # We also need to truncate any secondary result keys
        # because they were not truncated in the previous last
        # response.
        for token in self.result_keys:
            if token == primary_result_key:
                continue
            sample = token.search(parsed)
            if isinstance(sample, list):
                empty_value = []
            elif isinstance(sample, str):
                empty_value = ''
            elif isinstance(sample, (int, float)):
                empty_value = 0
            else:
                empty_value = None
            set_value_from_jmespath(parsed, token.expression, empty_value)
        return starting_truncation

    def _truncate_response(
        self,
        parsed,
        primary_result_key,
        truncate_amount,
        starting_truncation,
        next_token,
    ):
        """Drop ``truncate_amount`` trailing items and set the resume token.

        ``next_token`` is modified in place: ``boto_truncate_amount`` is
        added to it before it is stored as the resume token.
        """
        original = primary_result_key.search(parsed)
        if original is None:
            original = []
        amount_to_keep = len(original) - truncate_amount
        truncated = original[:amount_to_keep]
        set_value_from_jmespath(
            parsed, primary_result_key.expression, truncated
        )
        # The issue here is that even though we know how much we've truncated
        # we need to account for this globally including any starting
        # left truncation. For example:
        # Raw response: [0,1,2,3]
        # Starting index: 1
        # Max items: 1
        # Starting left truncation: [1, 2, 3]
        # End right truncation for max items: [1]
        # However, even though we only kept 1, this is post
        # left truncation so the next starting index should be 2, not 1
        # (left_truncation + amount_to_keep).
        next_token['boto_truncate_amount'] = (
            amount_to_keep + starting_truncation
        )
        self.resume_token = next_token

    def _get_next_token(self, parsed):
        """Build the ``{input_key: token}`` mapping for the next request.

        Returns an empty dict when the ``more_results`` expression is
        configured and evaluates falsy on ``parsed``.
        """
        if self._more_results is not None:
            if not self._more_results.search(parsed):
                return {}
        next_tokens = {}
        for output_token, input_key in zip(
            self._output_token, self._input_token
        ):
            next_token = output_token.search(parsed)
            # We do not want to include any empty strings as actual tokens.
            # Treat them as None.
            if next_token:
                next_tokens[input_key] = next_token
            else:
                next_tokens[input_key] = None
        return next_tokens

    def result_key_iters(self):
        """Return one ``ResultKeyIterator`` per result key.

        The iterators share a single pass over the pages via ``tee``.
        """
        teed_results = tee(self, len(self.result_keys))
        return [
            ResultKeyIterator(i, result_key)
            for i, result_key in zip(teed_results, self.result_keys)
        ]

    def build_full_result(self):
        """Iterate over all pages and merge them into a single result dict.

        List values are extended, numbers and strings are added together,
        the non-aggregate part is merged in, and ``NextToken`` is set when
        a resume token was produced.
        """
        complete_result = {}
        for response in self:
            page = response
            # We want to try to catch operation object pagination
            # and format correctly for those. They come in the form
            # of a tuple of two elements: (http_response, parsed_responsed).
            # We want the parsed_response as that is what the page iterator
            # uses. We can remove it though once operation objects are
            # removed.
            if isinstance(response, tuple) and len(response) == 2:
                page = response[1]
            # We're incrementally building the full response page
            # by page. For each page in the response we need to
            # inject the necessary components from the page
            # into the complete_result.
            for result_expression in self.result_keys:
                # In order to incrementally update a result key
                # we need to search the existing value from complete_result,
                # then we need to search the _current_ page for the
                # current result key value. Then we append the current
                # value onto the existing value, and re-set that value
                # as the new value.
                result_value = result_expression.search(page)
                if result_value is None:
                    continue
                existing_value = result_expression.search(complete_result)
                if existing_value is None:
                    # Set the initial result
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        result_value,
                    )
                    continue
                # Now both result_value and existing_value contain something
                if isinstance(result_value, list):
                    existing_value.extend(result_value)
                elif isinstance(result_value, (int, float, str)):
                    # Modify the existing result with the sum or
                    # concatenation
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        existing_value + result_value,
                    )
        merge_dicts(complete_result, self.non_aggregate_part)
        if self.resume_token is not None:
            complete_result['NextToken'] = self.resume_token
        return complete_result

    def _parse_starting_token(self):
        """Decode the starting token.

        :returns: ``None`` when there is no starting token, otherwise a
            ``(next_token, index)`` tuple where ``next_token`` maps input
            token names to values and ``index`` is the number of leading
            items to skip on the first page.
        """
        if self._starting_token is None:
            return None

        # The starting token is a dict passed as a base64 encoded string.
        try:
            next_token = self._token_decoder.decode(self._starting_token)
            if not isinstance(next_token, dict):
                raise TypeError("Decoded starting token is not a dict")
            index = next_token.pop('boto_truncate_amount', 0)
        except (ValueError, TypeError, AttributeError):
            # AttributeError covers a token that happens to be valid
            # base64 + JSON but decodes to a scalar (the decoder then fails
            # on ``.pop``). Such a token is not ours, so treat it like any
            # other undecodable token and try the old-style parser.
            next_token, index = self._parse_starting_token_deprecated()
        return next_token, index

    def _parse_starting_token_deprecated(self):
        """
        This handles parsing of old style starting tokens, and attempts to
        coerce them into the new style.
        """
        log.debug(
            "Attempting to fall back to old starting token parser. For "
            f"token: {self._starting_token}"
        )
        if self._starting_token is None:
            return None

        parts = self._starting_token.split('___')
        next_token = []
        index = 0
        if len(parts) == len(self._input_token) + 1:
            try:
                index = int(parts.pop())
            except ValueError:
                # This doesn't look like a valid old-style token, so we're
                # passing it along as an opaque service token.
                parts = [self._starting_token]

        for part in parts:
            if part == 'None':
                next_token.append(None)
            else:
                next_token.append(part)
        return self._convert_deprecated_starting_token(next_token), index

    def _convert_deprecated_starting_token(self, deprecated_token):
        """
        This attempts to convert a deprecated starting token into the new
        style.

        :raises ValueError: If the token has more parts than there are
            input tokens.
        """
        len_deprecated_token = len(deprecated_token)
        len_input_token = len(self._input_token)
        if len_deprecated_token > len_input_token:
            raise ValueError(f"Bad starting token: {self._starting_token}")
        elif len_deprecated_token < len_input_token:
            log.debug(
                "Old format starting token does not contain all input "
                "tokens. Setting the rest, in order, as None."
            )
            deprecated_token.extend(
                [None] * (len_input_token - len_deprecated_token)
            )
        return dict(zip(self._input_token, deprecated_token))

592 

593 

class Paginator:
    """Builds page iterators for one operation from its pagination config."""

    # Class used by ``paginate`` to build the returned iterable.
    PAGE_ITERATOR_CLS = PageIterator

    def __init__(self, method, pagination_config, model):
        """Compile the pagination configuration for ``method``.

        :param method: Callable that performs a single request.
        :param pagination_config: Dict with ``input_token`` and
            ``output_token`` and optionally ``more_results``,
            ``result_key``, ``non_aggregate_keys`` and ``limit_key``.
        :param model: Object whose ``input_shape.members`` is consulted
            when a ``PageSize`` is supplied.
        """
        self._model = model
        self._method = method
        self._pagination_cfg = pagination_config
        self._output_token = self._get_output_tokens(self._pagination_cfg)
        self._input_token = self._get_input_tokens(self._pagination_cfg)
        self._more_results = self._get_more_results_token(self._pagination_cfg)
        self._non_aggregate_keys = self._get_non_aggregate_keys(
            self._pagination_cfg
        )
        self._result_keys = self._get_result_keys(self._pagination_cfg)
        self._limit_key = self._get_limit_key(self._pagination_cfg)

    @property
    def result_keys(self):
        """Compiled result key expressions, or ``None`` if not configured."""
        return self._result_keys

    def _get_non_aggregate_keys(self, config):
        """Compile each ``non_aggregate_keys`` entry (empty list if none)."""
        return [
            jmespath.compile(key)
            for key in config.get('non_aggregate_keys', [])
        ]

    def _get_output_tokens(self, config):
        """Compile ``output_token`` (a string or list) into a list."""
        output_token = config['output_token']
        if not isinstance(output_token, list):
            output_token = [output_token]
        return [jmespath.compile(expression) for expression in output_token]

    def _get_input_tokens(self, config):
        """Return ``input_token`` from ``config`` as a list of names."""
        input_token = config['input_token']
        if not isinstance(input_token, list):
            input_token = [input_token]
        return input_token

    def _get_more_results_token(self, config):
        """Compile ``more_results`` if configured, otherwise ``None``."""
        more_results = config.get('more_results')
        if more_results is not None:
            return jmespath.compile(more_results)
        return None

    def _get_result_keys(self, config):
        """Compile ``result_key`` (a string or list) into a list.

        Returns ``None`` when ``result_key`` is not configured.
        """
        result_key = config.get('result_key')
        if result_key is not None:
            if not isinstance(result_key, list):
                result_key = [result_key]
            result_key = [jmespath.compile(rk) for rk in result_key]
            return result_key
        return None

    def _get_limit_key(self, config):
        """Return the configured ``limit_key``, or ``None``."""
        return config.get('limit_key')

    def paginate(self, **kwargs):
        """Create paginator object for an operation.

        This returns an iterable object. Iterating over
        this object will yield a single page of a response
        at a time.

        The optional ``PaginationConfig`` keyword argument is removed from
        ``kwargs`` and interpreted; everything else is passed through to
        the operation.
        """
        page_params = self._extract_paging_params(kwargs)
        return self.PAGE_ITERATOR_CLS(
            self._method,
            self._input_token,
            self._output_token,
            self._more_results,
            self._result_keys,
            self._non_aggregate_keys,
            self._limit_key,
            page_params['MaxItems'],
            page_params['StartingToken'],
            page_params['PageSize'],
            kwargs,
        )

    def _extract_paging_params(self, kwargs):
        """Pop ``PaginationConfig`` from ``kwargs`` and normalize it.

        :returns: Dict with ``MaxItems`` (int or ``None``),
            ``StartingToken`` and ``PageSize`` (coerced to ``str`` when the
            limit key's shape type is ``'string'``, otherwise to ``int``).
        :raises PaginationError: If ``PageSize`` is given but the operation
            has no limit key.
        """
        pagination_config = kwargs.pop('PaginationConfig', {})
        max_items = pagination_config.get('MaxItems', None)
        if max_items is not None:
            max_items = int(max_items)
        page_size = pagination_config.get('PageSize', None)
        if page_size is not None:
            if self._limit_key is None:
                raise PaginationError(
                    message="PageSize parameter is not supported for the "
                    "pagination interface for this operation."
                )
            input_members = self._model.input_shape.members
            limit_key_shape = input_members.get(self._limit_key)
            if limit_key_shape.type_name == 'string':
                if not isinstance(page_size, str):
                    page_size = str(page_size)
            else:
                page_size = int(page_size)
        return {
            'MaxItems': max_items,
            'StartingToken': pagination_config.get('StartingToken', None),
            'PageSize': page_size,
        }

698 

699 

class ResultKeyIterator:
    """Flattens one result key across a sequence of pages.

    An instance is bound to exactly one result key. Iterating it walks the
    pages in order and produces every element found under that key.

    :param pages_iterator: An iterable of result pages (for example a
        ``PageIterator``).
    :param result_key: The compiled JMESPath expression for the result
        key; only its ``search`` method is used.
    """

    def __init__(self, pages_iterator, result_key):
        self.result_key = result_key
        self._pages_iterator = pages_iterator

    def __iter__(self):
        for current_page in self._pages_iterator:
            found = self.result_key.search(current_page)
            # A page without the key contributes nothing.
            if found is not None:
                yield from found