1# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13
14import base64
15import json
16import logging
17from functools import partial
18from itertools import tee
19
20import jmespath
21
22from botocore.context import with_current_context
23from botocore.exceptions import PaginationError
24from botocore.useragent import register_feature_id
25from botocore.utils import merge_dicts, set_value_from_jmespath
26
# Logger shared by everything in this module; named after the module so
# its output can be filtered independently.
log = logging.getLogger(name=__name__)
28
29
class TokenEncoder:
    """Turns pagination-token dictionaries into opaque strings.

    The result is base64-encoded JSON. Because JSON cannot represent
    ``bytes``, any bytes values found in the token are themselves base64
    encoded, and the path to each one is recorded under the
    ``boto_encoded_keys`` key so the original values can be restored.

    This is intended for pagination tokens, which can be nested
    structures and may contain bytes.
    """

    def encode(self, token):
        """Serialize a token dictionary into an opaque string.

        :type token: dict
        :param token: Pagination state: the service pagination token(s)
            plus any additional boto metadata.

        :rtype: str
        :returns: A base64 string wrapping the JSON form of ``token``.
        """
        try:
            # Fast path: nearly every token is plain JSON data, so avoid
            # walking the structure unless serialization actually fails.
            serialized = json.dumps(token)
        except (TypeError, UnicodeDecodeError):
            # Slow path: replace each bytes value with base64 text and
            # remember where each replacement happened.
            safe_token, byte_paths = self._encode(token, [])

            # No service is expected to use this key, so it is a safe
            # place to store the list of encoded paths.
            safe_token['boto_encoded_keys'] = byte_paths

            serialized = json.dumps(safe_token)

        # Wrap the JSON text in base64 so callers treat it as opaque.
        raw = serialized.encode('utf-8')
        return base64.b64encode(raw).decode('utf-8')

    def _encode(self, data, path):
        """Return ``(converted_data, byte_paths)`` for an arbitrary value.

        ``path`` is the list of keys/indices leading to ``data``.
        """
        if isinstance(data, dict):
            return self._encode_dict(data, path)
        if isinstance(data, list):
            return self._encode_list(data, path)
        if isinstance(data, bytes):
            return self._encode_bytes(data, path)
        # Anything else is left untouched and contributes no paths.
        return data, []

    def _encode_list(self, data, path):
        """Convert bytes inside a list, recording the index of each one."""
        converted = []
        byte_paths = []
        for index, item in enumerate(data):
            new_item, item_paths = self._encode(item, [*path, index])
            converted.append(new_item)
            byte_paths += item_paths
        return converted, byte_paths

    def _encode_dict(self, data, path):
        """Convert bytes inside a dict, recording the key of each one."""
        converted = {}
        byte_paths = []
        for key, value in data.items():
            new_value, value_paths = self._encode(value, [*path, key])
            converted[key] = new_value
            byte_paths += value_paths
        return converted, byte_paths

    def _encode_bytes(self, data, path):
        """Base64 encode a bytes value and report its location."""
        text = base64.b64encode(data).decode('utf-8')
        return text, [path]
