Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/auth.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

616 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import base64 

15import calendar 

16import datetime 

17import functools 

18import hmac 

19import json 

20import logging 

21import time 

22from collections.abc import Mapping 

23from email.utils import formatdate 

24from hashlib import sha1, sha256 

25from operator import itemgetter 

26 

27from botocore.compat import ( 

28 HAS_CRT, 

29 MD5_AVAILABLE, # noqa: F401 

30 HTTPHeaders, 

31 encodebytes, 

32 ensure_unicode, 

33 get_current_datetime, 

34 parse_qs, 

35 quote, 

36 unquote, 

37 urlsplit, 

38 urlunsplit, 

39) 

40from botocore.exceptions import ( 

41 NoAuthTokenError, 

42 NoCredentialsError, 

43 UnknownSignatureVersionError, 

44 UnsupportedSignatureVersionError, 

45) 

46from botocore.utils import ( 

47 is_valid_ipv6_endpoint_url, 

48 normalize_url_path, 

49 percent_encode_sequence, 

50) 

51 

logger = logging.getLogger(__name__)


# Hex digest used by the signers below as the payload hash of an empty body.
EMPTY_SHA256_HASH = (
    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
)

# Chunk size used when streaming a request body through sha256.  Various
# sizes were tried and this one generally performed best.
PAYLOAD_BUFFER = 1024 * 1024

# strftime patterns for the timestamps written by the signers.
ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ'

# Lower-cased header names that are left out of the signed header set.
SIGNED_HEADERS_BLACKLIST = [
    'expect',
    'transfer-encoding',
    'user-agent',
    'x-amzn-trace-id',
]

# Static strings placed in the canonical request instead of a body hash.
UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'
STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER'

72 

73 

def _host_from_url(url):
    """Derive the value of the ``Host`` header for *url*.

    The result:
      1) is lowercase (``urlsplit``'s hostname is always lowercase),
      2) omits the port when it is the default port for the scheme,
      3) omits any userinfo.

    IPv6 endpoint hosts are wrapped in square brackets.
    """
    parts = urlsplit(url)
    hostname = parts.hostname
    if is_valid_ipv6_endpoint_url(url):
        hostname = f'[{hostname}]'

    port = parts.port
    if port is None:
        return hostname

    well_known_ports = {'http': 80, 'https': 443}
    if port == well_known_ports.get(parts.scheme):
        return hostname
    return f'{hostname}:{port}'

91 

92 

def _get_body_as_dict(request):
    """Return ``request.data`` as a dict-like object.

    Query services already carry form-encoded data as a dict, which is
    returned unchanged.  Other services (e.g. rest-json) may carry a JSON
    document as ``str`` or ``bytes``; those are parsed with ``json.loads``
    (bytes are decoded as UTF-8 first).
    """
    body = request.data
    if isinstance(body, str):
        return json.loads(body)
    if isinstance(body, bytes):
        return json.loads(body.decode('utf-8'))
    return body

104 

105 

class BaseSigner:
    """Base class for the request signers in this module.

    Subclasses flip the ``REQUIRES_*`` flags as needed and implement
    ``add_auth``.
    """

    # Both flags default to off; SigV4Auth and TokenSigner override them.
    REQUIRES_REGION = False
    REQUIRES_TOKEN = False

    def add_auth(self, request):
        """Sign *request*; must be provided by subclasses.

        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("add_auth")

112 

113 

class TokenSigner(BaseSigner):
    """
    Signers that expect an authorization token to perform the authorization
    """

    # The docstring has to be the first statement of the class body to be
    # picked up as ``__doc__``; the flag override therefore follows it.
    REQUIRES_TOKEN = True

    def __init__(self, auth_token):
        """Store the authorization token on ``self.auth_token``.

        :param auth_token: kept as-is; not inspected here.
        """
        self.auth_token = auth_token

122 

123 

class SigV2Auth(BaseSigner):
    """
    Sign a request with Signature V2.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def calc_signature(self, request, params):
        """Compute the V2 signature for *request* over *params*.

        :returns: ``(query_string, signature)`` where ``query_string`` is the
            sorted, percent-encoded ``key=value`` pairs joined by ``&`` and
            ``signature`` is the base64 HMAC-SHA256 of the string to sign.
        """
        logger.debug("Calculating signature using v2 auth.")
        url_parts = urlsplit(request.url)
        path = url_parts.path or '/'
        string_to_sign = f"{request.method}\n{url_parts.netloc}\n{path}\n"
        signer = hmac.new(
            self.credentials.secret_key.encode("utf-8"), digestmod=sha256
        )

        def _encode_pair(name):
            raw_value = str(params[name])
            encoded_name = quote(name.encode('utf-8'), safe='')
            encoded_value = quote(raw_value.encode('utf-8'), safe='-_~')
            return f'{encoded_name}={encoded_value}'

        # A signature left over from a previous attempt must not feed into
        # the new one, so that key is skipped.  This prevents issues during
        # retries.
        qs = '&'.join(
            _encode_pair(name)
            for name in sorted(params)
            if name != 'Signature'
        )
        string_to_sign += qs
        logger.debug('String to sign: %s', string_to_sign)
        signer.update(string_to_sign.encode('utf-8'))
        encoded_digest = base64.b64encode(signer.digest())
        return (qs, encoded_digest.strip().decode('utf-8'))

    def add_auth(self, request):
        """Add the V2 auth parameters and signature to *request* in place.

        The auth handler runs last in the preparation of a prepared request,
        so the parameters are taken from the request body (POST) or, when
        there is no body, from the request params (GET) and updated there.

        :raises NoCredentialsError: if no credentials were supplied.
        :returns: the same *request* object.
        """
        if self.credentials is None:
            raise NoCredentialsError()
        # Truthy data means POST; otherwise fall back to the GET params.
        params = request.data or request.params
        params['AWSAccessKeyId'] = self.credentials.access_key
        params['SignatureVersion'] = '2'
        params['SignatureMethod'] = 'HmacSHA256'
        params['Timestamp'] = time.strftime(ISO8601, time.gmtime())
        if self.credentials.token:
            params['SecurityToken'] = self.credentials.token
        _, signature = self.calc_signature(request, params)
        params['Signature'] = signature
        return request

