Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/paginate.py: 17%
365 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
14import base64
15import json
16import logging
17from itertools import tee
19import jmespath
21from botocore.exceptions import PaginationError
22from botocore.utils import merge_dicts, set_value_from_jmespath
# Module-level logger named after this module; used for debug messages
# emitted while parsing old-style starting tokens.
log = logging.getLogger(__name__)
class TokenEncoder:
    """Turns pagination token dictionaries into opaque strings.

    The opaque form is the JSON serialization of the dictionary, base64
    encoded. Values of type ``bytes`` (which JSON cannot represent) are
    base64 encoded individually first, and the location of every such
    value is stored alongside the token so it can be restored later.
    """

    def encode(self, token):
        """Serialize a token dictionary into an opaque string.

        :type token: dict
        :param token: Pagination information: the service pagination
            token(s) plus any additional boto metadata.

        :rtype: str
        :returns: The base64 encoded JSON form of ``token``.
        """
        try:
            # Fast path: nearly every token is directly JSON serializable.
            serialized = json.dumps(token)
        except (TypeError, UnicodeDecodeError):
            # Slow path: walk the structure and base64 encode every bytes
            # value, remembering the path to each one.
            safe_token, byte_paths = self._encode(token, [])

            # Stash the paths under a key that is assumed never to clash
            # with a service-defined key.
            safe_token['boto_encoded_keys'] = byte_paths

            serialized = json.dumps(safe_token)

        # Base64 the JSON text so the result is an opaque string.
        raw = serialized.encode('utf-8')
        return base64.b64encode(raw).decode('utf-8')

    def _encode(self, data, path):
        """Dispatch on the type of ``data``; ``path`` is how we got here.

        Returns a ``(converted_data, encoded_paths)`` pair.
        """
        if isinstance(data, dict):
            return self._encode_dict(data, path)
        if isinstance(data, list):
            return self._encode_list(data, path)
        if isinstance(data, bytes):
            return self._encode_bytes(data, path)
        # Anything else is passed through untouched.
        return data, []

    def _encode_list(self, data, path):
        """Convert a list, recording the index path of each bytes value."""
        converted = []
        byte_paths = []
        for position, item in enumerate(data):
            item_value, item_paths = self._encode(item, path + [position])
            converted.append(item_value)
            byte_paths += item_paths
        return converted, byte_paths

    def _encode_dict(self, data, path):
        """Convert a dict, recording the key path of each bytes value."""
        converted = {}
        byte_paths = []
        for name, item in data.items():
            item_value, item_paths = self._encode(item, path + [name])
            converted[name] = item_value
            byte_paths += item_paths
        return converted, byte_paths

    def _encode_bytes(self, data, path):
        """Base64 encode one bytes value and report its path."""
        text = base64.b64encode(data).decode('utf-8')
        return text, [path]
class TokenDecoder:
    """Decodes token strings back into dictionaries.

    This performs the inverse operation to the TokenEncoder, accepting
    opaque strings and decoding them into a useable form.
    """

    def decode(self, token):
        """Decodes an opaque string to a dictionary.

        :type token: str
        :param token: A token string given by the botocore pagination
            interface.

        :rtype: dict
        :returns: A dictionary containing pagination information,
            particularly the service pagination token(s) but also other boto
            metadata.

        :raises ValueError: If ``token`` is not valid base64, is not UTF-8
            encoded JSON, or decodes to a JSON value that is not an object.
        """
        json_string = base64.b64decode(token.encode('utf-8')).decode('utf-8')
        decoded_token = json.loads(json_string)

        # A string can be valid base64 of valid JSON and still not be one of
        # our tokens (for example it may decode to a number, a string or
        # null). Report that as a ValueError, like every other malformed
        # token, instead of failing with AttributeError on ``.pop`` below.
        if not isinstance(decoded_token, dict):
            raise ValueError(
                "Decoded pagination token is not a JSON object: "
                f"{type(decoded_token).__name__}"
            )

        # Remove the encoding metadata as it is read since it will no longer
        # be needed.
        encoded_keys = decoded_token.pop('boto_encoded_keys', None)
        if encoded_keys is None:
            return decoded_token
        return self._decode(decoded_token, encoded_keys)

    def _decode(self, token, encoded_keys):
        """Find each encoded value and decode it.

        ``encoded_keys`` is a list of paths (lists of dict keys / list
        indexes); the value at each path is replaced, in place, by its
        base64 decoded bytes. Returns the same ``token`` object.
        """
        for key in encoded_keys:
            encoded = self._path_get(token, key)
            decoded = base64.b64decode(encoded.encode('utf-8'))
            self._path_set(token, key, decoded)
        return token

    def _path_get(self, data, path):
        """Return the nested data at the given path.

        For instance:
            data = {'foo': ['bar', 'baz']}
            path = ['foo', 0]
            ==> 'bar'
        """
        # jmespath isn't used here because it would be difficult to actually
        # create the jmespath query when taking all of the unknowns of key
        # structure into account. Gross though this is, it is simple and not
        # very error prone.
        d = data
        for step in path:
            d = d[step]
        return d

    def _path_set(self, data, path, value):
        """Set the value of a key in the given data.

        Example:
            data = {'foo': ['bar', 'baz']}
            path = ['foo', 1]
            value = 'bin'
            ==> data = {'foo': ['bar', 'bin']}
        """
        # Walk to the parent container, then assign the final step on it.
        container = self._path_get(data, path[:-1])
        container[path[-1]] = value
