Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/signers.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import base64
14import datetime
15import json
16import weakref
18import botocore
19import botocore.auth
20from botocore.awsrequest import create_request_object, prepare_request_dict
21from botocore.compat import OrderedDict
22from botocore.exceptions import (
23 UnknownClientMethodError,
24 UnknownSignatureVersionError,
25 UnsupportedSignatureVersionError,
26)
27from botocore.utils import ArnParser, datetime2timestamp
29# Keep these imported. There's pre-existing code that uses them.
30from botocore.utils import fix_s3_host # noqa
class RequestSigner:
    """
    Signs outgoing requests with one of the authentication mechanisms
    registered in ``auth.py``.

    Two events, scoped to the service and the operation name, are fired
    while a request is being signed:

    * choose-signer: lets a handler override the auth signer name.
    * before-sign: lets a handler mutate the request before it is signed.

    Between them these events make the signing pipeline customizable:
    signer overrides, request path manipulation, and switching signing
    off for individual operations.

    :type service_id: botocore.model.ServiceId
    :param service_id: The service id for the service, e.g. ``S3``

    :type region_name: string
    :param region_name: Name of the service region, e.g. ``us-east-1``

    :type signing_name: string
    :param signing_name: Service signing name. This is usually the
        same as the service name, but can differ. E.g.
        ``emr`` vs. ``elasticmapreduce``.

    :type signature_version: string
    :param signature_version: Signature name like ``v4``.

    :type credentials: :py:class:`~botocore.credentials.Credentials`
    :param credentials: User credentials with which to sign requests.

    :type event_emitter: :py:class:`~botocore.hooks.BaseEventHooks`
    :param event_emitter: Extension mechanism to fire events.

    :param auth_token: Optional token provider. When set, its
        ``get_frozen_token()`` result is handed to auth classes that
        declare ``REQUIRES_TOKEN``.
    """

    def __init__(
        self,
        service_id,
        region_name,
        signing_name,
        signature_version,
        credentials,
        event_emitter,
        auth_token=None,
    ):
        self._service_id = service_id
        self._region_name = region_name
        self._signing_name = signing_name
        self._signature_version = signature_version
        self._credentials = credentials
        self._auth_token = auth_token

        # Only a weak proxy to the emitter is kept. The original comment
        # gives the reason: avoiding a memory leak seen with Python 2.6
        # on Linux 2.6.
        self._event_emitter = weakref.proxy(event_emitter)

    @property
    def region_name(self):
        """Region name this signer was constructed with."""
        return self._region_name

    @property
    def signature_version(self):
        """Default signature version this signer was constructed with."""
        return self._signature_version

    @property
    def signing_name(self):
        """Default service signing name this signer was constructed with."""
        return self._signing_name

    def handler(self, operation_name=None, request=None, **kwargs):
        """Event-handler entry point that signs ``request``.

        Intended to be registered for the "request-created" event of a
        client's event emitter rather than called directly. Any extra
        keyword arguments supplied by the event are ignored.
        """
        return self.sign(operation_name, request)

    def sign(
        self,
        operation_name,
        request,
        region_name=None,
        signing_type='standard',
        expires_in=None,
        signing_name=None,
    ):
        """Sign a request before it goes out over the wire.

        :type operation_name: string
        :param operation_name: The name of the current operation, e.g.
            ``ListBuckets``.

        :type request: AWSRequest
        :param request: The request object to be sent over the wire.

        :type region_name: str
        :param region_name: The region to sign the request for. Falls
            back to the signer's own region when omitted.

        :type signing_type: str
        :param signing_type: The type of signing to perform. This can be
            one of three possible values:

            * 'standard'     - This should be used for most requests.
            * 'presign-url'  - This should be used when pre-signing a
              request.
            * 'presign-post' - This should be used when pre-signing an S3
              post.

        :type expires_in: int
        :param expires_in: The number of seconds the presigned url is
            valid for. This parameter is only valid for signing type
            'presign-url'.

        :type signing_name: str
        :param signing_name: The name to use for the service when signing.
            Falls back to the signer's own signing name when omitted.

        :raises UnsupportedSignatureVersionError: if no auth class exists
            for the chosen signature version and ``signing_type`` is not
            'standard'.
        :raises UnknownSignatureVersionError: if no auth class exists for
            the chosen signature version and ``signing_type`` is
            'standard'.
        """
        # Remember what the caller actually passed: only a region that was
        # NOT given explicitly may later be overridden by the request's
        # signing context.
        caller_region = region_name
        if region_name is None:
            region_name = self._region_name
        if signing_name is None:
            signing_name = self._signing_name

        signature_version = self._choose_signer(
            operation_name, signing_type, request.context
        )

        # Give handlers a chance to mutate the request before signing.
        # Note that the event always carries the signer's own region.
        self._event_emitter.emit(
            f'before-sign.{self._service_id.hyphenize()}.{operation_name}',
            request=request,
            signing_name=signing_name,
            region_name=self._region_name,
            signature_version=signature_version,
            request_signer=self,
            operation_name=operation_name,
        )

        if signature_version == botocore.UNSIGNED:
            # Signing is disabled for this request.
            return

        auth_kwargs = {
            'signing_name': signing_name,
            'region_name': region_name,
            'signature_version': signature_version,
        }
        if expires_in is not None:
            auth_kwargs['expires'] = expires_in

        # Per-request signing overrides carried in the request context.
        signing_context = request.context.get('signing', {})
        if not caller_region and signing_context.get('region'):
            auth_kwargs['region_name'] = signing_context['region']
        if signing_context.get('signing_name'):
            auth_kwargs['signing_name'] = signing_context['signing_name']
        if signing_context.get('request_credentials'):
            auth_kwargs['request_credentials'] = signing_context[
                'request_credentials'
            ]
        if signing_context.get('identity_cache') is not None:
            self._resolve_identity_cache(
                auth_kwargs,
                signing_context['identity_cache'],
                signing_context['cache_key'],
            )

        try:
            auth = self.get_auth_instance(**auth_kwargs)
        except UnknownSignatureVersionError:
            if signing_type == 'standard':
                raise
            # For presigning, an unknown version means "this signature
            # version cannot be used for presigning".
            raise UnsupportedSignatureVersionError(
                signature_version=signature_version
            )

        auth.add_auth(request)

    def _resolve_identity_cache(self, kwargs, cache, cache_key):
        """Store the identity cache and its key in ``kwargs`` (in place)."""
        kwargs['identity_cache'] = cache
        kwargs['cache_key'] = cache_key

    def _choose_signer(self, operation_name, signing_type, context):
        """
        Pick the signature version, letting the choose-signer event
        override it. A value of `botocore.UNSIGNED` means no signing will
        be performed.

        :param operation_name: The operation to sign.
        :param signing_type: The type of signing that the signer is to be
            used for.
        :param context: The request context; its ``auth_type`` and
            ``signing`` entries are consulted.
        :return: The signature version to sign with.
        """
        suffix = {
            'presign-post': '-presign-post',
            'presign-url': '-query',
        }.get(signing_type, '')

        def with_suffix(version):
            # UNSIGNED is a sentinel, not a string; leave it untouched.
            if version is botocore.UNSIGNED or version.endswith(suffix):
                return version
            return version + suffix

        # Operation-specific signing context takes precedence over the
        # client-level defaults.
        signing = context.get('signing', {})
        signature_version = with_suffix(
            context.get('auth_type') or self._signature_version
        )

        _handler, response = self._event_emitter.emit_until_response(
            f'choose-signer.{self._service_id.hyphenize()}.{operation_name}',
            signing_name=signing.get('signing_name', self._signing_name),
            region_name=signing.get('region', self._region_name),
            signature_version=signature_version,
            context=context,
        )

        if response is not None:
            # A handler may hand back a version without the suffix this
            # signing type requires, so the suffix is applied again.
            signature_version = with_suffix(response)

        return signature_version

    def get_auth_instance(
        self,
        signing_name,
        region_name,
        signature_version=None,
        request_credentials=None,
        **kwargs,
    ):
        """
        Build the auth object that signs requests with the given
        signature version.

        :type signing_name: string
        :param signing_name: Service signing name. This is usually the
            same as the service name, but can differ. E.g.
            ``emr`` vs. ``elasticmapreduce``.

        :type region_name: string
        :param region_name: Name of the service region, e.g. ``us-east-1``

        :type signature_version: string
        :param signature_version: Signature name like ``v4``. Defaults to
            the signer's own signature version.

        :param request_credentials: Credentials that take precedence over
            the signer's own credentials when truthy.

        :param kwargs: Extra keyword arguments forwarded to the auth class
            constructor. ``identity_cache`` and ``cache_key`` are required
            when the auth class declares ``REQUIRES_IDENTITY_CACHE``.

        :rtype: :py:class:`~botocore.auth.BaseSigner`
        :return: Auth instance to sign a request.

        :raises UnknownSignatureVersionError: if the signature version has
            no entry in ``botocore.auth.AUTH_TYPE_MAPS``.
        :raises botocore.exceptions.NoRegionError: if the auth class
            requires a region and the signer was built without one.
        """
        if signature_version is None:
            signature_version = self._signature_version

        cls = botocore.auth.AUTH_TYPE_MAPS.get(signature_version)
        if cls is None:
            raise UnknownSignatureVersionError(
                signature_version=signature_version
            )

        if cls.REQUIRES_TOKEN is True:
            # Token-based auth takes only the frozen token (or None).
            frozen_token = None
            if self._auth_token is not None:
                frozen_token = self._auth_token.get_frozen_token()
            return cls(frozen_token)

        credentials = request_credentials or self._credentials
        if getattr(cls, "REQUIRES_IDENTITY_CACHE", None) is True:
            # Credentials come from the identity cache; the cache itself
            # stays in kwargs for the auth class, the key does not.
            identity_cache = kwargs["identity_cache"]
            credentials = identity_cache.get_credentials(
                kwargs.pop("cache_key")
            )

        # When no credentials are available, ``None`` is passed through;
        # the auth classes already cope with missing credentials.
        kwargs['credentials'] = (
            credentials.get_frozen_credentials()
            if credentials is not None
            else None
        )
        if cls.REQUIRES_REGION:
            if self._region_name is None:
                raise botocore.exceptions.NoRegionError()
            kwargs['region_name'] = region_name
            kwargs['service_name'] = signing_name
        return cls(**kwargs)

    # Alias get_auth for backwards compatibility.
    get_auth = get_auth_instance

    def generate_presigned_url(
        self,
        request_dict,
        operation_name,
        expires_in=3600,
        region_name=None,
        signing_name=None,
    ):
        """Generates a presigned url

        :type request_dict: dict
        :param request_dict: The prepared request dictionary returned by
            ``botocore.awsrequest.prepare_request_dict()``

        :type operation_name: str
        :param operation_name: The operation being signed.

        :type expires_in: int
        :param expires_in: The number of seconds the presigned url is
            valid for. By default it expires in an hour (3600 seconds)

        :type region_name: string
        :param region_name: The region name to sign the presigned url.

        :type signing_name: str
        :param signing_name: The name to use for the service when signing.

        :returns: The presigned url
        """
        request = create_request_object(request_dict)
        self.sign(
            operation_name,
            request,
            region_name,
            'presign-url',
            expires_in,
            signing_name,
        )

        request.prepare()
        return request.url