106
107
class TokenDecoder:
    """Decodes token strings back into dictionaries.

    This performs the inverse operation to the TokenEncoder, accepting
    opaque strings and decoding them into a usable form.
    """

    def decode(self, token):
        """Decodes an opaque string to a dictionary.

        :type token: str
        :param token: A token string given by the botocore pagination
            interface.

        :rtype: dict
        :returns: A dictionary containing pagination information,
            particularly the service pagination token(s) but also other boto
            metadata.

        :raises ValueError: If the token cannot be decoded (for example bad
            base64 padding, non-UTF-8 content, or invalid JSON), or if the
            decoded JSON value is not an object.
        """
        json_string = base64.b64decode(token.encode('utf-8')).decode('utf-8')
        decoded_token = json.loads(json_string)

        # Only a JSON object can be a valid token. Any other JSON value
        # (number, string, bool, null, list) would otherwise fail below with
        # an AttributeError/TypeError from ``pop``; report it as a
        # ValueError so it is classified like every other malformed token.
        if not isinstance(decoded_token, dict):
            raise ValueError(
                f"Token does not decode to a dictionary: {token}"
            )

        # Remove the encoding metadata as it is read since it will no longer
        # be needed.
        encoded_keys = decoded_token.pop('boto_encoded_keys', None)
        if encoded_keys is None:
            return decoded_token
        else:
            return self._decode(decoded_token, encoded_keys)

    def _decode(self, token, encoded_keys):
        """Find each encoded value and decode it.

        Each entry of ``encoded_keys`` is a path (list of keys/indices) to a
        base64 string inside ``token``; it is replaced in place with the
        decoded bytes.
        """
        for key in encoded_keys:
            encoded = self._path_get(token, key)
            decoded = base64.b64decode(encoded.encode('utf-8'))
            self._path_set(token, key, decoded)
        return token

    def _path_get(self, data, path):
        """Return the nested data at the given path.

        For instance:
            data = {'foo': ['bar', 'baz']}
            path = ['foo', 0]
            ==> 'bar'
        """
        # jmespath isn't used here because it would be difficult to actually
        # create the jmespath query when taking all of the unknowns of key
        # structure into account. Gross though this is, it is simple and not
        # very error prone.
        d = data
        for step in path:
            d = d[step]
        return d

    def _path_set(self, data, path, value):
        """Set the value of a key in the given data.

        Example:
            data = {'foo': ['bar', 'baz']}
            path = ['foo', 1]
            value = 'bin'
            ==> data = {'foo': ['bar', 'bin']}
        """
        container = self._path_get(data, path[:-1])
        container[path[-1]] = value
174
175
class PaginatorModel:
    """Per-operation lookup over a loaded paginator configuration."""

    def __init__(self, paginator_config):
        # Only the 'pagination' section of the configuration is kept.
        self._paginator_config = paginator_config['pagination']

    def get_paginator(self, operation_name):
        """Return the pagination config for ``operation_name``.

        :raises ValueError: If no paginator is defined for the operation.
        """
        try:
            operation_config = self._paginator_config[operation_name]
        except KeyError:
            raise ValueError(
                f"Paginator for operation does not exist: {operation_name}"
            )
        else:
            return operation_config
