Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/signers.py: 17%
242 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import base64
14import datetime
15import json
16import weakref
18import botocore
19import botocore.auth
20from botocore.awsrequest import create_request_object, prepare_request_dict
21from botocore.compat import OrderedDict
22from botocore.exceptions import (
23 UnknownClientMethodError,
24 UnknownSignatureVersionError,
25 UnsupportedSignatureVersionError,
26)
27from botocore.utils import ArnParser, datetime2timestamp
29# Keep these imported. There's pre-existing code that uses them.
30from botocore.utils import fix_s3_host # noqa
class RequestSigner:
    """
    An object to sign requests before they go out over the wire using
    one of the authentication mechanisms defined in ``auth.py``.  This
    class fires two events scoped to a service and operation name:

    * choose-signer: Allows overriding the auth signer name.
    * before-sign: Allows mutating the request before signing.

    Together these events allow for customization of the request
    signing pipeline, including overrides, request path manipulation,
    and disabling signing per operation.


    :type service_id: botocore.model.ServiceId
    :param service_id: The service id for the service, e.g. ``S3``

    :type region_name: string
    :param region_name: Name of the service region, e.g. ``us-east-1``

    :type signing_name: string
    :param signing_name: Service signing name. This is usually the
        same as the service name, but can differ. E.g.
        ``emr`` vs. ``elasticmapreduce``.

    :type signature_version: string
    :param signature_version: Signature name like ``v4``.

    :type credentials: :py:class:`~botocore.credentials.Credentials`
    :param credentials: User credentials with which to sign requests.

    :type event_emitter: :py:class:`~botocore.hooks.BaseEventHooks`
    :param event_emitter: Extension mechanism to fire events.

    :param auth_token: Optional token provider. When the selected auth
        class sets ``REQUIRES_TOKEN``, its ``get_frozen_token()`` result
        is handed to the auth class instead of credentials.
    """

    def __init__(
        self,
        service_id,
        region_name,
        signing_name,
        signature_version,
        credentials,
        event_emitter,
        auth_token=None,
    ):
        self._region_name = region_name
        self._signing_name = signing_name
        self._signature_version = signature_version
        self._credentials = credentials
        self._auth_token = auth_token
        self._service_id = service_id

        # Only a weak proxy to the emitter is kept, so the signer never
        # holds a strong reference to it (originally introduced to avoid
        # a memory leak on old interpreters).
        self._event_emitter = weakref.proxy(event_emitter)

    @property
    def region_name(self):
        """The default region this signer was created with."""
        return self._region_name

    @property
    def signature_version(self):
        """The default signature version this signer was created with."""
        return self._signature_version

    @property
    def signing_name(self):
        """The default service signing name this signer was created with."""
        return self._signing_name

    def handler(self, operation_name=None, request=None, **kwargs):
        # This is typically hooked up to the "request-created" event
        # from a client's event emitter.  When a new request is created
        # this method is invoked to sign the request.
        # Don't call this method directly.
        return self.sign(operation_name, request)

    def sign(
        self,
        operation_name,
        request,
        region_name=None,
        signing_type='standard',
        expires_in=None,
        signing_name=None,
    ):
        """Sign a request before it goes out over the wire.

        :type operation_name: string
        :param operation_name: The name of the current operation, e.g.
                               ``ListBuckets``.
        :type request: AWSRequest
        :param request: The request object to be sent over the wire.

        :type region_name: str
        :param region_name: The region to sign the request for.

        :type signing_type: str
        :param signing_type: The type of signing to perform. This can be one of
            three possible values:

            * 'standard'     - This should be used for most requests.
            * 'presign-url'  - This should be used when pre-signing a request.
            * 'presign-post' - This should be used when pre-signing an S3 post.

        :type expires_in: int
        :param expires_in: The number of seconds the presigned url is valid
            for. This parameter is only valid for signing type 'presign-url'.

        :type signing_name: str
        :param signing_name: The name to use for the service when signing.

        :raises UnsupportedSignatureVersionError: If no auth class exists
            for the chosen signature version and ``signing_type`` is not
            ``'standard'``.
        :raises UnknownSignatureVersionError: If no auth class exists for
            the chosen signature version and ``signing_type`` is
            ``'standard'``.
        """
        # Remember whether the caller asked for a specific region: an
        # explicit region wins over the one in the request's signing context.
        explicit_region_name = region_name
        if region_name is None:
            region_name = self._region_name

        if signing_name is None:
            signing_name = self._signing_name

        signature_version = self._choose_signer(
            operation_name, signing_type, request.context
        )

        # Allow mutating request before signing
        # NOTE(review): handlers receive the signer's default region here,
        # not the effective ``region_name`` resolved above -- confirm this
        # is intentional before changing it.
        self._event_emitter.emit(
            'before-sign.{}.{}'.format(
                self._service_id.hyphenize(), operation_name
            ),
            request=request,
            signing_name=signing_name,
            region_name=self._region_name,
            signature_version=signature_version,
            request_signer=self,
            operation_name=operation_name,
        )

        if signature_version != botocore.UNSIGNED:
            kwargs = {
                'signing_name': signing_name,
                'region_name': region_name,
                'signature_version': signature_version,
            }
            if expires_in is not None:
                kwargs['expires'] = expires_in
            # Per-request signing context may override region/signing name.
            signing_context = request.context.get('signing', {})
            if not explicit_region_name and signing_context.get('region'):
                kwargs['region_name'] = signing_context['region']
            if signing_context.get('signing_name'):
                kwargs['signing_name'] = signing_context['signing_name']
            if signing_context.get('identity_cache') is not None:
                self._resolve_identity_cache(
                    kwargs,
                    signing_context['identity_cache'],
                    signing_context['cache_key'],
                )
            try:
                auth = self.get_auth_instance(**kwargs)
            except UnknownSignatureVersionError as e:
                if signing_type != 'standard':
                    # The version is unknown only in its presign flavour:
                    # report it as unsupported, keeping the original error
                    # as the explicit cause.
                    raise UnsupportedSignatureVersionError(
                        signature_version=signature_version
                    ) from e
                # Bare re-raise keeps the original traceback untouched.
                raise

            auth.add_auth(request)

    def _resolve_identity_cache(self, kwargs, cache, cache_key):
        """Copy the identity cache and its key into the auth ``kwargs``."""
        kwargs['identity_cache'] = cache
        kwargs['cache_key'] = cache_key

    def _choose_signer(self, operation_name, signing_type, context):
        """
        Allow setting the signature version via the choose-signer event.
        A value of `botocore.UNSIGNED` means no signing will be performed.

        :param operation_name: The operation to sign.
        :param signing_type: The type of signing that the signer is to be used
            for.
        :param context: The request context. Its ``auth_type`` entry and
            ``signing`` sub-dict (``signing_name``, ``region``), when
            present, take precedence over the signer's defaults.
        :return: The signature version to sign with.
        """
        signing_type_suffix_map = {
            'presign-post': '-presign-post',
            'presign-url': '-query',
        }
        suffix = signing_type_suffix_map.get(signing_type, '')

        # operation specific signing context takes precedent over client-level
        # defaults
        signature_version = context.get('auth_type') or self._signature_version
        signing = context.get('signing', {})
        signing_name = signing.get('signing_name', self._signing_name)
        region_name = signing.get('region', self._region_name)
        if (
            signature_version is not botocore.UNSIGNED
            and not signature_version.endswith(suffix)
        ):
            signature_version += suffix

        handler, response = self._event_emitter.emit_until_response(
            'choose-signer.{}.{}'.format(
                self._service_id.hyphenize(), operation_name
            ),
            signing_name=signing_name,
            region_name=region_name,
            signature_version=signature_version,
            context=context,
        )

        if response is not None:
            signature_version = response
            # The suffix needs to be checked again in case we get an improper
            # signature version from choose-signer.
            if (
                signature_version is not botocore.UNSIGNED
                and not signature_version.endswith(suffix)
            ):
                signature_version += suffix

        return signature_version

    def get_auth_instance(
        self, signing_name, region_name, signature_version=None, **kwargs
    ):
        """
        Get an auth instance which can be used to sign a request
        using the given signature version.

        :type signing_name: string
        :param signing_name: Service signing name. This is usually the
                             same as the service name, but can differ. E.g.
                             ``emr`` vs. ``elasticmapreduce``.

        :type region_name: string
        :param region_name: Name of the service region, e.g. ``us-east-1``

        :type signature_version: string
        :param signature_version: Signature name like ``v4``.

        :param kwargs: Extra keyword arguments forwarded to the auth class.
            For auth classes with ``REQUIRES_IDENTITY_CACHE`` set,
            ``identity_cache`` and ``cache_key`` must be present;
            ``cache_key`` is consumed here and not forwarded.

        :rtype: :py:class:`~botocore.auth.BaseSigner`
        :return: Auth instance to sign a request.

        :raises UnknownSignatureVersionError: If ``signature_version`` has
            no entry in ``botocore.auth.AUTH_TYPE_MAPS``.
        :raises botocore.exceptions.NoRegionError: If the auth class
            requires a region and the signer has no default region.
        """
        if signature_version is None:
            signature_version = self._signature_version

        cls = botocore.auth.AUTH_TYPE_MAPS.get(signature_version)
        if cls is None:
            raise UnknownSignatureVersionError(
                signature_version=signature_version
            )

        if cls.REQUIRES_TOKEN is True:
            # Token-based auth: only the frozen token is passed on;
            # credentials, region and extra kwargs are not used.
            frozen_token = None
            if self._auth_token is not None:
                frozen_token = self._auth_token.get_frozen_token()
            auth = cls(frozen_token)
            return auth

        credentials = self._credentials
        if getattr(cls, "REQUIRES_IDENTITY_CACHE", None) is True:
            # Credentials come from the identity cache instead of the
            # signer's own credentials.
            cache = kwargs["identity_cache"]
            key = kwargs["cache_key"]
            credentials = cache.get_credentials(key)
            del kwargs["cache_key"]

        # If there's no credentials provided (i.e credentials is None),
        # then we'll pass a value of "None" over to the auth classes,
        # which already handle the cases where no credentials have
        # been provided.
        frozen_credentials = None
        if credentials is not None:
            frozen_credentials = credentials.get_frozen_credentials()
        kwargs['credentials'] = frozen_credentials
        if cls.REQUIRES_REGION:
            # NOTE(review): the check is against the signer's default
            # region, not the ``region_name`` argument -- confirm this is
            # intentional before changing it.
            if self._region_name is None:
                raise botocore.exceptions.NoRegionError()
            kwargs['region_name'] = region_name
            kwargs['service_name'] = signing_name
        auth = cls(**kwargs)
        return auth

    # Alias get_auth for backwards compatibility.
    get_auth = get_auth_instance

    def generate_presigned_url(
        self,
        request_dict,
        operation_name,
        expires_in=3600,
        region_name=None,
        signing_name=None,
    ):
        """Generates a presigned url

        :type request_dict: dict
        :param request_dict: The prepared request dictionary returned by
            ``botocore.awsrequest.prepare_request_dict()``

        :type operation_name: str
        :param operation_name: The operation being signed.

        :type expires_in: int
        :param expires_in: The number of seconds the presigned url is valid
            for. By default it expires in an hour (3600 seconds)

        :type region_name: string
        :param region_name: The region name to sign the presigned url.

        :type signing_name: str
        :param signing_name: The name to use for the service when signing.

        :returns: The presigned url
        """
        request = create_request_object(request_dict)
        self.sign(
            operation_name,
            request,
            region_name,
            'presign-url',
            expires_in,
            signing_name,
        )

        request.prepare()
        return request.url