class CloudFrontSigner:
    """A signer to create a signed CloudFront URL.

    Start by wrapping an RSA signing routine in a cloudfront signer::

        import rsa
        def rsa_signer(message):
            with open('private_key.pem', 'r') as key_file:
                private_key = key_file.read()
            return rsa.sign(
                message,
                rsa.PrivateKey.load_pkcs1(private_key.encode('utf8')),
                'SHA-1')  # CloudFront requires SHA-1 hash
        cf_signer = CloudFrontSigner(key_id, rsa_signer)

    To sign with a canned policy::

        signed_url = cf_signer.generate_presigned_url(
            url, date_less_than=datetime(2015, 12, 1))

    To sign with a custom policy::

        signed_url = cf_signer.generate_presigned_url(url, policy=my_policy)
    """

    def __init__(self, key_id, rsa_signer):
        """Create a CloudFrontSigner.

        :type key_id: str
        :param key_id: The CloudFront Key Pair ID

        :type rsa_signer: callable
        :param rsa_signer: An RSA signer.
               It is called with the message to be signed as its only
               argument and must return the signed content as a binary
               string. The hash algorithm needed by CloudFront is SHA-1.
        """
        self.key_id = key_id
        self.rsa_signer = rsa_signer

    def generate_presigned_url(self, url, date_less_than=None, policy=None):
        """Creates a signed CloudFront URL based on given parameters.

        Exactly one of ``date_less_than`` and ``policy`` must be given.

        :type url: str
        :param url: The URL of the protected object

        :type date_less_than: datetime
        :param date_less_than: The URL will expire after that date and time

        :type policy: str
        :param policy: The custom policy, possibly built by
            self.build_policy()

        :rtype: str
        :return: The signed URL.

        :raises ValueError: if both or neither of ``date_less_than`` and
            ``policy`` are supplied.
        """
        # Both given or both missing: the two "is None" answers agree.
        if (date_less_than is None) == (policy is None):
            raise ValueError(
                'Need to provide either date_less_than or policy, but not both'
            )

        use_canned_policy = date_less_than is not None
        if use_canned_policy:
            # Even for a canned policy the policy document is what gets
            # signed, so it still has to be built.
            policy = self.build_policy(url, date_less_than)
        if isinstance(policy, str):
            policy = policy.encode('utf8')

        if use_canned_policy:
            first_param = f'Expires={int(datetime2timestamp(date_less_than))}'
        else:
            first_param = (
                f"Policy={self._url_b64encode(policy).decode('utf8')}"
            )

        signature = self.rsa_signer(policy)
        query_params = [
            first_param,
            f"Signature={self._url_b64encode(signature).decode('utf8')}",
            f"Key-Pair-Id={self.key_id}",
        ]
        return self._build_url(url, query_params)

    def _build_url(self, base_url, extra_params):
        """Append ``extra_params`` to ``base_url`` as query parameters."""
        # Continue an existing query string with '&', otherwise start one.
        if '?' in base_url:
            joiner = '&'
        else:
            joiner = '?'
        return base_url + joiner + '&'.join(extra_params)

    def build_policy(
        self, resource, date_less_than, date_greater_than=None, ip_address=None
    ):
        """A helper to build policy.

        :type resource: str
        :param resource: The URL or the stream filename of the protected
            object

        :type date_less_than: datetime
        :param date_less_than: The URL will expire after the time has passed

        :type date_greater_than: datetime
        :param date_greater_than: The URL will not be valid until this time

        :type ip_address: str
        :param ip_address: Use 'x.x.x.x' for an IP, or 'x.x.x.x/x' for a
            subnet. A bare IP gets '/32' appended.

        :rtype: str
        :return: The policy in a compact string.
        """
        # Note:
        # 1. Order in canned policy is significant. Special care has been
        #    taken to ensure the output will match the order defined by the
        #    document. There is also a test case to ensure that order.
        #    SEE: http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-canned-policy.html#private-content-canned-policy-creating-policy-statement
        # 2. Albeit the order in custom policy is not required by
        #    CloudFront, we still use OrderedDict internally to ensure the
        #    result is stable and also matches canned policy requirement.
        #    SEE: http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-custom-policy.html
        expires_at = int(datetime2timestamp(date_less_than))
        condition = OrderedDict(
            [("DateLessThan", {"AWS:EpochTime": expires_at})]
        )
        if ip_address:
            source_ip = ip_address if '/' in ip_address else ip_address + '/32'
            condition["IpAddress"] = {"AWS:SourceIp": source_ip}
        if date_greater_than:
            valid_from = int(datetime2timestamp(date_greater_than))
            condition["DateGreaterThan"] = {"AWS:EpochTime": valid_from}
        statement = OrderedDict(
            [('Resource', resource), ('Condition', condition)]
        )
        return json.dumps({"Statement": [statement]}, separators=(',', ':'))

    def _url_b64encode(self, data):
        """Base64-encode ``data`` with CloudFront's URL-safe substitutions."""
        # Required by CloudFront. See also:
        # http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-linux-openssl.html
        # '+' -> '-', '=' -> '_', '/' -> '~'
        return base64.b64encode(data).translate(
            bytes.maketrans(b'+=/', b'-_~')
        )