188
189
class PageIterator:
    """An iterable object to paginate API results.

    Please note it is NOT a python iterator.
    Use ``iter`` to wrap this as a generator.

    Every call to ``iter`` starts a new pagination run from the configured
    starting point: the operation keyword arguments are copied at the start
    of each run, so tokens injected during one run never leak into the next.
    """

    def __init__(
        self,
        method,
        input_token,
        output_token,
        more_results,
        result_keys,
        non_aggregate_keys,
        limit_key,
        max_items,
        starting_token,
        page_size,
        op_kwargs,
    ):
        """
        :param method: Callable invoked with the operation kwargs to fetch
            one page.
        :param input_token: List of request parameter names that carry the
            pagination token(s).
        :param output_token: Compiled JMESPath expressions locating the next
            token(s) in a response, paired positionally with ``input_token``.
        :param more_results: Optional compiled JMESPath expression; when it
            evaluates falsy on a response, pagination stops.
        :param result_keys: Compiled JMESPath expressions for the paginated
            results; the first one is used for ``max_items`` accounting.
        :param non_aggregate_keys: Compiled JMESPath expressions whose values
            are captured from the first response only.
        :param limit_key: Request parameter name used to send ``page_size``.
        :param max_items: Optional cap on the number of primary-result items.
        :param starting_token: Optional token from a previous run to resume.
        :param page_size: Optional page size, sent as ``limit_key``.
        :param op_kwargs: Remaining keyword arguments passed to ``method``.
        """
        self._method = method
        self._input_token = input_token
        self._output_token = output_token
        self._more_results = more_results
        self._result_keys = result_keys
        self._max_items = max_items
        self._limit_key = limit_key
        self._starting_token = starting_token
        self._page_size = page_size
        self._op_kwargs = op_kwargs
        self._resume_token = None
        self._non_aggregate_key_exprs = non_aggregate_keys
        self._non_aggregate_part = {}
        self._token_encoder = TokenEncoder()
        self._token_decoder = TokenDecoder()

    @property
    def result_keys(self):
        return self._result_keys

    @property
    def resume_token(self):
        """Token to specify to resume pagination."""
        return self._resume_token

    @resume_token.setter
    def resume_token(self, value):
        # The value must be a dict whose keys are exactly the input token
        # names, optionally plus 'boto_truncate_amount'; it is stored encoded.
        if not isinstance(value, dict):
            raise ValueError(f"Bad starting token: {value}")

        if 'boto_truncate_amount' in value:
            token_keys = sorted(self._input_token + ['boto_truncate_amount'])
        else:
            token_keys = sorted(self._input_token)
        dict_keys = sorted(value.keys())

        if token_keys == dict_keys:
            self._resume_token = self._token_encoder.encode(value)
        else:
            raise ValueError(f"Bad starting token: {value}")

    @property
    def non_aggregate_part(self):
        return self._non_aggregate_part

    def __iter__(self):
        # Work on a copy of the caller's kwargs. The loop below injects
        # pagination tokens (and the page size) into ``current_kwargs``;
        # mutating ``self._op_kwargs`` directly would make a second
        # iteration of this object resume from the last page fetched by
        # the previous iteration instead of from the configured start.
        current_kwargs = dict(self._op_kwargs)
        previous_next_token = None
        next_token = {key: None for key in self._input_token}
        if self._starting_token is not None:
            # If the starting token exists, populate the next_token with the
            # values inside it. This ensures that we have the service's
            # pagination token on hand if we need to truncate after the
            # first response.
            next_token = self._parse_starting_token()[0]
        # The number of items from result_key we've seen so far.
        total_items = 0
        first_request = True
        primary_result_key = self.result_keys[0]
        starting_truncation = 0
        self._inject_starting_params(current_kwargs)
        while True:
            response = self._make_request(current_kwargs)
            parsed = self._extract_parsed_response(response)
            if first_request:
                # The first request is handled differently. We could
                # possibly have a resume/starting token that tells us where
                # to index into the retrieved page.
                if self._starting_token is not None:
                    starting_truncation = self._handle_first_request(
                        parsed, primary_result_key, starting_truncation
                    )
                first_request = False
                self._record_non_aggregate_key_values(parsed)
            else:
                # If this isn't the first request, we have already sliced into
                # the first request and had to make additional requests after.
                # We no longer need to add this to truncation.
                starting_truncation = 0
            current_response = primary_result_key.search(parsed)
            if current_response is None:
                current_response = []
            num_current_response = len(current_response)
            truncate_amount = 0
            if self._max_items is not None:
                truncate_amount = (
                    total_items + num_current_response - self._max_items
                )
            if truncate_amount > 0:
                self._truncate_response(
                    parsed,
                    primary_result_key,
                    truncate_amount,
                    starting_truncation,
                    next_token,
                )
                yield response
                break
            else:
                yield response
                total_items += num_current_response
                next_token = self._get_next_token(parsed)
                if all(t is None for t in next_token.values()):
                    break
                if (
                    self._max_items is not None
                    and total_items == self._max_items
                ):
                    # We're on a page boundary so we can set the current
                    # next token to be the resume token.
                    self.resume_token = next_token
                    break
                if (
                    previous_next_token is not None
                    and previous_next_token == next_token
                ):
                    message = (
                        f"The same next token was received twice: "
                        f"{next_token}"
                    )
                    raise PaginationError(message=message)
                self._inject_token_into_kwargs(current_kwargs, next_token)
                previous_next_token = next_token

    def search(self, expression):
        """Applies a JMESPath expression to a paginator

        Each page of results is searched using the provided JMESPath
        expression. If the result is not a list, it is yielded
        directly. If the result is a list, each element in the result
        is yielded individually (essentially implementing a flatmap in
        which the JMESPath search is the mapping function).

        :type expression: str
        :param expression: JMESPath expression to apply to each page.

        :return: Returns an iterator that yields the individual
            elements of applying a JMESPath expression to each page of
            results.
        """
        compiled = jmespath.compile(expression)
        for page in self:
            results = compiled.search(page)
            if isinstance(results, list):
                yield from results
            else:
                # Yield result directly if it is not a list.
                yield results

    @with_current_context(partial(register_feature_id, 'PAGINATOR'))
    def _make_request(self, current_kwargs):
        return self._method(**current_kwargs)

    def _extract_parsed_response(self, response):
        # Hook for subclasses whose pages are not already the parsed dict.
        return response

    def _record_non_aggregate_key_values(self, response):
        non_aggregate_keys = {}
        for expression in self._non_aggregate_key_exprs:
            result = expression.search(response)
            set_value_from_jmespath(
                non_aggregate_keys, expression.expression, result
            )
        self._non_aggregate_part = non_aggregate_keys

    def _inject_starting_params(self, op_kwargs):
        # If the user has specified a starting token we need to
        # inject that into the operation's kwargs.
        if self._starting_token is not None:
            # Don't need to do anything special if there is no starting
            # token specified.
            next_token = self._parse_starting_token()[0]
            self._inject_token_into_kwargs(op_kwargs, next_token)
        if self._page_size is not None:
            # Pass the page size as the parameter name for limiting
            # page size, also known as the limit_key.
            op_kwargs[self._limit_key] = self._page_size

    def _inject_token_into_kwargs(self, op_kwargs, next_token):
        # Set each non-empty token; remove any stale value for empty ones
        # (None, or the literal string 'None' from old-style tokens).
        for name, token in next_token.items():
            if (token is not None) and (token != 'None'):
                op_kwargs[name] = token
            elif name in op_kwargs:
                del op_kwargs[name]

    def _handle_first_request(
        self, parsed, primary_result_key, starting_truncation
    ):
        # The ``starting_truncation`` argument is not used: the value is
        # always re-read from the starting token. It is kept in the
        # signature for compatibility with existing callers.
        # If the payload is an array or string, we need to slice into it
        # and only return the truncated amount.
        starting_truncation = self._parse_starting_token()[1]
        all_data = primary_result_key.search(parsed)
        if isinstance(all_data, (list, str)):
            data = all_data[starting_truncation:]
        else:
            data = None
        set_value_from_jmespath(parsed, primary_result_key.expression, data)
        # We also need to truncate any secondary result keys
        # because they were not truncated in the previous last
        # response.
        for token in self.result_keys:
            if token == primary_result_key:
                continue
            sample = token.search(parsed)
            if isinstance(sample, list):
                empty_value = []
            elif isinstance(sample, str):
                empty_value = ''
            elif isinstance(sample, (int, float)):
                # Even though we may be resuming from a truncated page, we
                # still start from the actual numeric secondary result. For
                # DynamoDB's Count/ScannedCount, this will still show how many
                # items the server evaluated, even if the client is truncating
                # due to a StartingToken.
                empty_value = sample
            else:
                empty_value = None
            set_value_from_jmespath(parsed, token.expression, empty_value)
        return starting_truncation

    def _truncate_response(
        self,
        parsed,
        primary_result_key,
        truncate_amount,
        starting_truncation,
        next_token,
    ):
        original = primary_result_key.search(parsed)
        if original is None:
            original = []
        amount_to_keep = len(original) - truncate_amount
        truncated = original[:amount_to_keep]
        set_value_from_jmespath(
            parsed, primary_result_key.expression, truncated
        )
        # The issue here is that even though we know how much we've truncated
        # we need to account for this globally including any starting
        # left truncation. For example:
        # Raw response: [0,1,2,3]
        # Starting index: 1
        # Max items: 1
        # Starting left truncation: [1, 2, 3]
        # End right truncation for max items: [1]
        # However, even though we only kept 1, this is post
        # left truncation so the next starting index should be 2, not 1
        # (left_truncation + amount_to_keep).
        next_token['boto_truncate_amount'] = (
            amount_to_keep + starting_truncation
        )
        self.resume_token = next_token

    def _get_next_token(self, parsed):
        # An empty dict signals "no more pages" to the caller.
        if self._more_results is not None:
            if not self._more_results.search(parsed):
                return {}
        next_tokens = {}
        for output_token, input_key in zip(
            self._output_token, self._input_token
        ):
            next_token = output_token.search(parsed)
            # We do not want to include any empty strings as actual tokens.
            # Treat them as None.
            if next_token:
                next_tokens[input_key] = next_token
            else:
                next_tokens[input_key] = None
        return next_tokens

    def result_key_iters(self):
        teed_results = tee(self, len(self.result_keys))
        return [
            ResultKeyIterator(i, result_key)
            for i, result_key in zip(teed_results, self.result_keys)
        ]

    def build_full_result(self):
        complete_result = {}
        for response in self:
            page = response
            # We want to try to catch operation object pagination
            # and format correctly for those. They come in the form
            # of a tuple of two elements: (http_response, parsed_response).
            # We want the parsed_response as that is what the page iterator
            # uses. We can remove it though once operation objects are removed.
            if isinstance(response, tuple) and len(response) == 2:
                page = response[1]
            # We're incrementally building the full response page
            # by page. For each page in the response we need to
            # inject the necessary components from the page
            # into the complete_result.
            for result_expression in self.result_keys:
                # In order to incrementally update a result key
                # we need to search the existing value from complete_result,
                # then we need to search the _current_ page for the
                # current result key value. Then we append the current
                # value onto the existing value, and re-set that value
                # as the new value.
                result_value = result_expression.search(page)
                if result_value is None:
                    continue
                existing_value = result_expression.search(complete_result)
                if existing_value is None:
                    # Set the initial result
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        result_value,
                    )
                    continue
                # Now both result_value and existing_value contain something
                if isinstance(result_value, list):
                    existing_value.extend(result_value)
                elif isinstance(result_value, (int, float, str)):
                    # Modify the existing result with the sum or concatenation
                    set_value_from_jmespath(
                        complete_result,
                        result_expression.expression,
                        existing_value + result_value,
                    )
        merge_dicts(complete_result, self.non_aggregate_part)
        if self.resume_token is not None:
            complete_result['NextToken'] = self.resume_token
        return complete_result

    def _parse_starting_token(self):
        # Returns (token_dict, starting_index), or None without a token.
        if self._starting_token is None:
            return None

        # The starting token is a dict passed as a base64 encoded string.
        next_token = self._starting_token
        try:
            next_token = self._token_decoder.decode(next_token)
            index = 0
            if 'boto_truncate_amount' in next_token:
                index = next_token.get('boto_truncate_amount')
                del next_token['boto_truncate_amount']
        except (ValueError, TypeError):
            next_token, index = self._parse_starting_token_deprecated()
        return next_token, index

    def _parse_starting_token_deprecated(self):
        """
        This handles parsing of old style starting tokens, and attempts to
        coerce them into the new style.
        """
        log.debug(
            "Attempting to fall back to old starting token parser. For token: %s",
            self._starting_token,
        )
        if self._starting_token is None:
            return None

        parts = self._starting_token.split('___')
        next_token = []
        index = 0
        if len(parts) == len(self._input_token) + 1:
            try:
                index = int(parts.pop())
            except ValueError:
                # This doesn't look like a valid old-style token, so we're
                # passing it along as an opaque service token.
                parts = [self._starting_token]

        for part in parts:
            if part == 'None':
                next_token.append(None)
            else:
                next_token.append(part)
        return self._convert_deprecated_starting_token(next_token), index

    def _convert_deprecated_starting_token(self, deprecated_token):
        """
        This attempts to convert a deprecated starting token into the new
        style.
        """
        len_deprecated_token = len(deprecated_token)
        len_input_token = len(self._input_token)
        if len_deprecated_token > len_input_token:
            raise ValueError(f"Bad starting token: {self._starting_token}")
        elif len_deprecated_token < len_input_token:
            log.debug(
                "Old format starting token does not contain all input "
                "tokens. Setting the rest, in order, as None."
            )
            for _ in range(len_input_token - len_deprecated_token):
                deprecated_token.append(None)
        return dict(zip(self._input_token, deprecated_token))