class CloudFrontSigner:
    '''A signer to create a signed CloudFront URL.

    First you create a cloudfront signer based on a normalized RSA signer::

        import rsa
        def rsa_signer(message):
            with open('private_key.pem', 'r') as key_file:
                private_key = key_file.read()
            return rsa.sign(
                message,
                rsa.PrivateKey.load_pkcs1(private_key.encode('utf8')),
                'SHA-1')  # CloudFront requires SHA-1 hash
        cf_signer = CloudFrontSigner(key_id, rsa_signer)

    To sign with a canned policy::

        signed_url = cf_signer.generate_presigned_url(
            url, date_less_than=datetime(2015, 12, 1))

    To sign with a custom policy::

        signed_url = cf_signer.generate_presigned_url(url, policy=my_policy)
    '''

    # Maps the three base64 characters that are replaced in CloudFront
    # URLs: '+' -> '-', '=' -> '_', '/' -> '~'.
    _URL_SAFE_TABLE = bytes.maketrans(b'+=/', b'-_~')

    def __init__(self, key_id, rsa_signer):
        """Create a CloudFrontSigner.

        :type key_id: str
        :param key_id: The CloudFront Key Pair ID

        :type rsa_signer: callable
        :param rsa_signer: An RSA signer.
               Its only input parameter will be the message to be signed,
               and its output will be the signed content as a binary string.
               The hash algorithm needed by CloudFront is SHA-1.
        """
        self.key_id = key_id
        self.rsa_signer = rsa_signer

    def generate_presigned_url(self, url, date_less_than=None, policy=None):
        """Creates a signed CloudFront URL based on given parameters.

        Exactly one of ``date_less_than`` and ``policy`` must be given.

        :type url: str
        :param url: The URL of the protected object

        :type date_less_than: datetime
        :param date_less_than: The URL will expire after that date and time

        :type policy: str
        :param policy: The custom policy, possibly built by self.build_policy()

        :rtype: str
        :return: The signed URL.

        :raises ValueError: If both or neither of ``date_less_than`` and
            ``policy`` are supplied.
        """
        both_args_supplied = date_less_than is not None and policy is not None
        neither_arg_supplied = date_less_than is None and policy is None
        if both_args_supplied or neither_arg_supplied:
            e = 'Need to provide either date_less_than or policy, but not both'
            raise ValueError(e)
        if date_less_than is not None:
            # We still need to build a canned policy for signing purpose
            policy = self.build_policy(url, date_less_than)
        if isinstance(policy, str):
            policy = policy.encode('utf8')
        if date_less_than is not None:
            # Canned policy: only the expiry goes into the URL.
            params = [f'Expires={int(datetime2timestamp(date_less_than))}']
        else:
            # Custom policy: the encoded policy itself goes into the URL.
            params = [f"Policy={self._url_b64encode(policy).decode('utf8')}"]
        signature = self.rsa_signer(policy)
        params.extend(
            [
                f"Signature={self._url_b64encode(signature).decode('utf8')}",
                f"Key-Pair-Id={self.key_id}",
            ]
        )
        return self._build_url(url, params)

    def _build_url(self, base_url, extra_params):
        """Append ``extra_params`` to ``base_url`` as query parameters."""
        # Continue an existing query string with '&', otherwise start one.
        separator = '&' if '?' in base_url else '?'
        return base_url + separator + '&'.join(extra_params)

    def build_policy(
        self, resource, date_less_than, date_greater_than=None, ip_address=None
    ):
        """A helper to build policy.

        :type resource: str
        :param resource: The URL or the stream filename of the protected object

        :type date_less_than: datetime
        :param date_less_than: The URL will expire after the time has passed

        :type date_greater_than: datetime
        :param date_greater_than: The URL will not be valid until this time

        :type ip_address: str
        :param ip_address: Use 'x.x.x.x' for an IP, or 'x.x.x.x/x' for a subnet

        :rtype: str
        :return: The policy in a compact string.
        """
        # Note:
        # 1. Order in canned policy is significant. Special care has been taken
        #    to ensure the output will match the order defined by the document.
        #    There is also a test case to ensure that order.
        #    SEE: http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-canned-policy.html#private-content-canned-policy-creating-policy-statement
        # 2. Albeit the order in custom policy is not required by CloudFront,
        #    we still use OrderedDict internally to ensure the result is stable
        #    and also matches canned policy requirement.
        #    SEE: http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-custom-policy.html
        moment = int(datetime2timestamp(date_less_than))
        condition = OrderedDict({"DateLessThan": {"AWS:EpochTime": moment}})
        if ip_address:
            # A bare address is turned into a single-host subnet.
            if '/' not in ip_address:
                ip_address += '/32'
            condition["IpAddress"] = {"AWS:SourceIp": ip_address}
        if date_greater_than:
            moment = int(datetime2timestamp(date_greater_than))
            condition["DateGreaterThan"] = {"AWS:EpochTime": moment}
        ordered_payload = [('Resource', resource), ('Condition', condition)]
        custom_policy = {"Statement": [OrderedDict(ordered_payload)]}
        return json.dumps(custom_policy, separators=(',', ':'))

    def _url_b64encode(self, data):
        """Base64-encode ``data`` (bytes) with CloudFront's URL-safe alphabet."""
        # Required by CloudFront. See also:
        # http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-linux-openssl.html
        return base64.b64encode(data).translate(self._URL_SAFE_TABLE)