183 

184 

class SigV3Auth(BaseSigner):
    """Signer that writes an ``AWS3-HTTPS`` ``X-Amzn-Authorization`` header.

    The signature is an HMAC-SHA256 over the value of the ``Date`` header.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def add_auth(self, request):
        """Set the Date, optional token and authorization headers in place.

        :raises NoCredentialsError: if no credentials were supplied.
        """
        if self.credentials is None:
            raise NoCredentialsError()
        headers = request.headers

        def _replace(name, value):
            # Drop any existing value first, then write the new one.
            if name in headers:
                del headers[name]
            headers[name] = value

        _replace('Date', formatdate(usegmt=True))
        if self.credentials.token:
            _replace('X-Amz-Security-Token', self.credentials.token)

        mac = hmac.new(
            self.credentials.secret_key.encode('utf-8'), digestmod=sha256
        )
        mac.update(headers['Date'].encode('utf-8'))
        encoded_signature = encodebytes(mac.digest()).strip()
        signature = (
            f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key},"
            f"Algorithm=HmacSHA256,Signature={encoded_signature.decode('utf-8')}"
        )
        _replace('X-Amzn-Authorization', signature)

211 

212 

class SigV4Auth(BaseSigner):
    """
    Sign a request with Signature V4.
    """

    REQUIRES_REGION = True

    def __init__(self, credentials, service_name, region_name):
        self.credentials = credentials
        # Region and service are read when building the credential scope
        # and when deriving the signing key.
        self._region_name = region_name
        self._service_name = service_name

    def _sign(self, key, msg, hex=False):
        """HMAC-SHA256 *msg* (a str) with *key* (bytes).

        :returns: the hex digest string when ``hex`` is true, otherwise the
            raw digest bytes.
        """
        mac = hmac.new(key, msg.encode('utf-8'), sha256)
        return mac.hexdigest() if hex else mac.digest()

    def headers_to_sign(self, request):
        """
        Select the headers from the request that need to be included
        in the StringToSign.

        Header names are lower-cased; names listed in
        ``SIGNED_HEADERS_BLACKLIST`` are dropped.  A ``host`` entry is
        derived from the request URL when the request carries none.
        """
        selected = HTTPHeaders()
        for name, value in request.headers.items():
            lowered = name.lower()
            if lowered in SIGNED_HEADERS_BLACKLIST:
                continue
            selected[lowered] = value
        if 'host' not in selected:
            # TODO: We should set the host ourselves, instead of relying on
            # our HTTP client to set it for us.
            selected['host'] = _host_from_url(request.url)
        return selected

    def canonical_query_string(self, request):
        """Return the canonical query string for *request*.

        The query can live in ``request.params`` or in the URL itself; the
        params attribute wins when it is non-empty, otherwise the URL is
        split and its query component is used.
        """
        if request.params:
            return self._canonical_query_string_params(request.params)
        return self._canonical_query_string_url(urlsplit(request.url))

    def _canonical_query_string_params(self, params):
        """Percent-encode, sort and join *params* (mapping or pair list)."""
        items = params.items() if isinstance(params, Mapping) else params
        # Sort by the URI-encoded key names, and in the case of repeated
        # keys, then by the encoded value.
        encoded_pairs = sorted(
            (quote(key, safe='-_.~'), quote(str(value), safe='-_.~'))
            for key, value in items
        )
        return '&'.join(f'{key}={value}' for key, value in encoded_pairs)

    def _canonical_query_string_url(self, parts):
        """Sort and re-join the query already present in split URL *parts*.

        Keys and values are used exactly as they appear in the URL; no
        additional encoding is applied here.
        """
        if not parts.query:
            return ''
        raw_pairs = []
        for fragment in parts.query.split('&'):
            key, _, value = fragment.partition('=')
            raw_pairs.append((key, value))
        # Sort by key, then by value for repeated keys.
        raw_pairs.sort()
        return '&'.join(f'{key}={value}' for key, value in raw_pairs)

    def canonical_headers(self, headers_to_sign):
        """
        Return the headers that need to be included in the StringToSign
        in their canonical form: one ``name:value`` line per distinct
        header name, sorted by name, with multiple values for a name joined
        by commas, and the lines joined by newlines.
        """
        lines = []
        for name in sorted(set(headers_to_sign)):
            joined_values = ','.join(
                self._header_value(item)
                for item in headers_to_sign.get_all(name)
            )
            lines.append(f'{name}:{ensure_unicode(joined_values)}')
        return '\n'.join(lines)

    def _header_value(self, value):
        # From the sigv4 docs:
        # Lowercase(HeaderName) + ':' + Trimall(HeaderValue)
        #
        # Trimall strips leading/trailing white space and collapses runs of
        # white space into a single space.
        pieces = value.split()
        return ' '.join(pieces)

    def signed_headers(self, headers_to_sign):
        """Return the sorted, lower-cased header names joined by ``;``."""
        names = [name.lower().strip() for name in set(headers_to_sign)]
        names.sort()
        return ';'.join(names)

    def _is_streaming_checksum_payload(self, request):
        """True when the request's checksum algorithm goes in a trailer."""
        checksum_context = request.context.get('checksum', {})
        algorithm = checksum_context.get('request_algorithm')
        if not isinstance(algorithm, dict):
            return False
        return algorithm.get('in') == 'trailer'

    def payload(self, request):
        """Return the payload hash string for the canonical request.

        Seekable bodies are hashed in ``PAYLOAD_BUFFER`` sized chunks and
        rewound to the position they were at before hashing.
        """
        if self._is_streaming_checksum_payload(request):
            return STREAMING_UNSIGNED_PAYLOAD_TRAILER
        if not self._should_sha256_sign_payload(request):
            # When payload signing is disabled, this static string stands
            # in for the payload checksum.
            return UNSIGNED_PAYLOAD

        body = request.body
        if not body:
            return EMPTY_SHA256_HASH
        if not hasattr(body, 'seek'):
            # The request serialization has ensured that request.body is a
            # bytes() type.
            return sha256(body).hexdigest()

        start = body.tell()
        digest = sha256()
        next_chunk = functools.partial(body.read, PAYLOAD_BUFFER)
        for chunk in iter(next_chunk, b''):
            digest.update(chunk)
        hex_digest = digest.hexdigest()
        body.seek(start)
        return hex_digest

    def _should_sha256_sign_payload(self, request):
        """Decide whether the body hash is part of the signature."""
        if request.url.startswith('https'):
            # Certain operations may have payload signing disabled by
            # default.  Since the operation model is not available here,
            # that bit of metadata arrives through the request context.
            return request.context.get('payload_signing_enabled', True)
        # Payloads will always be signed over insecure connections.
        return True

    def canonical_request(self, request):
        """Build the newline-joined canonical request string."""
        method = request.method.upper()
        path = self._normalize_url_path(urlsplit(request.url).path)
        query = self.canonical_query_string(request)
        to_sign = self.headers_to_sign(request)
        header_block = self.canonical_headers(to_sign) + '\n'
        signed_names = self.signed_headers(to_sign)
        # A payload hash already present on the request takes precedence
        # over computing one.
        if 'X-Amz-Content-SHA256' in request.headers:
            body_checksum = request.headers['X-Amz-Content-SHA256']
        else:
            body_checksum = self.payload(request)
        return '\n'.join(
            [method, path, query, header_block, signed_names, body_checksum]
        )

    def _normalize_url_path(self, path):
        return quote(normalize_url_path(path), safe='/~')

    def scope(self, request):
        """Return ``access_key/date/region/service/aws4_request``."""
        return '/'.join(
            [
                self.credentials.access_key,
                request.context['timestamp'][0:8],
                self._region_name,
                self._service_name,
                'aws4_request',
            ]
        )

    def credential_scope(self, request):
        """Return ``date/region/service/aws4_request``."""
        return '/'.join(
            [
                request.context['timestamp'][0:8],
                self._region_name,
                self._service_name,
                'aws4_request',
            ]
        )

    def string_to_sign(self, request, canonical_request):
        """
        Return the canonical StringToSign: the algorithm name, the request
        timestamp, the credential scope and the hex SHA-256 of
        *canonical_request*, joined by newlines.
        """
        return '\n'.join(
            [
                'AWS4-HMAC-SHA256',
                request.context['timestamp'],
                self.credential_scope(request),
                sha256(canonical_request.encode('utf-8')).hexdigest(),
            ]
        )

    def signature(self, string_to_sign, request):
        """Derive the signing key and return the hex signature.

        The key is derived by chaining HMACs over the date, region, service
        and the literal ``aws4_request``, seeded with ``AWS4`` + secret key.
        """
        secret = self.credentials.secret_key
        signing_key = (f"AWS4{secret}").encode()
        date_stamp = request.context["timestamp"][0:8]
        for component in (
            date_stamp,
            self._region_name,
            self._service_name,
            'aws4_request',
        ):
            signing_key = self._sign(signing_key, component)
        return self._sign(signing_key, string_to_sign, hex=True)

    def add_auth(self, request):
        """Sign *request* in place.

        :raises NoCredentialsError: if no credentials were supplied.
        """
        if self.credentials is None:
            raise NoCredentialsError()
        now = get_current_datetime()
        request.context['timestamp'] = now.strftime(SIGV4_TIMESTAMP)
        # This could be a retry.  Make sure the previous
        # authorization header is removed first.
        self._modify_request_before_signing(request)
        canonical_request = self.canonical_request(request)
        logger.debug("Calculating signature using v4 auth.")
        logger.debug('CanonicalRequest:\n%s', canonical_request)
        string_to_sign = self.string_to_sign(request, canonical_request)
        logger.debug('StringToSign:\n%s', string_to_sign)
        signature = self.signature(string_to_sign, request)
        logger.debug('Signature:\n%s', signature)

        self._inject_signature_to_request(request, signature)

    def _inject_signature_to_request(self, request, signature):
        """Write the ``Authorization`` header and return *request*."""
        credential_part = (
            f'AWS4-HMAC-SHA256 Credential={self.scope(request)}'
        )
        signed_names = self.signed_headers(self.headers_to_sign(request))
        request.headers['Authorization'] = ', '.join(
            [
                credential_part,
                f"SignedHeaders={signed_names}",
                f'Signature={signature}',
            ]
        )
        return request

    def _modify_request_before_signing(self, request):
        """Reset the headers this signer owns before computing a signature."""
        headers = request.headers
        if 'Authorization' in headers:
            del headers['Authorization']
        self._set_necessary_date_headers(request)

        token = self.credentials.token
        if token:
            if 'X-Amz-Security-Token' in headers:
                del headers['X-Amz-Security-Token']
            headers['X-Amz-Security-Token'] = token

        if not request.context.get('payload_signing_enabled', True):
            if 'X-Amz-Content-SHA256' in headers:
                del headers['X-Amz-Content-SHA256']
            headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD

    def _set_necessary_date_headers(self, request):
        # The spec allows for either the Date _or_ the X-Amz-Date value to
        # be used so we check both.  If there's a Date header, we use the
        # date header.  Otherwise we use the X-Amz-Date header.
        headers = request.headers
        had_date = 'Date' in headers
        if had_date:
            del headers['Date']
            parsed = datetime.datetime.strptime(
                request.context['timestamp'], SIGV4_TIMESTAMP
            )
            headers['Date'] = formatdate(
                int(calendar.timegm(parsed.timetuple()))
            )
        # Any pre-existing X-Amz-Date is removed in both cases.
        if 'X-Amz-Date' in headers:
            del headers['X-Amz-Date']
        if not had_date:
            headers['X-Amz-Date'] = request.context['timestamp']