class PaginatorModel:
    """Lookup table of per-operation paginator configurations."""

    def __init__(self, paginator_config):
        # Only the 'pagination' section of the loaded config is kept.
        self._paginator_config = paginator_config['pagination']

    def get_paginator(self, operation_name):
        """Return the paginator configuration for ``operation_name``.

        :raises ValueError: If no configuration exists for the operation.
        """
        configs = self._paginator_config
        try:
            return configs[operation_name]
        except KeyError:
            raise ValueError(
                "Paginator for operation does not exist: %s" % operation_name
            )
class PageIterator:
    """An iterable object to paginate API results.

    Please note it is NOT a python iterator.
    Use ``iter`` to wrap this as a generator.

    Every call to ``iter`` paginates on its own copy of the operation
    keyword arguments, so the object can be iterated more than once and
    each pass starts from the configured starting point.
    """

    def __init__(
        self,
        method,
        input_token,
        output_token,
        more_results,
        result_keys,
        non_aggregate_keys,
        limit_key,
        max_items,
        starting_token,
        page_size,
        op_kwargs,
    ):
        """
        :param method: Callable invoked as ``method(**kwargs)`` once per
            page.
        :param input_token: List of request parameter names that receive
            the next-page token values.
        :param output_token: List of compiled expressions (objects with a
            ``search`` method) extracting token values from a parsed
            response; paired positionally with ``input_token``.
        :param more_results: Optional compiled expression; when set and
            it evaluates falsy on a page, pagination stops.
        :param result_keys: List of compiled expressions for the
            aggregated result values; the first one is the primary key
            used for counting and truncation.
        :param non_aggregate_keys: List of compiled expressions whose
            values are recorded from the first page only.
        :param limit_key: Name of the request parameter that receives
            ``page_size``.
        :param max_items: Maximum number of primary-key items to yield in
            total, or ``None`` for no limit.
        :param starting_token: Opaque token to resume from, or ``None``.
        :param page_size: Value to send as ``limit_key``, or ``None``.
        :param op_kwargs: Keyword arguments for ``method``.
        """
        self._method = method
        self._input_token = input_token
        self._output_token = output_token
        self._more_results = more_results
        self._result_keys = result_keys
        self._max_items = max_items
        self._limit_key = limit_key
        self._starting_token = starting_token
        self._page_size = page_size
        self._op_kwargs = op_kwargs
        self._resume_token = None
        self._non_aggregate_key_exprs = non_aggregate_keys
        self._non_aggregate_part = {}
        self._token_encoder = TokenEncoder()
        self._token_decoder = TokenDecoder()

    @property
    def result_keys(self):
        return self._result_keys

    @property
    def resume_token(self):
        """Token to specify to resume pagination."""
        return self._resume_token

    @resume_token.setter
    def resume_token(self, value):
        # The token must be a dict whose keys are exactly the input token
        # names, optionally plus 'boto_truncate_amount'. It is stored in
        # its encoded (opaque string) form.
        if not isinstance(value, dict):
            raise ValueError("Bad starting token: %s" % value)

        if 'boto_truncate_amount' in value:
            token_keys = sorted(self._input_token + ['boto_truncate_amount'])
        else:
            token_keys = sorted(self._input_token)
        dict_keys = sorted(value.keys())

        if token_keys == dict_keys:
            self._resume_token = self._token_encoder.encode(value)
        else:
            raise ValueError("Bad starting token: %s" % value)

    @property
    def non_aggregate_part(self):
        return self._non_aggregate_part

    def __iter__(self):
        """Yield one response per page until pagination is exhausted.

        Stops early, setting ``resume_token``, once ``max_items`` items of
        the primary result key have been yielded.

        :raises PaginationError: If the service returns the same next
            token twice in a row.
        """
        # Paginate on a copy: tokens and the page size are injected into
        # these kwargs as we go, and mutating self._op_kwargs directly
        # would make a second iteration resume from the last page fetched
        # instead of starting over.
        current_kwargs = dict(self._op_kwargs)
        previous_next_token = None
        next_token = {key: None for key in self._input_token}
        if self._starting_token is not None:
            # If the starting token exists, populate the next_token with the
            # values inside it. This ensures that we have the service's
            # pagination token on hand if we need to truncate after the
            # first response.
            next_token = self._parse_starting_token()[0]
        # The number of items from result_key we've seen so far.
        total_items = 0
        first_request = True
        primary_result_key = self.result_keys[0]
        starting_truncation = 0
        self._inject_starting_params(current_kwargs)
        while True:
            response = self._make_request(current_kwargs)
            parsed = self._extract_parsed_response(response)
            if first_request:
                # The first request is handled differently. We could
                # possibly have a resume/starting token that tells us where
                # to index into the retrieved page.
                if self._starting_token is not None:
                    starting_truncation = self._handle_first_request(
                        parsed, primary_result_key, starting_truncation
                    )
                first_request = False
                self._record_non_aggregate_key_values(parsed)
            else:
                # If this isn't the first request, we have already sliced into
                # the first request and had to make additional requests after.
                # We no longer need to add this to truncation.
                starting_truncation = 0
            current_response = primary_result_key.search(parsed)
            if current_response is None:
                current_response = []
            num_current_response = len(current_response)
            truncate_amount = 0
            if self._max_items is not None:
                truncate_amount = (
                    total_items + num_current_response - self._max_items
                )
            if truncate_amount > 0:
                # This page overshoots max_items: trim it, record where to
                # resume (inside this page) and stop.
                self._truncate_response(
                    parsed,
                    primary_result_key,
                    truncate_amount,
                    starting_truncation,
                    next_token,
                )
                yield response
                break
            else:
                yield response
                total_items += num_current_response
                next_token = self._get_next_token(parsed)
                if all(t is None for t in next_token.values()):
                    break
                if (
                    self._max_items is not None
                    and total_items == self._max_items
                ):
                    # We're on a page boundary so we can set the current
                    # next token to be the resume token.
                    self.resume_token = next_token
                    break
                if (
                    previous_next_token is not None
                    and previous_next_token == next_token
                ):
                    message = (
                        f"The same next token was received "
                        f"twice: {next_token}"
                    )
                    raise PaginationError(message=message)
                self._inject_token_into_kwargs(current_kwargs, next_token)
                previous_next_token = next_token

    def search(self, expression):
        """Applies a JMESPath expression to a paginator

        Each page of results is searched using the provided JMESPath
        expression. If the result is not a list, it is yielded
        directly. If the result is a list, each element in the result
        is yielded individually (essentially implementing a flatmap in
        which the JMESPath search is the mapping function).

        :type expression: str
        :param expression: JMESPath expression to apply to each page.

        :return: Returns an iterator that yields the individual
            elements of applying a JMESPath expression to each page of
            results.
        """
        compiled = jmespath.compile(expression)
        for page in self:
            results = compiled.search(page)
            if isinstance(results, list):
                yield from results
            else:
                # Yield result directly if it is not a list.
                yield results

    def _make_request(self, current_kwargs):
        # Hook: perform one request for the given kwargs.
        return self._method(**current_kwargs)

    def _extract_parsed_response(self, response):
        # Hook: the response is used as the parsed page as-is here.
        return response

    def _record_non_aggregate_key_values(self, response):
        # Capture the non-aggregate key values from the given page and
        # replace any previously recorded ones.
        non_aggregate_keys = {}
        for expression in self._non_aggregate_key_exprs:
            result = expression.search(response)
            set_value_from_jmespath(
                non_aggregate_keys, expression.expression, result
            )
        self._non_aggregate_part = non_aggregate_keys

    def _inject_starting_params(self, op_kwargs):
        # If the user has specified a starting token we need to
        # inject that into the operation's kwargs. Nothing special is
        # needed when there is no starting token.
        if self._starting_token is not None:
            next_token = self._parse_starting_token()[0]
            self._inject_token_into_kwargs(op_kwargs, next_token)
        if self._page_size is not None:
            # Pass the page size as the parameter name for limiting
            # page size, also known as the limit_key.
            op_kwargs[self._limit_key] = self._page_size

    def _inject_token_into_kwargs(self, op_kwargs, next_token):
        # Set each token parameter that has a value; remove parameters
        # whose token is None (or the literal string 'None') so a stale
        # token from a previous page is never re-sent.
        for name, token in next_token.items():
            if (token is not None) and (token != 'None'):
                op_kwargs[name] = token
            elif name in op_kwargs:
                del op_kwargs[name]

    def _handle_first_request(
        self, parsed, primary_result_key, starting_truncation
    ):
        # NOTE: the ``starting_truncation`` argument is ignored; the value
        # is always re-read from the starting token. The parameter is kept
        # for interface compatibility.
        #
        # If the payload is an array or string, we need to slice into it
        # and only return the truncated amount.
        starting_truncation = self._parse_starting_token()[1]
        all_data = primary_result_key.search(parsed)
        if isinstance(all_data, (list, str)):
            data = all_data[starting_truncation:]
        else:
            data = None
        set_value_from_jmespath(parsed, primary_result_key.expression, data)
        # We also need to truncate any secondary result keys
        # because they were not truncated in the previous last
        # response.
        for token in self.result_keys:
            if token == primary_result_key:
                continue
            sample = token.search(parsed)
            if isinstance(sample, list):
                empty_value = []
            elif isinstance(sample, str):
                empty_value = ''
            elif isinstance(sample, (int, float)):
                empty_value = 0
            else:
                empty_value = None
            set_value_from_jmespath(parsed, token.expression, empty_value)
        return starting_truncation

    def _truncate_response(
        self,
        parsed,
        primary_result_key,
        truncate_amount,
        starting_truncation,
        next_token,
    ):
        # Drop the last ``truncate_amount`` items of the primary result
        # key from ``parsed`` and set the resume token accordingly.
        # ``next_token`` is modified in place.
        original = primary_result_key.search(parsed)
        if original is None:
            original = []
        amount_to_keep = len(original) - truncate_amount
        truncated = original[:amount_to_keep]
        set_value_from_jmespath(
            parsed, primary_result_key.expression, truncated
        )
        # The issue here is that even though we know how much we've truncated
        # we need to account for this globally including any starting
        # left truncation. For example:
        # Raw response: [0,1,2,3]
        # Starting index: 1
        # Max items: 1
        # Starting left truncation: [1, 2, 3]
        # End right truncation for max items: [1]
        # However, even though we only kept 1, this is post
        # left truncation so the next starting index should be 2, not 1
        # (left_truncation + amount_to_keep).
        next_token['boto_truncate_amount'] = (
            amount_to_keep + starting_truncation
        )
        self.resume_token = next_token

    def _get_next_token(self, parsed):
        # Build {input_token_name: value} for the next request. An empty
        # dict is returned when the more_results expression says there is
        # nothing left.
        if self._more_results is not None:
            if not self._more_results.search(parsed):
                return {}
        next_tokens = {}
        for output_token, input_key in zip(
            self._output_token, self._input_token
        ):
            next_token = output_token.search(parsed)
            # We do not want to include any empty strings as actual tokens.
            # Treat them as None.
            if next_token:
                next_tokens[input_key] = next_token
            else:
                next_tokens[input_key] = None
        return next_tokens

    def result_key_iters(self):
        """Return one ``ResultKeyIterator`` per result key.

        All of them share a single pass over the pages (via ``tee``).
        """
        teed_results = tee(self, len(self.result_keys))
        return [
            ResultKeyIterator(i, result_key)
            for i, result_key in zip(teed_results, self.result_keys)
        ]

    def build_full_result(self):
        """Iterate over every page and merge them into one result dict.

        List values are extended, numeric and string values are added /
        concatenated. The recorded non-aggregate values are merged in and,
        if pagination stopped early, the resume token is stored under
        ``'NextToken'``.
        """
        complete_result = {}
        for response in self:
            page = response
            # We want to try to catch operation object pagination
            # and format correctly for those. They come in the form
            # of a tuple of two elements: (http_response, parsed_responsed).
            # We want the parsed_response as that is what the page iterator
            # uses. We can remove it though once operation objects are removed.
            if isinstance(response, tuple) and len(response) == 2:
                page = response[1]
            # We're incrementally building the full response page
            # by page. For each page in the response we need to
            # inject the necessary components from the page
            # into the complete_result.
            for result_expression in self.result_keys:
                # In order to incrementally update a result key
                # we need to search the existing value from complete_result,
                # then we need to search the _current_ page for the
                # current result key value. Then we append the current
                # value onto the existing value, and re-set that value
                # as the new value.
                result_value = result_expression.search(page)
                if result_value is None:
                    continue
                existing_value = result_expression.search(complete_result)
                if existing_value is None:
                    # Set the initial result
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        result_value,
                    )
                    continue
                # Now both result_value and existing_value contain something
                if isinstance(result_value, list):
                    existing_value.extend(result_value)
                elif isinstance(result_value, (int, float, str)):
                    # Modify the existing result with the sum or concatenation
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        existing_value + result_value,
                    )
        merge_dicts(complete_result, self.non_aggregate_part)
        if self.resume_token is not None:
            complete_result['NextToken'] = self.resume_token
        return complete_result

    def _parse_starting_token(self):
        """Return ``(next_token_dict, truncation_index)`` for the starting
        token, or ``None`` when there is no starting token.
        """
        if self._starting_token is None:
            return None

        # The starting token is a dict passed as a base64 encoded string.
        next_token = self._starting_token
        try:
            next_token = self._token_decoder.decode(next_token)
            # The truncation index is boto metadata, not a service token,
            # so it is removed from the dict that gets sent on.
            index = next_token.pop('boto_truncate_amount', 0)
        except (ValueError, TypeError):
            next_token, index = self._parse_starting_token_deprecated()
        return next_token, index

    def _parse_starting_token_deprecated(self):
        """
        This handles parsing of old style starting tokens, and attempts to
        coerce them into the new style.
        """
        # Lazy logging arguments: the message is only formatted when debug
        # logging is enabled.
        log.debug(
            "Attempting to fall back to old starting token parser. For "
            "token: %s",
            self._starting_token,
        )
        if self._starting_token is None:
            return None

        parts = self._starting_token.split('___')
        next_token = []
        index = 0
        if len(parts) == len(self._input_token) + 1:
            try:
                index = int(parts.pop())
            except ValueError:
                # This doesn't look like a valid old-style token, so we're
                # passing it along as an opaque service token.
                parts = [self._starting_token]

        for part in parts:
            if part == 'None':
                next_token.append(None)
            else:
                next_token.append(part)
        return self._convert_deprecated_starting_token(next_token), index

    def _convert_deprecated_starting_token(self, deprecated_token):
        """
        This attempts to convert a deprecated starting token into the new
        style.

        ``deprecated_token`` is a list of token values; it is padded in
        place with ``None`` when shorter than the list of input tokens.

        :raises ValueError: If it holds more values than there are input
            tokens.
        """
        len_deprecated_token = len(deprecated_token)
        len_input_token = len(self._input_token)
        if len_deprecated_token > len_input_token:
            raise ValueError("Bad starting token: %s" % self._starting_token)
        elif len_deprecated_token < len_input_token:
            log.debug(
                "Old format starting token does not contain all input "
                "tokens. Setting the rest, in order, as None."
            )
            for i in range(len_input_token - len_deprecated_token):
                deprecated_token.append(None)
        return dict(zip(self._input_token, deprecated_token))