def add_generate_db_auth_token(class_attributes, **kwargs):
    """Register ``generate_db_auth_token`` in ``class_attributes``.

    :param class_attributes: Mapping of attribute name to value; the
        ``generate_db_auth_token`` entry is set (or overwritten) in place.
    :param kwargs: Accepted for call compatibility and ignored.
    """
    attribute_name = 'generate_db_auth_token'
    class_attributes[attribute_name] = generate_db_auth_token
def generate_db_auth_token(self, DBHostname, Port, DBUsername, Region=None):
    """Generates an auth token used to connect to a db with IAM credentials.

    :type DBHostname: str
    :param DBHostname: The hostname of the database to connect to.

    :type Port: int
    :param Port: The port number the database is listening on.

    :type DBUsername: str
    :param DBUsername: The username to log in as.

    :type Region: str
    :param Region: The region the database is in. If None, the client
        region will be used.

    :return: A presigned url which can be used as an auth token.
    """
    signing_region = self.meta.region_name if Region is None else Region

    request_dict = {
        'url_path': '/',
        'query_string': '',
        'headers': {},
        'body': {
            'Action': 'connect',
            'DBUser': DBUsername,
        },
        'method': 'GET',
    }

    # RDS requires that the scheme not be set when sent over. This can cause
    # issues when signing because the Python url parsing libraries follow
    # RFC 1808 closely, which states that a netloc must be introduced by `//`.
    # Otherwise the url is presumed to be relative, and thus the whole
    # netloc would be treated as a path component. To work around this a
    # temporary https scheme is prepended here and stripped again from the
    # signed result.
    temporary_scheme = 'https://'
    prepare_request_dict(
        request_dict, f'{temporary_scheme}{DBHostname}:{Port}'
    )
    url_with_scheme = self._request_signer.generate_presigned_url(
        operation_name='connect',
        request_dict=request_dict,
        region_name=signing_region,
        expires_in=900,
        signing_name='rds-db',
    )
    return url_with_scheme[len(temporary_scheme) :]