def add_generate_db_auth_token(class_attributes, **kwargs):
    """Expose ``generate_db_auth_token`` through ``class_attributes``.

    :type class_attributes: dict
    :param class_attributes: Mapping of attribute names to values; it is
        modified in place.

    Any additional keyword arguments are accepted and ignored.
    """
    class_attributes['generate_db_auth_token'] = generate_db_auth_token
def generate_db_auth_token(self, DBHostname, Port, DBUsername, Region=None):
    """Generates an auth token used to connect to a db with IAM credentials.

    :type DBHostname: str
    :param DBHostname: The hostname of the database to connect to.

    :type Port: int
    :param Port: The port number the database is listening on.

    :type DBUsername: str
    :param DBUsername: The username to log in as.

    :type Region: str
    :param Region: The region the database is in. If None, the client
        region will be used.

    :return: A presigned url which can be used as an auth token.
    """
    signing_region = self.meta.region_name if Region is None else Region

    request_dict = {
        'url_path': '/',
        'query_string': '',
        'headers': {},
        'body': {
            'Action': 'connect',
            'DBUser': DBUsername,
        },
        'method': 'GET',
    }

    # RDS requires that the scheme not be set when sent over. This can cause
    # issues when signing because the Python url parsing libraries follow
    # RFC 1808 closely, which states that a netloc must be introduced by `//`.
    # Otherwise the url is presumed to be relative, and thus the whole
    # netloc would be treated as a path component. To work around this we
    # introduce https here and remove it once we're done processing it.
    scheme = 'https://'
    prepare_request_dict(request_dict, f'{scheme}{DBHostname}:{Port}')
    signed_url = self._request_signer.generate_presigned_url(
        operation_name='connect',
        request_dict=request_dict,
        region_name=signing_region,
        expires_in=900,
        signing_name='rds-db',
    )
    # Drop the temporary scheme again.
    return signed_url[len(scheme):]