480 

481 

class S3SigV4Auth(SigV4Auth):
    """SigV4 signer variant for S3.

    Always writes ``X-Amz-Content-SHA256``, applies S3-specific rules for
    whether the body is hashed, and leaves the URL path un-normalized.
    """

    def _modify_request_before_signing(self, request):
        super()._modify_request_before_signing(request)
        headers = request.headers
        if 'X-Amz-Content-SHA256' in headers:
            del headers['X-Amz-Content-SHA256']

        headers['X-Amz-Content-SHA256'] = self.payload(request)

    def _should_sha256_sign_payload(self, request):
        # S3 allows optional body signing, so to minimize the performance
        # impact, we opt to not SHA256 sign the body on streaming uploads,
        # provided that we're on https.
        client_config = request.context.get('client_config')
        configured = getattr(client_config, 's3', None)
        # The config could be None if it isn't set, or if the customer sets
        # it to None.
        s3_config = {} if configured is None else configured

        # The explicit configuration takes precedence over any implicit
        # configuration.
        explicit_choice = s3_config.get('payload_signing_enabled', None)
        if explicit_choice is not None:
            return explicit_choice

        # We require that both a checksum be present and https be enabled
        # to implicitly disable body signing.  The combination of TLS and
        # a checksum is sufficiently secure and durable for us to be
        # confident in the request without body signing.
        checksum_header = 'Content-MD5'
        algorithm = request.context.get('checksum', {}).get(
            'request_algorithm'
        )
        if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
            checksum_header = algorithm['name']
        if not (
            request.url.startswith("https")
            and checksum_header in request.headers
        ):
            return True

        # If the input is streaming we disable body signing by default.
        if request.context.get('has_streaming_input', False):
            return False

        # If the S3-specific checks had no results, delegate to the generic
        # checks.
        return super()._should_sha256_sign_payload(request)

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path