597
598
class Paginator:
    """Creates page iterators for a single paginated operation.

    The pagination configuration is parsed once at construction time into
    token names, compiled JMESPath expressions and the optional limit key.
    """

    PAGE_ITERATOR_CLS = PageIterator

    def __init__(self, method, pagination_config, model):
        """
        :param method: Callable that performs one API request.
        :param pagination_config: Pagination configuration for the
            operation. ``input_token`` and ``output_token`` are required;
            ``more_results``, ``result_key``, ``non_aggregate_keys`` and
            ``limit_key`` are optional.
        :param model: Operation model; ``model.input_shape.members`` is
            consulted to decide how to coerce ``PageSize``.
        """
        self._model = model
        self._method = method
        self._pagination_cfg = pagination_config
        self._output_token = self._get_output_tokens(self._pagination_cfg)
        self._input_token = self._get_input_tokens(self._pagination_cfg)
        self._more_results = self._get_more_results_token(self._pagination_cfg)
        self._non_aggregate_keys = self._get_non_aggregate_keys(
            self._pagination_cfg
        )
        self._result_keys = self._get_result_keys(self._pagination_cfg)
        self._limit_key = self._get_limit_key(self._pagination_cfg)

    @property
    def result_keys(self):
        return self._result_keys

    def _get_non_aggregate_keys(self, config):
        """Compile each ``non_aggregate_keys`` entry (empty if absent)."""
        return [
            jmespath.compile(key)
            for key in config.get('non_aggregate_keys', [])
        ]

    def _get_output_tokens(self, config):
        """Compile ``output_token`` (a single expression or a list)."""
        output_token = config['output_token']
        if not isinstance(output_token, list):
            output_token = [output_token]
        return [jmespath.compile(expression) for expression in output_token]

    def _get_input_tokens(self, config):
        """Return ``input_token`` from ``config`` as a list of names."""
        input_token = config['input_token']
        if not isinstance(input_token, list):
            input_token = [input_token]
        return input_token

    def _get_more_results_token(self, config):
        """Compile ``more_results`` if configured, else return None."""
        more_results = config.get('more_results')
        if more_results is None:
            return None
        return jmespath.compile(more_results)

    def _get_result_keys(self, config):
        """Compile ``result_key`` into a list of expressions, or None."""
        result_key = config.get('result_key')
        if result_key is not None:
            if not isinstance(result_key, list):
                result_key = [result_key]
            result_key = [jmespath.compile(rk) for rk in result_key]
        return result_key

    def _get_limit_key(self, config):
        return config.get('limit_key')

    def paginate(self, **kwargs):
        """Create paginator object for an operation.

        This returns an iterable object. Iterating over
        this object will yield a single page of a response
        at a time.

        """
        page_params = self._extract_paging_params(kwargs)
        return self.PAGE_ITERATOR_CLS(
            self._method,
            self._input_token,
            self._output_token,
            self._more_results,
            self._result_keys,
            self._non_aggregate_keys,
            self._limit_key,
            page_params['MaxItems'],
            page_params['StartingToken'],
            page_params['PageSize'],
            kwargs,
        )

    def _extract_paging_params(self, kwargs):
        """Pop ``PaginationConfig`` from ``kwargs`` and normalize it.

        :returns: A dict with ``MaxItems`` (int or None), ``StartingToken``
            and ``PageSize`` (coerced to the limit key's declared type, or
            None).
        :raises PaginationError: If ``PageSize`` is given but the operation
            has no limit key.
        """
        # An explicit ``PaginationConfig=None`` is treated like omitting it
        # rather than failing on ``None.get``.
        pagination_config = kwargs.pop('PaginationConfig', None) or {}
        max_items = pagination_config.get('MaxItems', None)
        if max_items is not None:
            max_items = int(max_items)
        page_size = pagination_config.get('PageSize', None)
        if page_size is not None:
            if self._limit_key is None:
                raise PaginationError(
                    message="PageSize parameter is not supported for the "
                    "pagination interface for this operation."
                )
            input_members = self._model.input_shape.members
            limit_key_shape = input_members.get(self._limit_key)
            if limit_key_shape.type_name == 'string':
                if not isinstance(page_size, str):
                    page_size = str(page_size)
            else:
                page_size = int(page_size)
        return {
            'MaxItems': max_items,
            'StartingToken': pagination_config.get('StartingToken', None),
            'PageSize': page_size,
        }
703
704
class ResultKeyIterator:
    """Flattens one result key across a sequence of response pages.

    Iterating over an instance yields every element found under its result
    key, page after page. Pages where the key is missing (the search
    returns None) contribute nothing.

    :param pages_iterator: An iterable producing pages of results
        (typically a ``PageIterator`` or a ``tee`` of one).
    :param result_key: The compiled JMESPath expression for the result key.
    """

    def __init__(self, pages_iterator, result_key):
        self._pages_iterator = pages_iterator
        self.result_key = result_key

    def __iter__(self):
        for page in self._pages_iterator:
            items = self.result_key.search(page)
            if items is not None:
                yield from items