class Paginator:
    """Builds page iterators for one paginated operation.

    The pagination configuration is parsed once at construction time;
    ``paginate`` then creates a ``PAGE_ITERATOR_CLS`` instance per call.
    """

    # Class used by ``paginate`` to build the returned iterable.
    PAGE_ITERATOR_CLS = PageIterator

    def __init__(self, method, pagination_config, model):
        """
        :param method: Callable performing one request; handed to the
            page iterator.
        :param pagination_config: Dict with the operation's pagination
            settings (``input_token``, ``output_token`` and optionally
            ``more_results``, ``result_key``, ``non_aggregate_keys``,
            ``limit_key``).
        :param model: Operation model; its ``input_shape.members`` is
            consulted to coerce ``PageSize``.
        """
        self._model = model
        self._method = method
        self._pagination_cfg = pagination_config
        self._output_token = self._get_output_tokens(self._pagination_cfg)
        self._input_token = self._get_input_tokens(self._pagination_cfg)
        self._more_results = self._get_more_results_token(self._pagination_cfg)
        self._non_aggregate_keys = self._get_non_aggregate_keys(
            self._pagination_cfg
        )
        self._result_keys = self._get_result_keys(self._pagination_cfg)
        self._limit_key = self._get_limit_key(self._pagination_cfg)

    @property
    def result_keys(self):
        return self._result_keys

    def _get_non_aggregate_keys(self, config):
        # Compiled expressions for the optional 'non_aggregate_keys'.
        return [
            jmespath.compile(key)
            for key in config.get('non_aggregate_keys', [])
        ]

    def _get_output_tokens(self, config):
        # 'output_token' may be a single expression or a list of them;
        # always return a list of compiled expressions.
        output_token = config['output_token']
        if not isinstance(output_token, list):
            output_token = [output_token]
        return [jmespath.compile(expression) for expression in output_token]

    def _get_input_tokens(self, config):
        # Read from the ``config`` argument, like the other _get_* helpers,
        # rather than from self._pagination_cfg. 'input_token' may be a
        # single name or a list of names; always return a list.
        input_token = config['input_token']
        if not isinstance(input_token, list):
            input_token = [input_token]
        return input_token

    def _get_more_results_token(self, config):
        # Compiled 'more_results' expression, or None when not configured.
        more_results = config.get('more_results')
        if more_results is not None:
            return jmespath.compile(more_results)
        return None

    def _get_result_keys(self, config):
        # List of compiled 'result_key' expressions, or None when the
        # config has no 'result_key'.
        result_key = config.get('result_key')
        if result_key is not None:
            if not isinstance(result_key, list):
                result_key = [result_key]
            result_key = [jmespath.compile(rk) for rk in result_key]
            return result_key
        return None

    def _get_limit_key(self, config):
        return config.get('limit_key')

    def paginate(self, **kwargs):
        """Create paginator object for an operation.

        This returns an iterable object.  Iterating over
        this object will yield a single page of a response
        at a time.

        The optional ``PaginationConfig`` keyword (a dict with
        ``MaxItems``, ``StartingToken`` and ``PageSize``) is removed from
        ``kwargs``; everything else is passed to the operation.
        """
        page_params = self._extract_paging_params(kwargs)
        return self.PAGE_ITERATOR_CLS(
            self._method,
            self._input_token,
            self._output_token,
            self._more_results,
            self._result_keys,
            self._non_aggregate_keys,
            self._limit_key,
            page_params['MaxItems'],
            page_params['StartingToken'],
            page_params['PageSize'],
            kwargs,
        )

    def _extract_paging_params(self, kwargs):
        """Pop ``PaginationConfig`` from ``kwargs`` and normalize it.

        :returns: Dict with ``MaxItems`` (int or None), ``StartingToken``
            and ``PageSize`` (coerced to the limit key's modeled type).
        :raises PaginationError: If ``PageSize`` is given but the
            operation has no limit key.
        """
        pagination_config = kwargs.pop('PaginationConfig', {})
        max_items = pagination_config.get('MaxItems', None)
        if max_items is not None:
            max_items = int(max_items)
        page_size = pagination_config.get('PageSize', None)
        if page_size is not None:
            if self._limit_key is None:
                raise PaginationError(
                    message="PageSize parameter is not supported for the "
                    "pagination interface for this operation."
                )
            # Coerce the page size to the type the operation model
            # declares for the limit key.
            input_members = self._model.input_shape.members
            limit_key_shape = input_members.get(self._limit_key)
            if limit_key_shape.type_name == 'string':
                if not isinstance(page_size, str):
                    page_size = str(page_size)
            else:
                page_size = int(page_size)
        return {
            'MaxItems': max_items,
            'StartingToken': pagination_config.get('StartingToken', None),
            'PageSize': page_size,
        }
class ResultKeyIterator:
    """Flattens one result key across all pages of a paginated response.

    An instance is bound to a single result key; iterating it produces
    every element found under that key, page after page.

    :param pages_iterator: An iterator that will give you
        pages of results (a ``PageIterator`` class).
    :param result_key: The JMESPath expression representing
        the result key.

    """

    def __init__(self, pages_iterator, result_key):
        self._pages_iterator = pages_iterator
        self.result_key = result_key

    def __iter__(self):
        for page in self._pages_iterator:
            page_values = self.result_key.search(page)
            # A page without this key contributes no elements.
            if page_values is not None:
                yield from page_values