554class S3PostPresigner:
555 def __init__(self, request_signer):
556 self._request_signer = request_signer
558 def generate_presigned_post(
559 self,
560 request_dict,
561 fields=None,
562 conditions=None,
563 expires_in=3600,
564 region_name=None,
565 ):
566 """Generates the url and the form fields used for a presigned s3 post
568 :type request_dict: dict
569 :param request_dict: The prepared request dictionary returned by
570 ``botocore.awsrequest.prepare_request_dict()``
572 :type fields: dict
573 :param fields: A dictionary of prefilled form fields to build on top
574 of.
576 :type conditions: list
577 :param conditions: A list of conditions to include in the policy. Each
578 element can be either a list or a structure. For example:
579 [
580 {"acl": "public-read"},
581 {"bucket": "mybucket"},
582 ["starts-with", "$key", "mykey"]
583 ]
585 :type expires_in: int
586 :param expires_in: The number of seconds the presigned post is valid
587 for.
589 :type region_name: string
590 :param region_name: The region name to sign the presigned post to.
592 :rtype: dict
593 :returns: A dictionary with two elements: ``url`` and ``fields``.
594 Url is the url to post to. Fields is a dictionary filled with
595 the form fields and respective values to use when submitting the
596 post. For example:
598 {'url': 'https://mybucket.s3.amazonaws.com
599 'fields': {'acl': 'public-read',
600 'key': 'mykey',
601 'signature': 'mysignature',
602 'policy': 'mybase64 encoded policy'}
603 }
604 """
605 if fields is None:
606 fields = {}
608 if conditions is None:
609 conditions = []
611 # Create the policy for the post.
612 policy = {}
614 # Create an expiration date for the policy
615 datetime_now = datetime.datetime.utcnow()
616 expire_date = datetime_now + datetime.timedelta(seconds=expires_in)
617 policy['expiration'] = expire_date.strftime(botocore.auth.ISO8601)
619 # Append all of the conditions that the user supplied.
620 policy['conditions'] = []
621 for condition in conditions:
622 policy['conditions'].append(condition)
624 # Store the policy and the fields in the request for signing
625 request = create_request_object(request_dict)
626 request.context['s3-presign-post-fields'] = fields
627 request.context['s3-presign-post-policy'] = policy
629 self._request_signer.sign(
630 'PutObject', request, region_name, 'presign-post'
631 )
632 # Return the url and the fields for th form to post.
633 return {'url': request.url, 'fields': fields}
def add_generate_presigned_url(class_attributes, **kwargs):
    """Expose ``generate_presigned_url`` through ``class_attributes``.

    :type class_attributes: dict
    :param class_attributes: Mapping of attribute names to values; it is
        modified in place.

    Any additional keyword arguments are accepted and ignored.
    """
    class_attributes['generate_presigned_url'] = generate_presigned_url