class S3PostPresigner:
    """Builds presigned S3 POST data using a request signer."""

    def __init__(self, request_signer):
        """Store the signer used to sign the presigned post request.

        :param request_signer: Object whose ``sign()`` method is called
            with the ``'presign-post'`` signing type.
        """
        self._request_signer = request_signer

    def generate_presigned_post(
        self,
        request_dict,
        fields=None,
        conditions=None,
        expires_in=3600,
        region_name=None,
    ):
        """Generates the url and the form fields used for a presigned s3 post

        :type request_dict: dict
        :param request_dict: The prepared request dictionary returned by
            ``botocore.awsrequest.prepare_request_dict()``

        :type fields: dict
        :param fields: A dictionary of prefilled form fields to build on top
            of.

        :type conditions: list
        :param conditions: A list of conditions to include in the policy. Each
            element can be either a list or a structure. For example:
            [
             {"acl": "public-read"},
             {"bucket": "mybucket"},
             ["starts-with", "$key", "mykey"]
            ]

        :type expires_in: int
        :param expires_in: The number of seconds the presigned post is valid
            for.

        :type region_name: string
        :param region_name: The region name to sign the presigned post to.

        :rtype: dict
        :returns: A dictionary with two elements: ``url`` and ``fields``.
            Url is the url to post to. Fields is a dictionary filled with
            the form fields and respective values to use when submitting the
            post. For example:

            {'url': 'https://mybucket.s3.amazonaws.com',
             'fields': {'acl': 'public-read',
                        'key': 'mykey',
                        'signature': 'mysignature',
                        'policy': 'mybase64 encoded policy'}
            }
        """
        if fields is None:
            fields = {}

        if conditions is None:
            conditions = []

        # Create an expiration date for the policy.  The current time is
        # taken as an aware UTC datetime and then made naive, which yields
        # the same value as the deprecated ``datetime.utcnow()``.
        datetime_now = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )
        expire_date = datetime_now + datetime.timedelta(seconds=expires_in)

        # Create the policy for the post: the expiration plus a copy of
        # all of the conditions that the user supplied.
        policy = {
            'expiration': expire_date.strftime(botocore.auth.ISO8601),
            'conditions': list(conditions),
        }

        # Store the policy and the fields in the request for signing
        request = create_request_object(request_dict)
        request.context['s3-presign-post-fields'] = fields
        request.context['s3-presign-post-policy'] = policy

        self._request_signer.sign(
            'PutObject', request, region_name, 'presign-post'
        )
        # Return the url and the fields for the form to post.
        # NOTE(review): the same ``fields`` object stored in the request
        # context is returned; presumably the signer fills it in place.
        return {'url': request.url, 'fields': fields}