534 

535 

class S3ExpressAuth(S3SigV4Auth):
    """S3 SigV4 signer that sends the token as ``x-amz-s3session-token``."""

    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self, credentials, service_name, region_name, *, identity_cache
    ):
        super().__init__(credentials, service_name, region_name)
        # Stored only; nothing in this class reads it back.
        self._identity_cache = identity_cache

    def add_auth(self, request):
        super().add_auth(request)

    def _modify_request_before_signing(self, request):
        super()._modify_request_before_signing(request)
        headers = request.headers
        # An existing session-token header is left untouched.
        if 'x-amz-s3session-token' not in headers:
            headers['x-amz-s3session-token'] = self.credentials.token
        # S3Express does not support STS' X-Amz-Security-Token
        if 'X-Amz-Security-Token' in headers:
            del headers['X-Amz-Security-Token']

555 

556 

class S3ExpressPostAuth(S3ExpressAuth):
    """Presigned-POST signer that carries the token as S3session-Token."""

    REQUIRES_IDENTITY_CACHE = True

    def add_auth(self, request):
        """Fill in the presigned-POST fields and policy on the context.

        Reads and writes ``request.context['s3-presign-post-fields']`` and
        ``request.context['s3-presign-post-policy']``; existing objects
        under those keys are updated in place.
        """
        context = request.context
        now = get_current_datetime()
        context['timestamp'] = now.strftime(SIGV4_TIMESTAMP)

        fields = context.get('s3-presign-post-fields', None)
        if fields is None:
            fields = {}

        policy = context.get('s3-presign-post-policy', None)
        if policy is None:
            policy = {}
            conditions = []
        else:
            conditions = policy.get('conditions', None)
            if conditions is None:
                conditions = []
        policy['conditions'] = conditions

        credential = self.scope(request)
        signing_entries = (
            ('x-amz-algorithm', 'AWS4-HMAC-SHA256'),
            ('x-amz-credential', credential),
            ('x-amz-date', context['timestamp']),
        )
        for name, value in signing_entries:
            fields[name] = value
        for name, value in signing_entries:
            conditions.append({name: value})

        token = self.credentials.token
        if token is not None:
            fields['X-Amz-S3session-Token'] = token
            conditions.append({'X-Amz-S3session-Token': token})

        # Dump the base64 encoded policy into the fields dictionary.
        policy_json = json.dumps(policy).encode('utf-8')
        fields['policy'] = base64.b64encode(policy_json).decode('utf-8')

        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy

600 

601 

class S3ExpressQueryAuth(S3ExpressAuth):
    """Query-string (presigned URL) signer for S3 Express."""

    DEFAULT_EXPIRES = 300
    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self,
        credentials,
        service_name,
        region_name,
        *,
        identity_cache,
        expires=DEFAULT_EXPIRES,
    ):
        super().__init__(
            credentials,
            service_name,
            region_name,
            identity_cache=identity_cache,
        )
        # Sent as the X-Amz-Expires query parameter.
        self._expires = expires

    def _modify_request_before_signing(self, request):
        """Move all parameters plus the auth parameters into the URL query."""
        # We automatically set this header, so if it's the auto-set value we
        # want to get rid of it since it doesn't make sense for presigned
        # urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that we're not including X-Amz-Signature.
        # From the docs: "The Canonical Query String must include all the
        # query parameters from the preceding table except for
        # X-Amz-Signature.
        signed_headers = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_headers,
        }
        if self.credentials.token is not None:
            auth_params['X-Amz-S3session-Token'] = self.credentials.token

        # Parse the original query string into a dict.  parse_qs makes each
        # value a list, but we know there are no repeated keys, so the
        # single-element lists are converted back to scalar values.
        scheme, netloc, path, query, fragment = urlsplit(request.url)
        parsed_query = parse_qs(query, keep_blank_values=True)
        operation_dict = {
            name: values[0] for name, values in parsed_query.items()
        }

        if request.params:
            operation_dict.update(request.params)
            request.params = {}
        if request.data:
            # Body params move into the query string as well, which first
            # requires the body as a dict.
            operation_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about this.  It *has* to be:
        #     https://<endpoint>?<operation params>&<auth params>
        # The two kinds of params can't be mixed into one dict and encoded
        # together.
        operation_params = ''
        if operation_dict:
            operation_params = percent_encode_sequence(operation_dict) + '&'
        new_query_string = (
            f"{operation_params}{percent_encode_sequence(auth_params)}"
        )
        # Rebuild the URL with only the query component replaced.
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for the query
        # param auth, we just append an 'X-Amz-Signature' param to the end
        # of the query string.
        request.url += f'&X-Amz-Signature={signature}'

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path

    def payload(self, request):
        # Nothing is known about the payload when a presigned URL is
        # created, so the constant "UNSIGNED-PAYLOAD" stands in for the
        # payload hash in the canonical request.
        return UNSIGNED_PAYLOAD

704 

705 

class SigV4QueryAuth(SigV4Auth):
    """SigV4 signer that puts the auth parameters in the URL query string."""

    DEFAULT_EXPIRES = 3600

    def __init__(
        self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
    ):
        super().__init__(credentials, service_name, region_name)
        # Sent as the X-Amz-Expires query parameter.
        self._expires = expires

    def _modify_request_before_signing(self, request):
        """Move all parameters plus the auth parameters into the URL query."""
        # We automatically set this header, so if it's the auto-set value we
        # want to get rid of it since it doesn't make sense for presigned
        # urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that we're not including X-Amz-Signature.
        # From the docs: "The Canonical Query String must include all the
        # query parameters from the preceding table except for
        # X-Amz-Signature.
        signed_headers = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_headers,
        }
        if self.credentials.token is not None:
            auth_params['X-Amz-Security-Token'] = self.credentials.token

        # Parse the original query string into a dict.  parse_qs makes each
        # value a list, but we know there are no repeated keys, so the
        # single-element lists are converted back to scalar values.
        scheme, netloc, path, query, fragment = urlsplit(request.url)
        parsed_query = parse_qs(query, keep_blank_values=True)
        operation_dict = {
            name: values[0] for name, values in parsed_query.items()
        }

        if request.params:
            operation_dict.update(request.params)
            request.params = {}
        if request.data:
            # Body params move into the query string as well, which first
            # requires the body as a dict.
            operation_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about this.  It *has* to be:
        #     https://<endpoint>?<operation params>&<auth params>
        # The two kinds of params can't be mixed into one dict and encoded
        # together.
        operation_params = ''
        if operation_dict:
            operation_params = percent_encode_sequence(operation_dict) + '&'
        new_query_string = (
            f"{operation_params}{percent_encode_sequence(auth_params)}"
        )
        # Rebuild the URL with only the query component replaced.
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for the query
        # param auth, we just append an 'X-Amz-Signature' param to the end
        # of the query string.
        request.url += f'&X-Amz-Signature={signature}'

785 

786 

class S3SigV4QueryAuth(SigV4QueryAuth):
    """S3 SigV4 auth using query parameters.

    Signs a request through its query string with signature version 4,
    i.e. this is a "presigned url" signer.

    Based off of:

    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html

    """

    def _normalize_url_path(self, path):
        # S3 paths are used as given; no normalization is applied.
        return path

    def payload(self, request):
        # From the doc link above:
        #     "You don't include a payload hash in the Canonical Request,
        #     because when you create a presigned URL, you don't know
        #     anything about the payload.  Instead, you use a constant
        #     string "UNSIGNED-PAYLOAD".
        return UNSIGNED_PAYLOAD

809 

810 