def generate_presigned_url(
    self, ClientMethod, Params=None, ExpiresIn=3600, HttpMethod=None
):
    """Generate a presigned url given a client, its method, and arguments

    :type ClientMethod: string
    :param ClientMethod: The client method to presign for

    :type Params: dict
    :param Params: The parameters normally passed to
        ``ClientMethod``.

    :type ExpiresIn: int
    :param ExpiresIn: The number of seconds the presigned url is valid
        for. By default it expires in an hour (3600 seconds)

    :type HttpMethod: string
    :param HttpMethod: The http method to use on the generated url. By
        default, the http method is whatever is used in the method's model.

    :returns: The presigned url

    :raises UnknownClientMethodError: if ``ClientMethod`` is not a known
        method of this client.
    """
    api_params = {} if Params is None else Params
    context = {
        'is_presign_request': True,
        'use_global_endpoint': _should_use_global_endpoint(self),
    }
    request_signer = self._request_signer

    try:
        operation_name = self._PY_TO_OP_NAME[ClientMethod]
    except KeyError:
        raise UnknownClientMethodError(method_name=ClientMethod)

    operation_model = self.meta.service_model.operation_model(operation_name)
    api_params = self._emit_api_params(
        api_params=api_params,
        operation_model=operation_model,
        context=context,
    )

    # The signing region from endpoint resolution is ignored unless the
    # bucket is addressed by ARN.
    bucket_is_arn = ArnParser.is_arn(api_params.get('Bucket', ''))
    endpoint_url, additional_headers, _properties = (
        self._resolve_endpoint_ruleset(
            operation_model,
            api_params,
            context,
            ignore_signing_region=(not bucket_is_arn),
        )
    )

    request_dict = self._convert_to_request_dict(
        api_params=api_params,
        operation_model=operation_model,
        endpoint_url=endpoint_url,
        context=context,
        headers=additional_headers,
        set_user_agent_header=False,
    )

    # A user-specified http method wins over the one from the model.
    if HttpMethod is not None:
        request_dict['method'] = HttpMethod

    # Generate the presigned url.
    return request_signer.generate_presigned_url(
        request_dict=request_dict,
        expires_in=ExpiresIn,
        operation_name=operation_name,
    )