def add_generate_presigned_url(class_attributes, **kwargs):
    """Register ``generate_presigned_url`` in ``class_attributes``.

    :param class_attributes: Mapping of attribute name to value; the
        ``generate_presigned_url`` entry is set (or overwritten) in place.
    :param kwargs: Accepted for call compatibility and ignored.
    """
    attribute_name = 'generate_presigned_url'
    class_attributes[attribute_name] = generate_presigned_url
def generate_presigned_url(
    self, ClientMethod, Params=None, ExpiresIn=3600, HttpMethod=None
):
    """Generate a presigned url given a client, its method, and arguments

    :type ClientMethod: string
    :param ClientMethod: The client method to presign for

    :type Params: dict
    :param Params: The parameters normally passed to
        ``ClientMethod``.

    :type ExpiresIn: int
    :param ExpiresIn: The number of seconds the presigned url is valid
        for. By default it expires in an hour (3600 seconds)

    :type HttpMethod: string
    :param HttpMethod: The http method to use on the generated url. By
        default, the http method is whatever is used in the method's model.

    :returns: The presigned url

    :raises UnknownClientMethodError: If ``ClientMethod`` is not a known
        method name of this client.
    """
    api_params = {} if Params is None else Params
    context = {
        'is_presign_request': True,
        'use_global_endpoint': _should_use_global_endpoint(self),
    }

    signer = self._request_signer

    # Translate the python method name into the service operation name.
    try:
        operation_name = self._PY_TO_OP_NAME[ClientMethod]
    except KeyError:
        raise UnknownClientMethodError(method_name=ClientMethod)

    operation_model = self.meta.service_model.operation_model(operation_name)
    api_params = self._emit_api_params(
        api_params=api_params,
        operation_model=operation_model,
        context=context,
    )
    # The signing region from endpoint resolution is ignored unless the
    # bucket is given as an ARN.
    bucket_is_arn = ArnParser.is_arn(api_params.get('Bucket', ''))
    resolved = self._resolve_endpoint_ruleset(
        operation_model,
        api_params,
        context,
        ignore_signing_region=(not bucket_is_arn),
    )
    endpoint_url, extra_headers, _properties = resolved

    request_dict = self._convert_to_request_dict(
        api_params=api_params,
        operation_model=operation_model,
        endpoint_url=endpoint_url,
        context=context,
        headers=extra_headers,
        set_user_agent_header=False,
    )

    # Switch out the http method if user specified it.
    if HttpMethod is not None:
        request_dict['method'] = HttpMethod

    # Generate the presigned url.
    return signer.generate_presigned_url(
        request_dict=request_dict,
        expires_in=ExpiresIn,
        operation_name=operation_name,
    )