class S3SigV4PostAuth(SigV4Auth):
    """
    Presigns a s3 post

    Implementation doc here:
    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """Fill in the presigned-POST fields and policy on the context.

        Reads and writes ``request.context['s3-presign-post-fields']`` and
        ``request.context['s3-presign-post-policy']``; existing objects
        under those keys are updated in place.
        """
        context = request.context
        now = get_current_datetime()
        context['timestamp'] = now.strftime(SIGV4_TIMESTAMP)

        fields = context.get('s3-presign-post-fields', None)
        if fields is None:
            fields = {}

        policy = context.get('s3-presign-post-policy', None)
        if policy is None:
            policy = {}
            conditions = []
        else:
            conditions = policy.get('conditions', None)
            if conditions is None:
                conditions = []
        policy['conditions'] = conditions

        credential = self.scope(request)
        signing_entries = (
            ('x-amz-algorithm', 'AWS4-HMAC-SHA256'),
            ('x-amz-credential', credential),
            ('x-amz-date', context['timestamp']),
        )
        for name, value in signing_entries:
            fields[name] = value
        for name, value in signing_entries:
            conditions.append({name: value})

        token = self.credentials.token
        if token is not None:
            fields['x-amz-security-token'] = token
            conditions.append({'x-amz-security-token': token})

        # Dump the base64 encoded policy into the fields dictionary.
        policy_json = json.dumps(policy).encode('utf-8')
        fields['policy'] = base64.b64encode(policy_json).decode('utf-8')

        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy

857 

858 

class HmacV1Auth(BaseSigner):
    """Signer that HMAC-SHA1 signs a canonical string for a request.

    The resulting signature is placed in an ``Authorization`` header of
    the form ``AWS <access_key>:<signature>``.
    """

    # List of Query String Arguments of Interest: only these query
    # parameters are kept when building the canonical resource.
    QSAOfInterest = [
        'accelerate',
        'acl',
        'cors',
        'defaultObjectAcl',
        'location',
        'logging',
        'partNumber',
        'policy',
        'requestPayment',
        'torrent',
        'versioning',
        'versionId',
        'versions',
        'website',
        'uploads',
        'uploadId',
        'response-content-type',
        'response-content-language',
        'response-expires',
        'response-cache-control',
        'response-content-disposition',
        'response-content-encoding',
        'delete',
        'lifecycle',
        'tagging',
        'restore',
        'storageClass',
        'notification',
        'replication',
        'requestPayment',
        'analytics',
        'metrics',
        'inventory',
        'select',
        'select-type',
        'object-lock',
    ]

    def __init__(self, credentials, service_name=None, region_name=None):
        """Store ``credentials``; the other arguments are accepted but unused."""
        self.credentials = credentials

    def sign_string(self, string_to_sign):
        """Return the base64 text of the HMAC-SHA1 of ``string_to_sign``.

        The HMAC key is the UTF-8 encoded secret key of the credentials.
        """
        secret = self.credentials.secret_key.encode('utf-8')
        mac = hmac.new(secret, digestmod=sha1)
        mac.update(string_to_sign.encode('utf-8'))
        encoded_digest = encodebytes(mac.digest())
        return encoded_digest.strip().decode('utf-8')

    def canonical_standard_headers(self, headers):
        """Return the content-md5, content-type and date values, newline-joined.

        As a side effect the ``Date`` header of ``headers`` is replaced
        with the value of ``self._get_date()``.  A header that is absent
        (or ``None``) contributes an empty string.
        """
        if 'Date' in headers:
            del headers['Date']
        headers['Date'] = self._get_date()

        collected = []
        for wanted in ('content-md5', 'content-type', 'date'):
            matches = [
                headers[name].strip()
                for name in headers
                if headers[name] is not None and name.lower() == wanted
            ]
            # An empty placeholder keeps the line in the canonical string
            # when nothing matched.
            collected.extend(matches if matches else [''])
        return '\n'.join(collected)

    def canonical_custom_headers(self, headers):
        """Return the ``x-amz-*`` headers as sorted ``name:value`` lines.

        Names are lower-cased; all values of a header (as returned by
        ``headers.get_all``) are stripped and comma-joined.
        """
        amz_headers = {}
        for name in headers:
            lowered = name.lower()
            if headers[name] is None:
                continue
            if not lowered.startswith('x-amz-'):
                continue
            amz_headers[lowered] = ','.join(
                value.strip() for value in headers.get_all(name)
            )
        return '\n'.join(
            f"{name}:{amz_headers[name]}" for name in sorted(amz_headers.keys())
        )

    def unquote_v(self, nv):
        """
        Unquote the value of a (name, value) pair.

        A one-element ``nv`` (name only) is returned unchanged.

        TODO: Do we need this?
        """
        if len(nv) != 1:
            return (nv[0], unquote(nv[1]))
        return nv

    def canonical_resource(self, split, auth_path=None):
        """Return the canonical resource: path plus selected query args.

        Nothing after the first ``?`` is included unless it is one of the
        query string arguments listed in ``QSAOfInterest``.

        NOTE: the path in the canonical resource should always be the
        full path including the bucket name, even for virtual-hosting
        style addressing.  ``auth_path`` carries that full path and is
        used in place of ``split.path`` when it is supplied.
        """
        resource = split.path if auth_path is None else auth_path
        if not split.query:
            return resource

        pairs = [param.split('=', 1) for param in split.query.split('&')]
        selected = [
            self.unquote_v(pair)
            for pair in pairs
            if pair[0] in self.QSAOfInterest
        ]
        if len(selected) > 0:
            selected.sort(key=itemgetter(0))
            resource += '?'
            resource += '&'.join('='.join(pair) for pair in selected)
        return resource

    def canonical_string(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """Assemble the string to sign for the request.

        The pieces, newline-separated, are: upper-cased method, standard
        headers, custom headers (only when there are any) and the
        canonical resource.  ``expires`` is not used here.
        """
        pieces = [method.upper(), self.canonical_standard_headers(headers)]
        custom = self.canonical_custom_headers(headers)
        if custom:
            pieces.append(custom)
        pieces.append(self.canonical_resource(split, auth_path=auth_path))
        return '\n'.join(pieces)

    def get_signature(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """Compute the signature for the described request.

        When the credentials carry a token, the ``x-amz-security-token``
        header is (re)set on ``headers`` before signing.  ``expires`` is
        not used here.
        """
        if self.credentials.token:
            # Delete first so that the assignment below replaces the
            # header.
            del headers['x-amz-security-token']
            headers['x-amz-security-token'] = self.credentials.token
        to_sign = self.canonical_string(
            method, split, headers, auth_path=auth_path
        )
        logger.debug('StringToSign:\n%s', to_sign)
        return self.sign_string(to_sign)

    def add_auth(self, request):
        """Sign ``request`` and inject the signature into it.

        :raises NoCredentialsError: if no credentials were provided.
        """
        if self.credentials is None:
            raise NoCredentialsError
        logger.debug("Calculating signature using hmacv1 auth.")
        url_parts = urlsplit(request.url)
        logger.debug("HTTP request method: %s", request.method)
        computed = self.get_signature(
            request.method,
            url_parts,
            request.headers,
            auth_path=request.auth_path,
        )
        self._inject_signature(request, computed)

    def _get_date(self):
        """Return the current date formatted by ``formatdate(usegmt=True)``."""
        return formatdate(usegmt=True)

    def _inject_signature(self, request, signature):
        """Set the ``Authorization`` header on ``request`` from ``signature``."""
        headers = request.headers
        if 'Authorization' in headers:
            # request.headers is not a normal dictionary: it has the
            # (unintuitive) behavior of aggregating repeated assignments
            # for the same key.  For example, after
            #     headers['foo'] = 'a'; headers['foo'] = 'b'
            # list(headers) will print ['foo', 'foo'].  Hence the
            # explicit delete before assigning.
            del headers['Authorization']

        headers['Authorization'] = (
            f"AWS {self.credentials.access_key}:{signature}"
        )

1026 

1027 

class HmacV1QueryAuth(HmacV1Auth):
    """
    Produces a presigned request for s3, carrying the signature in the
    query string.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
    #RESTAuthenticationQueryStringAuth

    """

    DEFAULT_EXPIRES = 3600

    def __init__(self, credentials, expires=DEFAULT_EXPIRES):
        """Store ``credentials`` and the ``expires`` offset (in seconds)."""
        self.credentials = credentials
        self._expires = expires

    def _get_date(self):
        """Return the expiry as a string: epoch seconds of now + expires."""
        expiry = time.time() + int(self._expires)
        return str(int(expiry))

    def _inject_signature(self, request, signature):
        """Rewrite ``request.url`` so its query string carries the signature.

        The access key, signature, expiry and the relevant headers are
        percent-encoded and appended after any query string already
        present on the URL.
        """
        params = {
            'AWSAccessKeyId': self.credentials.access_key,
            'Signature': signature,
        }

        headers = request.headers
        for name in headers:
            lowered = name.lower()
            if name == 'Date':
                # For query string requests, Expires is used instead of
                # the Date header.
                params['Expires'] = headers['Date']
            elif lowered.startswith('x-amz-') or lowered in (
                'content-md5',
                'content-type',
            ):
                # Only relevant headers go into the query string:
                # anything starting with x-amz, Content-MD5 and
                # Content-Type.
                params[lowered] = headers[lowered]

        # Combine all of the identified headers into an encoded query
        # string.
        encoded = percent_encode_sequence(params)

        scheme, netloc, path, query, fragment = urlsplit(request.url)
        if query:
            # A pre-existing query string is kept in front of the new
            # parameters.
            encoded = f'{query}&{encoded}'
        request.url = urlunsplit((scheme, netloc, path, encoded, fragment))

1079 

1080 

class HmacV1PostAuth(HmacV1Auth):
    """
    Produces the fields and policy of a presigned post for s3.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """Populate the presigned-POST fields and policy on ``request.context``.

        Reads the optional ``'s3-presign-post-fields'`` and
        ``'s3-presign-post-policy'`` context entries (treating a missing
        or ``None`` entry as empty), adds the access key and, when
        present, the security token, stores the base64-encoded policy
        and its signature in the fields, and writes both objects back to
        the context.  Existing fields/policy/conditions objects are
        mutated in place.
        """
        context = request.context

        fields = context.get('s3-presign-post-fields', None)
        if fields is None:
            fields = {}

        policy = context.get('s3-presign-post-policy', None)
        if policy is None:
            policy = {}
        conditions = policy.get('conditions', None)
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        fields['AWSAccessKeyId'] = self.credentials.access_key

        session_token = self.credentials.token
        if session_token is not None:
            fields['x-amz-security-token'] = session_token
            conditions.append({'x-amz-security-token': session_token})

        # The policy travels as base64-encoded JSON, and that encoded
        # text is what gets signed.
        policy_json = json.dumps(policy).encode('utf-8')
        encoded_policy = base64.b64encode(policy_json).decode('utf-8')
        fields['policy'] = encoded_policy
        fields['signature'] = self.sign_string(encoded_policy)

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy

1119 

1120 

class BearerAuth(TokenSigner):
    """
    Bearer token authorization: the token is sent in the Authorization
    header as specified by Section 2.1 of RFC 6750.

    https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
    """

    def add_auth(self, request):
        """Set ``Authorization: Bearer <token>`` on ``request``.

        :raises NoAuthTokenError: if this signer has no auth token.
        """
        if self.auth_token is None:
            raise NoAuthTokenError()

        header_value = f'Bearer {self.auth_token.token}'
        headers = request.headers
        # Remove any existing value first so the header is replaced.
        if 'Authorization' in headers:
            del headers['Authorization']
        headers['Authorization'] = header_value

1137 

1138 

def resolve_auth_type(auth_trait):
    """Pick the signature version for the first usable auth type.

    ``auth_trait`` is iterated in order.  ``'smithy.api#noAuth'`` maps
    straight to its signature version; any other known auth type is
    chosen only if its signature version has an entry in
    ``AUTH_TYPE_MAPS``, otherwise the next auth type is tried.

    :raises UnknownSignatureVersionError: on the first auth type that is
        not a key of ``AUTH_TYPE_TO_SIGNATURE_VERSION``.
    :raises UnsupportedSignatureVersionError: if no auth type in
        ``auth_trait`` could be resolved.
    """
    for auth_type in auth_trait:
        if auth_type == 'smithy.api#noAuth':
            return AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type]
        if auth_type not in AUTH_TYPE_TO_SIGNATURE_VERSION:
            raise UnknownSignatureVersionError(signature_version=auth_type)
        candidate = AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type]
        if candidate in AUTH_TYPE_MAPS:
            return candidate
    raise UnsupportedSignatureVersionError(signature_version=auth_trait)