def add_generate_presigned_post(class_attributes, **kwargs):
    """Expose ``generate_presigned_post`` through ``class_attributes``.

    :type class_attributes: dict
    :param class_attributes: Mapping of attribute names to values; it is
        modified in place.

    Any additional keyword arguments are accepted and ignored.
    """
    class_attributes['generate_presigned_post'] = generate_presigned_post
def generate_presigned_post(
    self, Bucket, Key, Fields=None, Conditions=None, ExpiresIn=3600
):
    """Builds the url and the form fields used for a presigned s3 post

    Neither ``Fields`` nor ``Conditions`` is modified; both are copied
    before the bucket and key entries are added.

    :type Bucket: string
    :param Bucket: The name of the bucket to presign the post to. Note that
        bucket related conditions should not be included in the
        ``conditions`` parameter.

    :type Key: string
    :param Key: Key name, optionally add ${filename} to the end to
        attach the submitted filename. Note that key related conditions and
        fields are filled out for you and should not be included in the
        ``Fields`` or ``Conditions`` parameter.

    :type Fields: dict
    :param Fields: A dictionary of prefilled form fields to build on top
        of. Elements that may be included are acl, Cache-Control,
        Content-Type, Content-Disposition, Content-Encoding, Expires,
        success_action_redirect, redirect, success_action_status,
        and x-amz-meta-.

        Note that if a particular element is included in the fields
        dictionary it will not be automatically added to the conditions
        list. You must specify a condition for the element as well.

    :type Conditions: list
    :param Conditions: A list of conditions to include in the policy. Each
        element can be either a list or a structure. For example:

        [
         {"acl": "public-read"},
         ["content-length-range", 2, 5],
         ["starts-with", "$success_action_redirect", ""]
        ]

        Conditions that are included may pertain to acl,
        content-length-range, Cache-Control, Content-Type,
        Content-Disposition, Content-Encoding, Expires,
        success_action_redirect, redirect, success_action_status,
        and/or x-amz-meta-.

        Note that if you include a condition, you must specify
        a valid value in the fields dictionary as well. A value will
        not be added automatically to the fields dictionary based on the
        conditions.

    :type ExpiresIn: int
    :param ExpiresIn: The number of seconds the presigned post
        is valid for.

    :rtype: dict
    :returns: A dictionary with two elements: ``url`` and ``fields``.
        Url is the url to post to. Fields is a dictionary filled with
        the form fields and respective values to use when submitting the
        post. For example:

        {'url': 'https://mybucket.s3.amazonaws.com',
         'fields': {'acl': 'public-read',
                    'key': 'mykey',
                    'signature': 'mysignature',
                    'policy': 'mybase64 encoded policy'}
        }
    """
    bucket = Bucket
    key = Key
    expires_in = ExpiresIn

    # Work on copies: the key field and the bucket/key conditions added
    # below must not leak into the caller's objects. Without the copy, a
    # caller reusing one Conditions list across calls would accumulate
    # duplicate bucket/key entries.
    fields = {} if Fields is None else Fields.copy()
    conditions = [] if Conditions is None else list(Conditions)

    context = {
        'is_presign_request': True,
        'use_global_endpoint': _should_use_global_endpoint(self),
    }

    post_presigner = S3PostPresigner(self._request_signer)

    # We choose the CreateBucket operation model because its url gets
    # serialized to what a presign post requires.
    operation_model = self.meta.service_model.operation_model('CreateBucket')
    params = self._emit_api_params(
        api_params={'Bucket': bucket},
        operation_model=operation_model,
        context=context,
    )
    # The signing region from endpoint resolution is ignored unless the
    # bucket is addressed by ARN.
    bucket_is_arn = ArnParser.is_arn(params.get('Bucket', ''))
    (
        endpoint_url,
        additional_headers,
        _properties,
    ) = self._resolve_endpoint_ruleset(
        operation_model,
        params,
        context,
        ignore_signing_region=(not bucket_is_arn),
    )

    request_dict = self._convert_to_request_dict(
        api_params=params,
        operation_model=operation_model,
        endpoint_url=endpoint_url,
        context=context,
        headers=additional_headers,
        set_user_agent_header=False,
    )

    # Append the bucket name to the list of conditions.
    conditions.append({'bucket': bucket})

    # If the key ends with filename, the only constraint that can be
    # imposed is if it starts with the specified prefix.
    filename_placeholder = '${filename}'
    if key.endswith(filename_placeholder):
        conditions.append(
            ["starts-with", '$key', key[: -len(filename_placeholder)]]
        )
    else:
        conditions.append({'key': key})

    # Add the key to the fields.
    fields['key'] = key

    return post_presigner.generate_presigned_post(
        request_dict=request_dict,
        fields=fields,
        conditions=conditions,
        expires_in=expires_in,
    )
def _should_use_global_endpoint(client):
    """Return whether presigning for ``client`` should use the global endpoint.

    The answer is False when the client's partition is not ``aws``, or
    when its S3 configuration enables the dualstack endpoint, selects the
    regional endpoint for ``us-east-1`` while the client region is
    ``us-east-1``, or uses the ``virtual`` addressing style. Otherwise it
    is True.

    :param client: Client whose ``meta.partition`` and ``meta.config``
        (``s3`` and ``region_name``) are inspected.
    :rtype: bool
    """
    if client.meta.partition != 'aws':
        return False

    s3_settings = client.meta.config.s3
    if not s3_settings:
        # No S3-specific configuration: nothing rules the global endpoint out.
        return True

    return not (
        s3_settings.get('use_dualstack_endpoint', False)
        or (
            s3_settings.get('us_east_1_regional_endpoint') == 'regional'
            and client.meta.config.region_name == 'us-east-1'
        )
        or s3_settings.get('addressing_style') == 'virtual'
    )