def add_generate_presigned_post(class_attributes, **kwargs):
    """Register ``generate_presigned_post`` in ``class_attributes``.

    :param class_attributes: Mapping of attribute name to value; the
        ``generate_presigned_post`` entry is set (or overwritten) in place.
    :param kwargs: Accepted for call compatibility and ignored.
    """
    attribute_name = 'generate_presigned_post'
    class_attributes[attribute_name] = generate_presigned_post
def generate_presigned_post(
    self, Bucket, Key, Fields=None, Conditions=None, ExpiresIn=3600
):
    """Builds the url and the form fields used for a presigned s3 post

    Neither ``Fields`` nor ``Conditions`` is modified; both are copied
    before the bucket and key entries are added.

    :type Bucket: string
    :param Bucket: The name of the bucket to presign the post to. Note that
        bucket related conditions should not be included in the
        ``conditions`` parameter.

    :type Key: string
    :param Key: Key name, optionally add ${filename} to the end to
        attach the submitted filename. Note that key related conditions and
        fields are filled out for you and should not be included in the
        ``Fields`` or ``Conditions`` parameter.

    :type Fields: dict
    :param Fields: A dictionary of prefilled form fields to build on top
        of. Elements that may be included are acl, Cache-Control,
        Content-Type, Content-Disposition, Content-Encoding, Expires,
        success_action_redirect, redirect, success_action_status,
        and x-amz-meta-.

        Note that if a particular element is included in the fields
        dictionary it will not be automatically added to the conditions
        list. You must specify a condition for the element as well.

    :type Conditions: list
    :param Conditions: A list of conditions to include in the policy. Each
        element can be either a list or a structure. For example:

        [
         {"acl": "public-read"},
         ["content-length-range", 2, 5],
         ["starts-with", "$success_action_redirect", ""]
        ]

        Conditions that are included may pertain to acl,
        content-length-range, Cache-Control, Content-Type,
        Content-Disposition, Content-Encoding, Expires,
        success_action_redirect, redirect, success_action_status,
        and/or x-amz-meta-.

        Note that if you include a condition, you must specify
        a valid value in the fields dictionary as well. A value will
        not be added automatically to the fields dictionary based on the
        conditions.

    :type ExpiresIn: int
    :param ExpiresIn: The number of seconds the presigned post
        is valid for.

    :rtype: dict
    :returns: A dictionary with two elements: ``url`` and ``fields``.
        Url is the url to post to. Fields is a dictionary filled with
        the form fields and respective values to use when submitting the
        post. For example:

        {'url': 'https://mybucket.s3.amazonaws.com',
         'fields': {'acl': 'public-read',
                    'key': 'mykey',
                    'signature': 'mysignature',
                    'policy': 'mybase64 encoded policy'}
        }
    """
    bucket = Bucket
    key = Key
    expires_in = ExpiresIn

    # Work on copies so the caller's dict and list are never mutated by
    # the entries added below.
    fields = {} if Fields is None else Fields.copy()
    conditions = [] if Conditions is None else list(Conditions)

    context = {
        'is_presign_request': True,
        'use_global_endpoint': _should_use_global_endpoint(self),
    }

    post_presigner = S3PostPresigner(self._request_signer)

    # We choose the CreateBucket operation model because its url gets
    # serialized to what a presign post requires.
    operation_model = self.meta.service_model.operation_model('CreateBucket')
    params = self._emit_api_params(
        api_params={'Bucket': bucket},
        operation_model=operation_model,
        context=context,
    )
    # The signing region from endpoint resolution is ignored unless the
    # bucket is given as an ARN.
    bucket_is_arn = ArnParser.is_arn(params.get('Bucket', ''))
    (
        endpoint_url,
        additional_headers,
        _properties,
    ) = self._resolve_endpoint_ruleset(
        operation_model,
        params,
        context,
        ignore_signing_region=(not bucket_is_arn),
    )

    request_dict = self._convert_to_request_dict(
        api_params=params,
        operation_model=operation_model,
        endpoint_url=endpoint_url,
        context=context,
        headers=additional_headers,
        set_user_agent_header=False,
    )

    # Append the bucket name to the list of conditions.
    conditions.append({'bucket': bucket})

    # If the key ends with filename, the only constraint that can be
    # imposed is if it starts with the specified prefix.
    if key.endswith('${filename}'):
        conditions.append(["starts-with", '$key', key[: -len('${filename}')]])
    else:
        conditions.append({'key': key})

    # Add the key to the fields.
    fields['key'] = key

    return post_presigner.generate_presigned_post(
        request_dict=request_dict,
        fields=fields,
        conditions=conditions,
        expires_in=expires_in,
    )
def _should_use_global_endpoint(client):
    """Return whether ``client`` should presign against the global endpoint.

    The answer is ``False`` when the client's partition is not ``aws``, or
    when its S3 config enables dualstack, asks for the regional
    ``us-east-1`` endpoint while the client region is ``us-east-1``, or
    uses ``virtual`` addressing. Otherwise it is ``True``.
    """
    meta = client.meta
    if meta.partition != 'aws':
        return False

    s3_settings = meta.config.s3
    if not s3_settings:
        # No S3-specific configuration: nothing rules the global endpoint out.
        return True

    if s3_settings.get('use_dualstack_endpoint', False):
        return False

    wants_regional = (
        s3_settings.get('us_east_1_regional_endpoint') == 'regional'
    )
    if wants_regional and meta.config.region_name == 'us-east-1':
        return False

    return s3_settings.get('addressing_style') != 'virtual'