1150 

1151 

def resolve_auth_scheme_preference(preference_list, auth_options):
    """Resolve a signature version from preferences and service options.

    Schemes from ``preference_list`` are tried first, followed by the
    remaining service schemes (``auth_options`` with everything up to the
    last ``#`` removed); only schemes the service offers are considered.
    ``'noAuth'`` resolves immediately; any other scheme is chosen only if
    its signature version has an entry in ``AUTH_TYPE_MAPS``.

    :raises UnsupportedSignatureVersionError: if nothing resolves; the
        error names the sorted service-supported schemes.
    """
    service_supported = [option.split('#')[-1] for option in auth_options]

    unsupported = []
    for preferred in preference_list:
        if preferred not in AUTH_PREF_TO_SIGNATURE_VERSION:
            unsupported.append(preferred)
    if unsupported:
        logger.debug(
            "Unsupported auth schemes in preference list: %r", unsupported
        )

    # dict.fromkeys drops duplicates while keeping first-seen order, so
    # preferred schemes come ahead of the service's own ordering.
    for scheme in dict.fromkeys(preference_list + service_supported):
        if scheme not in service_supported:
            continue
        if scheme == 'noAuth':
            return AUTH_PREF_TO_SIGNATURE_VERSION[scheme]
        sig_version = AUTH_PREF_TO_SIGNATURE_VERSION.get(scheme)
        if sig_version in AUTH_TYPE_MAPS:
            return sig_version

    raise UnsupportedSignatureVersionError(
        signature_version=', '.join(sorted(service_supported))
    )

1182 

1183 

# Maps a signature version name to the signer class implementing it.
AUTH_TYPE_MAPS = {
    'v2': SigV2Auth,
    'v3': SigV3Auth,
    'v3https': SigV3Auth,
    's3': HmacV1Auth,
    's3-query': HmacV1QueryAuth,
    's3-presign-post': HmacV1PostAuth,
    's3v4-presign-post': S3SigV4PostAuth,
    'v4-s3express': S3ExpressAuth,
    'v4-s3express-query': S3ExpressQueryAuth,
    'v4-s3express-presign-post': S3ExpressPostAuth,
    'bearer': BearerAuth,
}

# The v4 signers depend on whether CRT is present: with CRT the entries
# of CRT_AUTH_TYPE_MAPS are merged in, otherwise the signers defined in
# this module are registered.
if HAS_CRT:
    from botocore.crt.auth import CRT_AUTH_TYPE_MAPS

    AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS)
else:
    AUTH_TYPE_MAPS['v4'] = SigV4Auth
    AUTH_TYPE_MAPS['v4-query'] = SigV4QueryAuth
    AUTH_TYPE_MAPS['s3v4'] = S3SigV4Auth
    AUTH_TYPE_MAPS['s3v4-query'] = S3SigV4QueryAuth

1212 

# Maps a fully qualified auth trait name to its signature version.
AUTH_TYPE_TO_SIGNATURE_VERSION = {
    'aws.auth#sigv4': 'v4',
    'aws.auth#sigv4a': 'v4a',
    'smithy.api#httpBearerAuth': 'bearer',
    'smithy.api#noAuth': 'none',
}

# Used specifically for resolving user-configured auth scheme
# preferences.  Same values as AUTH_TYPE_TO_SIGNATURE_VERSION, but keyed
# by the trait name with its prefix stripped
# ('smithy.api#httpBearerAuth' -> 'httpBearerAuth'), which is the form
# customers are expected to provide in configuration.
AUTH_PREF_TO_SIGNATURE_VERSION = {
    trait_name.rpartition('#')[2]: version
    for trait_name, version in AUTH_TYPE_TO_SIGNATURE_VERSION.items()
}