Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/auth.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

616 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import base64 

15import calendar 

16import datetime 

17import functools 

18import hmac 

19import json 

20import logging 

21import time 

22from collections.abc import Mapping 

23from email.utils import formatdate 

24from hashlib import sha1, sha256 

25from operator import itemgetter 

26 

27from botocore.compat import ( 

28 HAS_CRT, 

29 MD5_AVAILABLE, # noqa: F401 

30 HTTPHeaders, 

31 encodebytes, 

32 ensure_unicode, 

33 get_current_datetime, 

34 parse_qs, 

35 quote, 

36 unquote, 

37 urlsplit, 

38 urlunsplit, 

39) 

40from botocore.exceptions import ( 

41 NoAuthTokenError, 

42 NoCredentialsError, 

43 UnknownSignatureVersionError, 

44 UnsupportedSignatureVersionError, 

45) 

46from botocore.utils import ( 

47 is_valid_ipv6_endpoint_url, 

48 normalize_url_path, 

49 percent_encode_sequence, 

50) 

51 

52logger = logging.getLogger(__name__) 

53 

54 

55EMPTY_SHA256_HASH = ( 

56 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' 

57) 

58# This is the buffer size used when calculating sha256 checksums. 

59# Experimenting with various buffer sizes showed that this value generally 

60# gave the best result (in terms of performance). 

61PAYLOAD_BUFFER = 1024 * 1024 

62ISO8601 = '%Y-%m-%dT%H:%M:%SZ' 

63SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ' 

64SIGNED_HEADERS_BLACKLIST = [ 

65 'connection', 

66 'expect', 

67 'keep-alive', 

68 'proxy-authenticate', 

69 'proxy-authorization', 

70 'te', 

71 'trailer', 

72 'transfer-encoding', 

73 'upgrade', 

74 'user-agent', 

75 'x-amzn-trace-id', 

76] 

77UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD' 

78STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER' 

79 

80 

81def _host_from_url(url): 

82 # Given URL, derive value for host header. Ensure that value: 

83 # 1) is lowercase 

84 # 2) excludes port, if it was the default port 

85 # 3) excludes userinfo 

86 url_parts = urlsplit(url) 

87 host = url_parts.hostname # urlsplit's hostname is always lowercase 

88 if is_valid_ipv6_endpoint_url(url): 

89 host = f'[{host}]' 

90 default_ports = { 

91 'http': 80, 

92 'https': 443, 

93 } 

94 if url_parts.port is not None: 

95 if url_parts.port != default_ports.get(url_parts.scheme): 

96 host = f'{host}:{url_parts.port}' 

97 return host 

98 

99 

100def _get_body_as_dict(request): 

101 # For query services, request.data is form-encoded and is already a 

102 # dict, but for other services such as rest-json it could be a json 

103 # string or bytes. In those cases we attempt to load the data as a 

104 # dict. 

105 data = request.data 

106 if isinstance(data, bytes): 

107 data = json.loads(data.decode('utf-8')) 

108 elif isinstance(data, str): 

109 data = json.loads(data) 

110 return data 

111 

112 

113class BaseSigner: 

114 REQUIRES_REGION = False 

115 REQUIRES_TOKEN = False 

116 

117 def add_auth(self, request): 

118 raise NotImplementedError("add_auth") 

119 

120 

121class TokenSigner(BaseSigner): 

122 REQUIRES_TOKEN = True 

123 """ 

124 Signers that expect an authorization token to perform the authorization 

125 """ 

126 

127 def __init__(self, auth_token): 

128 self.auth_token = auth_token 

129 

130 

131class SigV2Auth(BaseSigner): 

132 """ 

133 Sign a request with Signature V2. 

134 """ 

135 

136 def __init__(self, credentials): 

137 self.credentials = credentials 

138 

139 def calc_signature(self, request, params): 

140 logger.debug("Calculating signature using v2 auth.") 

141 split = urlsplit(request.url) 

142 path = split.path 

143 if len(path) == 0: 

144 path = '/' 

145 string_to_sign = f"{request.method}\n{split.netloc}\n{path}\n" 

146 lhmac = hmac.new( 

147 self.credentials.secret_key.encode("utf-8"), digestmod=sha256 

148 ) 

149 pairs = [] 

150 for key in sorted(params): 

151 # Any previous signature should not be a part of this 

152 # one, so we skip that particular key. This prevents 

153 # issues during retries. 

154 if key == 'Signature': 

155 continue 

156 value = str(params[key]) 

157 quoted_key = quote(key.encode('utf-8'), safe='') 

158 quoted_value = quote(value.encode('utf-8'), safe='-_~') 

159 pairs.append(f'{quoted_key}={quoted_value}') 

160 qs = '&'.join(pairs) 

161 string_to_sign += qs 

162 logger.debug('String to sign: %s', string_to_sign) 

163 lhmac.update(string_to_sign.encode('utf-8')) 

164 b64 = base64.b64encode(lhmac.digest()).strip().decode('utf-8') 

165 return (qs, b64) 

166 

167 def add_auth(self, request): 

168 # The auth handler is the last thing called in the 

169 # preparation phase of a prepared request. 

170 # Because of this we have to parse the query params 

171 # from the request body so we can update them with 

172 # the sigv2 auth params. 

173 if self.credentials is None: 

174 raise NoCredentialsError() 

175 if request.data: 

176 # POST 

177 params = request.data 

178 else: 

179 # GET 

180 params = request.params 

181 params['AWSAccessKeyId'] = self.credentials.access_key 

182 params['SignatureVersion'] = '2' 

183 params['SignatureMethod'] = 'HmacSHA256' 

184 params['Timestamp'] = time.strftime(ISO8601, time.gmtime()) 

185 if self.credentials.token: 

186 params['SecurityToken'] = self.credentials.token 

187 qs, signature = self.calc_signature(request, params) 

188 params['Signature'] = signature 

189 return request 

190 

191 

192class SigV3Auth(BaseSigner): 

193 def __init__(self, credentials): 

194 self.credentials = credentials 

195 

196 def add_auth(self, request): 

197 if self.credentials is None: 

198 raise NoCredentialsError() 

199 if 'Date' in request.headers: 

200 del request.headers['Date'] 

201 request.headers['Date'] = formatdate(usegmt=True) 

202 if self.credentials.token: 

203 if 'X-Amz-Security-Token' in request.headers: 

204 del request.headers['X-Amz-Security-Token'] 

205 request.headers['X-Amz-Security-Token'] = self.credentials.token 

206 new_hmac = hmac.new( 

207 self.credentials.secret_key.encode('utf-8'), digestmod=sha256 

208 ) 

209 new_hmac.update(request.headers['Date'].encode('utf-8')) 

210 encoded_signature = encodebytes(new_hmac.digest()).strip() 

211 signature = ( 

212 f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key}," 

213 f"Algorithm=HmacSHA256,Signature={encoded_signature.decode('utf-8')}" 

214 ) 

215 if 'X-Amzn-Authorization' in request.headers: 

216 del request.headers['X-Amzn-Authorization'] 

217 request.headers['X-Amzn-Authorization'] = signature 

218 

219 

220class SigV4Auth(BaseSigner): 

221 """ 

222 Sign a request with Signature V4. 

223 """ 

224 

225 REQUIRES_REGION = True 

226 

227 def __init__(self, credentials, service_name, region_name): 

228 self.credentials = credentials 

229 # We initialize these value here so the unit tests can have 

230 # valid values. But these will get overriden in ``add_auth`` 

231 # later for real requests. 

232 self._region_name = region_name 

233 self._service_name = service_name 

234 

235 def _sign(self, key, msg, hex=False): 

236 if hex: 

237 sig = hmac.new(key, msg.encode('utf-8'), sha256).hexdigest() 

238 else: 

239 sig = hmac.new(key, msg.encode('utf-8'), sha256).digest() 

240 return sig 

241 

242 def headers_to_sign(self, request): 

243 """ 

244 Select the headers from the request that need to be included 

245 in the StringToSign. 

246 """ 

247 header_map = HTTPHeaders() 

248 for name, value in request.headers.items(): 

249 lname = name.lower() 

250 if lname not in SIGNED_HEADERS_BLACKLIST: 

251 header_map[lname] = value 

252 if 'host' not in header_map: 

253 # TODO: We should set the host ourselves, instead of relying on our 

254 # HTTP client to set it for us. 

255 header_map['host'] = _host_from_url(request.url) 

256 return header_map 

257 

258 def canonical_query_string(self, request): 

259 # The query string can come from two parts. One is the 

260 # params attribute of the request. The other is from the request 

261 # url (in which case we have to re-split the url into its components 

262 # and parse out the query string component). 

263 if request.params: 

264 return self._canonical_query_string_params(request.params) 

265 else: 

266 return self._canonical_query_string_url(urlsplit(request.url)) 

267 

268 def _canonical_query_string_params(self, params): 

269 # [(key, value), (key2, value2)] 

270 key_val_pairs = [] 

271 if isinstance(params, Mapping): 

272 params = params.items() 

273 for key, value in params: 

274 key_val_pairs.append( 

275 (quote(key, safe='-_.~'), quote(str(value), safe='-_.~')) 

276 ) 

277 sorted_key_vals = [] 

278 # Sort by the URI-encoded key names, and in the case of 

279 # repeated keys, then sort by the value. 

280 for key, value in sorted(key_val_pairs): 

281 sorted_key_vals.append(f'{key}={value}') 

282 canonical_query_string = '&'.join(sorted_key_vals) 

283 return canonical_query_string 

284 

285 def _canonical_query_string_url(self, parts): 

286 canonical_query_string = '' 

287 if parts.query: 

288 # [(key, value), (key2, value2)] 

289 key_val_pairs = [] 

290 for pair in parts.query.split('&'): 

291 key, _, value = pair.partition('=') 

292 key_val_pairs.append((key, value)) 

293 sorted_key_vals = [] 

294 # Sort by the URI-encoded key names, and in the case of 

295 # repeated keys, then sort by the value. 

296 for key, value in sorted(key_val_pairs): 

297 sorted_key_vals.append(f'{key}={value}') 

298 canonical_query_string = '&'.join(sorted_key_vals) 

299 return canonical_query_string 

300 

301 def canonical_headers(self, headers_to_sign): 

302 """ 

303 Return the headers that need to be included in the StringToSign 

304 in their canonical form by converting all header keys to lower 

305 case, sorting them in alphabetical order and then joining 

306 them into a string, separated by newlines. 

307 """ 

308 headers = [] 

309 sorted_header_names = sorted(set(headers_to_sign)) 

310 for key in sorted_header_names: 

311 value = ','.join( 

312 self._header_value(v) for v in headers_to_sign.get_all(key) 

313 ) 

314 headers.append(f'{key}:{ensure_unicode(value)}') 

315 return '\n'.join(headers) 

316 

317 def _header_value(self, value): 

318 # From the sigv4 docs: 

319 # Lowercase(HeaderName) + ':' + Trimall(HeaderValue) 

320 # 

321 # The Trimall function removes excess white space before and after 

322 # values, and converts sequential spaces to a single space. 

323 return ' '.join(value.split()) 

324 

325 def signed_headers(self, headers_to_sign): 

326 headers = sorted(n.lower().strip() for n in set(headers_to_sign)) 

327 return ';'.join(headers) 

328 

329 def _is_streaming_checksum_payload(self, request): 

330 checksum_context = request.context.get('checksum', {}) 

331 algorithm = checksum_context.get('request_algorithm') 

332 return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer' 

333 

334 def payload(self, request): 

335 if self._is_streaming_checksum_payload(request): 

336 return STREAMING_UNSIGNED_PAYLOAD_TRAILER 

337 elif not self._should_sha256_sign_payload(request): 

338 # When payload signing is disabled, we use this static string in 

339 # place of the payload checksum. 

340 return UNSIGNED_PAYLOAD 

341 request_body = request.body 

342 if request_body and hasattr(request_body, 'seek'): 

343 position = request_body.tell() 

344 read_chunksize = functools.partial( 

345 request_body.read, PAYLOAD_BUFFER 

346 ) 

347 checksum = sha256() 

348 for chunk in iter(read_chunksize, b''): 

349 checksum.update(chunk) 

350 hex_checksum = checksum.hexdigest() 

351 request_body.seek(position) 

352 return hex_checksum 

353 elif request_body: 

354 # The request serialization has ensured that 

355 # request.body is a bytes() type. 

356 return sha256(request_body).hexdigest() 

357 else: 

358 return EMPTY_SHA256_HASH 

359 

360 def _should_sha256_sign_payload(self, request): 

361 # Payloads will always be signed over insecure connections. 

362 if not request.url.startswith('https'): 

363 return True 

364 

365 # Certain operations may have payload signing disabled by default. 

366 # Since we don't have access to the operation model, we pass in this 

367 # bit of metadata through the request context. 

368 return request.context.get('payload_signing_enabled', True) 

369 

370 def canonical_request(self, request): 

371 cr = [request.method.upper()] 

372 path = self._normalize_url_path(urlsplit(request.url).path) 

373 cr.append(path) 

374 cr.append(self.canonical_query_string(request)) 

375 headers_to_sign = self.headers_to_sign(request) 

376 cr.append(self.canonical_headers(headers_to_sign) + '\n') 

377 cr.append(self.signed_headers(headers_to_sign)) 

378 if 'X-Amz-Content-SHA256' in request.headers: 

379 body_checksum = request.headers['X-Amz-Content-SHA256'] 

380 else: 

381 body_checksum = self.payload(request) 

382 cr.append(body_checksum) 

383 return '\n'.join(cr) 

384 

385 def _normalize_url_path(self, path): 

386 normalized_path = quote(normalize_url_path(path), safe='/~') 

387 return normalized_path 

388 

389 def scope(self, request): 

390 scope = [self.credentials.access_key] 

391 scope.append(request.context['timestamp'][0:8]) 

392 scope.append(self._region_name) 

393 scope.append(self._service_name) 

394 scope.append('aws4_request') 

395 return '/'.join(scope) 

396 

397 def credential_scope(self, request): 

398 scope = [] 

399 scope.append(request.context['timestamp'][0:8]) 

400 scope.append(self._region_name) 

401 scope.append(self._service_name) 

402 scope.append('aws4_request') 

403 return '/'.join(scope) 

404 

405 def string_to_sign(self, request, canonical_request): 

406 """ 

407 Return the canonical StringToSign as well as a dict 

408 containing the original version of all headers that 

409 were included in the StringToSign. 

410 """ 

411 sts = ['AWS4-HMAC-SHA256'] 

412 sts.append(request.context['timestamp']) 

413 sts.append(self.credential_scope(request)) 

414 sts.append(sha256(canonical_request.encode('utf-8')).hexdigest()) 

415 return '\n'.join(sts) 

416 

417 def signature(self, string_to_sign, request): 

418 key = self.credentials.secret_key 

419 k_date = self._sign( 

420 (f"AWS4{key}").encode(), request.context["timestamp"][0:8] 

421 ) 

422 k_region = self._sign(k_date, self._region_name) 

423 k_service = self._sign(k_region, self._service_name) 

424 k_signing = self._sign(k_service, 'aws4_request') 

425 return self._sign(k_signing, string_to_sign, hex=True) 

426 

427 def add_auth(self, request): 

428 if self.credentials is None: 

429 raise NoCredentialsError() 

430 datetime_now = get_current_datetime() 

431 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

432 # This could be a retry. Make sure the previous 

433 # authorization header is removed first. 

434 self._modify_request_before_signing(request) 

435 canonical_request = self.canonical_request(request) 

436 logger.debug("Calculating signature using v4 auth.") 

437 logger.debug('CanonicalRequest:\n%s', canonical_request) 

438 string_to_sign = self.string_to_sign(request, canonical_request) 

439 logger.debug('StringToSign:\n%s', string_to_sign) 

440 signature = self.signature(string_to_sign, request) 

441 logger.debug('Signature:\n%s', signature) 

442 

443 self._inject_signature_to_request(request, signature) 

444 

445 def _inject_signature_to_request(self, request, signature): 

446 auth_str = [f'AWS4-HMAC-SHA256 Credential={self.scope(request)}'] 

447 headers_to_sign = self.headers_to_sign(request) 

448 auth_str.append( 

449 f"SignedHeaders={self.signed_headers(headers_to_sign)}" 

450 ) 

451 auth_str.append(f'Signature={signature}') 

452 request.headers['Authorization'] = ', '.join(auth_str) 

453 return request 

454 

455 def _modify_request_before_signing(self, request): 

456 if 'Authorization' in request.headers: 

457 del request.headers['Authorization'] 

458 self._set_necessary_date_headers(request) 

459 if self.credentials.token: 

460 if 'X-Amz-Security-Token' in request.headers: 

461 del request.headers['X-Amz-Security-Token'] 

462 request.headers['X-Amz-Security-Token'] = self.credentials.token 

463 

464 if not request.context.get('payload_signing_enabled', True): 

465 if 'X-Amz-Content-SHA256' in request.headers: 

466 del request.headers['X-Amz-Content-SHA256'] 

467 request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD 

468 

469 def _set_necessary_date_headers(self, request): 

470 # The spec allows for either the Date _or_ the X-Amz-Date value to be 

471 # used so we check both. If there's a Date header, we use the date 

472 # header. Otherwise we use the X-Amz-Date header. 

473 if 'Date' in request.headers: 

474 del request.headers['Date'] 

475 datetime_timestamp = datetime.datetime.strptime( 

476 request.context['timestamp'], SIGV4_TIMESTAMP 

477 ) 

478 request.headers['Date'] = formatdate( 

479 int(calendar.timegm(datetime_timestamp.timetuple())) 

480 ) 

481 if 'X-Amz-Date' in request.headers: 

482 del request.headers['X-Amz-Date'] 

483 else: 

484 if 'X-Amz-Date' in request.headers: 

485 del request.headers['X-Amz-Date'] 

486 request.headers['X-Amz-Date'] = request.context['timestamp'] 

487 

488 

489class S3SigV4Auth(SigV4Auth): 

490 def _modify_request_before_signing(self, request): 

491 super()._modify_request_before_signing(request) 

492 if 'X-Amz-Content-SHA256' in request.headers: 

493 del request.headers['X-Amz-Content-SHA256'] 

494 

495 request.headers['X-Amz-Content-SHA256'] = self.payload(request) 

496 

497 def _should_sha256_sign_payload(self, request): 

498 # S3 allows optional body signing, so to minimize the performance 

499 # impact, we opt to not SHA256 sign the body on streaming uploads, 

500 # provided that we're on https. 

501 client_config = request.context.get('client_config') 

502 s3_config = getattr(client_config, 's3', None) 

503 

504 # The config could be None if it isn't set, or if the customer sets it 

505 # to None. 

506 if s3_config is None: 

507 s3_config = {} 

508 

509 # The explicit configuration takes precedence over any implicit 

510 # configuration. 

511 sign_payload = s3_config.get('payload_signing_enabled', None) 

512 if sign_payload is not None: 

513 return sign_payload 

514 

515 # We require that both a checksum be present and https be enabled 

516 # to implicitly disable body signing. The combination of TLS and 

517 # a checksum is sufficiently secure and durable for us to be 

518 # confident in the request without body signing. 

519 checksum_header = 'Content-MD5' 

520 checksum_context = request.context.get('checksum', {}) 

521 algorithm = checksum_context.get('request_algorithm') 

522 if isinstance(algorithm, dict) and algorithm.get('in') == 'header': 

523 checksum_header = algorithm['name'] 

524 if ( 

525 not request.url.startswith("https") 

526 or checksum_header not in request.headers 

527 ): 

528 return True 

529 

530 # If the input is streaming we disable body signing by default. 

531 if request.context.get('has_streaming_input', False): 

532 return False 

533 

534 # If the S3-specific checks had no results, delegate to the generic 

535 # checks. 

536 return super()._should_sha256_sign_payload(request) 

537 

538 def _normalize_url_path(self, path): 

539 # For S3, we do not normalize the path. 

540 return path 

541 

542 

543class S3ExpressAuth(S3SigV4Auth): 

544 REQUIRES_IDENTITY_CACHE = True 

545 

546 def __init__( 

547 self, credentials, service_name, region_name, *, identity_cache 

548 ): 

549 super().__init__(credentials, service_name, region_name) 

550 self._identity_cache = identity_cache 

551 

552 def add_auth(self, request): 

553 super().add_auth(request) 

554 

555 def _modify_request_before_signing(self, request): 

556 super()._modify_request_before_signing(request) 

557 if 'x-amz-s3session-token' not in request.headers: 

558 request.headers['x-amz-s3session-token'] = self.credentials.token 

559 # S3Express does not support STS' X-Amz-Security-Token 

560 if 'X-Amz-Security-Token' in request.headers: 

561 del request.headers['X-Amz-Security-Token'] 

562 

563 

564class S3ExpressPostAuth(S3ExpressAuth): 

565 REQUIRES_IDENTITY_CACHE = True 

566 

567 def add_auth(self, request): 

568 datetime_now = get_current_datetime() 

569 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

570 

571 fields = {} 

572 if request.context.get('s3-presign-post-fields', None) is not None: 

573 fields = request.context['s3-presign-post-fields'] 

574 

575 policy = {} 

576 conditions = [] 

577 if request.context.get('s3-presign-post-policy', None) is not None: 

578 policy = request.context['s3-presign-post-policy'] 

579 if policy.get('conditions', None) is not None: 

580 conditions = policy['conditions'] 

581 

582 policy['conditions'] = conditions 

583 

584 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256' 

585 fields['x-amz-credential'] = self.scope(request) 

586 fields['x-amz-date'] = request.context['timestamp'] 

587 

588 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'}) 

589 conditions.append({'x-amz-credential': self.scope(request)}) 

590 conditions.append({'x-amz-date': request.context['timestamp']}) 

591 

592 if self.credentials.token is not None: 

593 fields['X-Amz-S3session-Token'] = self.credentials.token 

594 conditions.append( 

595 {'X-Amz-S3session-Token': self.credentials.token} 

596 ) 

597 

598 # Dump the base64 encoded policy into the fields dictionary. 

599 fields['policy'] = base64.b64encode( 

600 json.dumps(policy).encode('utf-8') 

601 ).decode('utf-8') 

602 

603 fields['x-amz-signature'] = self.signature(fields['policy'], request) 

604 

605 request.context['s3-presign-post-fields'] = fields 

606 request.context['s3-presign-post-policy'] = policy 

607 

608 

609class S3ExpressQueryAuth(S3ExpressAuth): 

610 DEFAULT_EXPIRES = 300 

611 REQUIRES_IDENTITY_CACHE = True 

612 

613 def __init__( 

614 self, 

615 credentials, 

616 service_name, 

617 region_name, 

618 *, 

619 identity_cache, 

620 expires=DEFAULT_EXPIRES, 

621 ): 

622 super().__init__( 

623 credentials, 

624 service_name, 

625 region_name, 

626 identity_cache=identity_cache, 

627 ) 

628 self._expires = expires 

629 

630 def _modify_request_before_signing(self, request): 

631 # We automatically set this header, so if it's the auto-set value we 

632 # want to get rid of it since it doesn't make sense for presigned urls. 

633 content_type = request.headers.get('content-type') 

634 blocklisted_content_type = ( 

635 'application/x-www-form-urlencoded; charset=utf-8' 

636 ) 

637 if content_type == blocklisted_content_type: 

638 del request.headers['content-type'] 

639 

640 # Note that we're not including X-Amz-Signature. 

641 # From the docs: "The Canonical Query String must include all the query 

642 # parameters from the preceding table except for X-Amz-Signature. 

643 signed_headers = self.signed_headers(self.headers_to_sign(request)) 

644 

645 auth_params = { 

646 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 

647 'X-Amz-Credential': self.scope(request), 

648 'X-Amz-Date': request.context['timestamp'], 

649 'X-Amz-Expires': self._expires, 

650 'X-Amz-SignedHeaders': signed_headers, 

651 } 

652 if self.credentials.token is not None: 

653 auth_params['X-Amz-S3session-Token'] = self.credentials.token 

654 # Now parse the original query string to a dict, inject our new query 

655 # params, and serialize back to a query string. 

656 url_parts = urlsplit(request.url) 

657 # parse_qs makes each value a list, but in our case we know we won't 

658 # have repeated keys so we know we have single element lists which we 

659 # can convert back to scalar values. 

660 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True) 

661 query_dict = {k: v[0] for k, v in query_string_parts.items()} 

662 

663 if request.params: 

664 query_dict.update(request.params) 

665 request.params = {} 

666 # The spec is particular about this. It *has* to be: 

667 # https://<endpoint>?<operation params>&<auth params> 

668 # You can't mix the two types of params together, i.e just keep doing 

669 # new_query_params.update(op_params) 

670 # new_query_params.update(auth_params) 

671 # percent_encode_sequence(new_query_params) 

672 operation_params = '' 

673 if request.data: 

674 # We also need to move the body params into the query string. To 

675 # do this, we first have to convert it to a dict. 

676 query_dict.update(_get_body_as_dict(request)) 

677 request.data = '' 

678 if query_dict: 

679 operation_params = percent_encode_sequence(query_dict) + '&' 

680 new_query_string = ( 

681 f"{operation_params}{percent_encode_sequence(auth_params)}" 

682 ) 

683 # url_parts is a tuple (and therefore immutable) so we need to create 

684 # a new url_parts with the new query string. 

685 # <part> - <index> 

686 # scheme - 0 

687 # netloc - 1 

688 # path - 2 

689 # query - 3 <-- we're replacing this. 

690 # fragment - 4 

691 p = url_parts 

692 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

693 request.url = urlunsplit(new_url_parts) 

694 

695 def _inject_signature_to_request(self, request, signature): 

696 # Rather than calculating an "Authorization" header, for the query 

697 # param quth, we just append an 'X-Amz-Signature' param to the end 

698 # of the query string. 

699 request.url += f'&X-Amz-Signature={signature}' 

700 

701 def _normalize_url_path(self, path): 

702 # For S3, we do not normalize the path. 

703 return path 

704 

705 def payload(self, request): 

706 # From the doc link above: 

707 # "You don't include a payload hash in the Canonical Request, because 

708 # when you create a presigned URL, you don't know anything about the 

709 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD". 

710 return UNSIGNED_PAYLOAD 

711 

712 

713class SigV4QueryAuth(SigV4Auth): 

714 DEFAULT_EXPIRES = 3600 

715 

716 def __init__( 

717 self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES 

718 ): 

719 super().__init__(credentials, service_name, region_name) 

720 self._expires = expires 

721 

722 def _modify_request_before_signing(self, request): 

723 # We automatically set this header, so if it's the auto-set value we 

724 # want to get rid of it since it doesn't make sense for presigned urls. 

725 content_type = request.headers.get('content-type') 

726 blacklisted_content_type = ( 

727 'application/x-www-form-urlencoded; charset=utf-8' 

728 ) 

729 if content_type == blacklisted_content_type: 

730 del request.headers['content-type'] 

731 

732 # Note that we're not including X-Amz-Signature. 

733 # From the docs: "The Canonical Query String must include all the query 

734 # parameters from the preceding table except for X-Amz-Signature. 

735 signed_headers = self.signed_headers(self.headers_to_sign(request)) 

736 

737 auth_params = { 

738 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 

739 'X-Amz-Credential': self.scope(request), 

740 'X-Amz-Date': request.context['timestamp'], 

741 'X-Amz-Expires': self._expires, 

742 'X-Amz-SignedHeaders': signed_headers, 

743 } 

744 if self.credentials.token is not None: 

745 auth_params['X-Amz-Security-Token'] = self.credentials.token 

746 # Now parse the original query string to a dict, inject our new query 

747 # params, and serialize back to a query string. 

748 url_parts = urlsplit(request.url) 

749 # parse_qs makes each value a list, but in our case we know we won't 

750 # have repeated keys so we know we have single element lists which we 

751 # can convert back to scalar values. 

752 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True) 

753 query_dict = {k: v[0] for k, v in query_string_parts.items()} 

754 

755 if request.params: 

756 query_dict.update(request.params) 

757 request.params = {} 

758 # The spec is particular about this. It *has* to be: 

759 # https://<endpoint>?<operation params>&<auth params> 

760 # You can't mix the two types of params together, i.e just keep doing 

761 # new_query_params.update(op_params) 

762 # new_query_params.update(auth_params) 

763 # percent_encode_sequence(new_query_params) 

764 operation_params = '' 

765 if request.data: 

766 # We also need to move the body params into the query string. To 

767 # do this, we first have to convert it to a dict. 

768 query_dict.update(_get_body_as_dict(request)) 

769 request.data = '' 

770 if query_dict: 

771 operation_params = percent_encode_sequence(query_dict) + '&' 

772 new_query_string = ( 

773 f"{operation_params}{percent_encode_sequence(auth_params)}" 

774 ) 

775 # url_parts is a tuple (and therefore immutable) so we need to create 

776 # a new url_parts with the new query string. 

777 # <part> - <index> 

778 # scheme - 0 

779 # netloc - 1 

780 # path - 2 

781 # query - 3 <-- we're replacing this. 

782 # fragment - 4 

783 p = url_parts 

784 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

785 request.url = urlunsplit(new_url_parts) 

786 

787 def _inject_signature_to_request(self, request, signature): 

788 # Rather than calculating an "Authorization" header, for the query 

789 # param quth, we just append an 'X-Amz-Signature' param to the end 

790 # of the query string. 

791 request.url += f'&X-Amz-Signature={signature}' 

792 

793 

794class S3SigV4QueryAuth(SigV4QueryAuth): 

795 """S3 SigV4 auth using query parameters. 

796 

797 This signer will sign a request using query parameters and signature 

798 version 4, i.e a "presigned url" signer. 

799 

800 Based off of: 

801 

802 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html 

803 

804 """ 

805 

806 def _normalize_url_path(self, path): 

807 # For S3, we do not normalize the path. 

808 return path 

809 

810 def payload(self, request): 

811 # From the doc link above: 

812 # "You don't include a payload hash in the Canonical Request, because 

813 # when you create a presigned URL, you don't know anything about the 

814 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD". 

815 return UNSIGNED_PAYLOAD 

816 

817 

818class S3SigV4PostAuth(SigV4Auth): 

819 """ 

820 Presigns a s3 post 

821 

822 Implementation doc here: 

823 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html 

824 """ 

825 

826 def add_auth(self, request): 

827 datetime_now = get_current_datetime() 

828 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

829 

830 fields = {} 

831 if request.context.get('s3-presign-post-fields', None) is not None: 

832 fields = request.context['s3-presign-post-fields'] 

833 

834 policy = {} 

835 conditions = [] 

836 if request.context.get('s3-presign-post-policy', None) is not None: 

837 policy = request.context['s3-presign-post-policy'] 

838 if policy.get('conditions', None) is not None: 

839 conditions = policy['conditions'] 

840 

841 policy['conditions'] = conditions 

842 

843 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256' 

844 fields['x-amz-credential'] = self.scope(request) 

845 fields['x-amz-date'] = request.context['timestamp'] 

846 

847 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'}) 

848 conditions.append({'x-amz-credential': self.scope(request)}) 

849 conditions.append({'x-amz-date': request.context['timestamp']}) 

850 

851 if self.credentials.token is not None: 

852 fields['x-amz-security-token'] = self.credentials.token 

853 conditions.append({'x-amz-security-token': self.credentials.token}) 

854 

855 # Dump the base64 encoded policy into the fields dictionary. 

856 fields['policy'] = base64.b64encode( 

857 json.dumps(policy).encode('utf-8') 

858 ).decode('utf-8') 

859 

860 fields['x-amz-signature'] = self.signature(fields['policy'], request) 

861 

862 request.context['s3-presign-post-fields'] = fields 

863 request.context['s3-presign-post-policy'] = policy 

864 

865 

866class HmacV1Auth(BaseSigner): 

867 # List of Query String Arguments of Interest 

868 QSAOfInterest = [ 

869 'accelerate', 

870 'acl', 

871 'cors', 

872 'defaultObjectAcl', 

873 'location', 

874 'logging', 

875 'partNumber', 

876 'policy', 

877 'requestPayment', 

878 'torrent', 

879 'versioning', 

880 'versionId', 

881 'versions', 

882 'website', 

883 'uploads', 

884 'uploadId', 

885 'response-content-type', 

886 'response-content-language', 

887 'response-expires', 

888 'response-cache-control', 

889 'response-content-disposition', 

890 'response-content-encoding', 

891 'delete', 

892 'lifecycle', 

893 'tagging', 

894 'restore', 

895 'storageClass', 

896 'notification', 

897 'replication', 

898 'requestPayment', 

899 'analytics', 

900 'metrics', 

901 'inventory', 

902 'select', 

903 'select-type', 

904 'object-lock', 

905 ] 

906 

907 def __init__(self, credentials, service_name=None, region_name=None): 

908 self.credentials = credentials 

909 

910 def sign_string(self, string_to_sign): 

911 new_hmac = hmac.new( 

912 self.credentials.secret_key.encode('utf-8'), digestmod=sha1 

913 ) 

914 new_hmac.update(string_to_sign.encode('utf-8')) 

915 return encodebytes(new_hmac.digest()).strip().decode('utf-8') 

916 

917 def canonical_standard_headers(self, headers): 

918 interesting_headers = ['content-md5', 'content-type', 'date'] 

919 hoi = [] 

920 if 'Date' in headers: 

921 del headers['Date'] 

922 headers['Date'] = self._get_date() 

923 for ih in interesting_headers: 

924 found = False 

925 for key in headers: 

926 lk = key.lower() 

927 if headers[key] is not None and lk == ih: 

928 hoi.append(headers[key].strip()) 

929 found = True 

930 if not found: 

931 hoi.append('') 

932 return '\n'.join(hoi) 

933 

934 def canonical_custom_headers(self, headers): 

935 hoi = [] 

936 custom_headers = {} 

937 for key in headers: 

938 lk = key.lower() 

939 if headers[key] is not None: 

940 if lk.startswith('x-amz-'): 

941 custom_headers[lk] = ','.join( 

942 v.strip() for v in headers.get_all(key) 

943 ) 

944 sorted_header_keys = sorted(custom_headers.keys()) 

945 for key in sorted_header_keys: 

946 hoi.append(f"{key}:{custom_headers[key]}") 

947 return '\n'.join(hoi) 

948 

949 def unquote_v(self, nv): 

950 """ 

951 TODO: Do we need this? 

952 """ 

953 if len(nv) == 1: 

954 return nv 

955 else: 

956 return (nv[0], unquote(nv[1])) 

957 

958 def canonical_resource(self, split, auth_path=None): 

959 # don't include anything after the first ? in the resource... 

960 # unless it is one of the QSA of interest, defined above 

961 # NOTE: 

962 # The path in the canonical resource should always be the 

963 # full path including the bucket name, even for virtual-hosting 

964 # style addressing. The ``auth_path`` keeps track of the full 

965 # path for the canonical resource and would be passed in if 

966 # the client was using virtual-hosting style. 

967 if auth_path is not None: 

968 buf = auth_path 

969 else: 

970 buf = split.path 

971 if split.query: 

972 qsa = split.query.split('&') 

973 qsa = [a.split('=', 1) for a in qsa] 

974 qsa = [ 

975 self.unquote_v(a) for a in qsa if a[0] in self.QSAOfInterest 

976 ] 

977 if len(qsa) > 0: 

978 qsa.sort(key=itemgetter(0)) 

979 qsa = ['='.join(a) for a in qsa] 

980 buf += '?' 

981 buf += '&'.join(qsa) 

982 return buf 

983 

984 def canonical_string( 

985 self, method, split, headers, expires=None, auth_path=None 

986 ): 

987 cs = method.upper() + '\n' 

988 cs += self.canonical_standard_headers(headers) + '\n' 

989 custom_headers = self.canonical_custom_headers(headers) 

990 if custom_headers: 

991 cs += custom_headers + '\n' 

992 cs += self.canonical_resource(split, auth_path=auth_path) 

993 return cs 

994 

995 def get_signature( 

996 self, method, split, headers, expires=None, auth_path=None 

997 ): 

998 if self.credentials.token: 

999 del headers['x-amz-security-token'] 

1000 headers['x-amz-security-token'] = self.credentials.token 

1001 string_to_sign = self.canonical_string( 

1002 method, split, headers, auth_path=auth_path 

1003 ) 

1004 logger.debug('StringToSign:\n%s', string_to_sign) 

1005 return self.sign_string(string_to_sign) 

1006 

1007 def add_auth(self, request): 

1008 if self.credentials is None: 

1009 raise NoCredentialsError 

1010 logger.debug("Calculating signature using hmacv1 auth.") 

1011 split = urlsplit(request.url) 

1012 logger.debug("HTTP request method: %s", request.method) 

1013 signature = self.get_signature( 

1014 request.method, split, request.headers, auth_path=request.auth_path 

1015 ) 

1016 self._inject_signature(request, signature) 

1017 

1018 def _get_date(self): 

1019 return formatdate(usegmt=True) 

1020 

1021 def _inject_signature(self, request, signature): 

1022 if 'Authorization' in request.headers: 

1023 # We have to do this because request.headers is not 

1024 # normal dictionary. It has the (unintuitive) behavior 

1025 # of aggregating repeated setattr calls for the same 

1026 # key value. For example: 

1027 # headers['foo'] = 'a'; headers['foo'] = 'b' 

1028 # list(headers) will print ['foo', 'foo']. 

1029 del request.headers['Authorization'] 

1030 

1031 auth_header = f"AWS {self.credentials.access_key}:{signature}" 

1032 request.headers['Authorization'] = auth_header 

1033 

1034 

1035class HmacV1QueryAuth(HmacV1Auth): 

1036 """ 

1037 Generates a presigned request for s3. 

1038 

1039 Spec from this document: 

1040 

1041 http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html 

1042 #RESTAuthenticationQueryStringAuth 

1043 

1044 """ 

1045 

1046 DEFAULT_EXPIRES = 3600 

1047 

1048 def __init__(self, credentials, expires=DEFAULT_EXPIRES): 

1049 self.credentials = credentials 

1050 self._expires = expires 

1051 

1052 def _get_date(self): 

1053 return str(int(time.time() + int(self._expires))) 

1054 

1055 def _inject_signature(self, request, signature): 

1056 query_dict = {} 

1057 query_dict['AWSAccessKeyId'] = self.credentials.access_key 

1058 query_dict['Signature'] = signature 

1059 

1060 for header_key in request.headers: 

1061 lk = header_key.lower() 

1062 # For query string requests, Expires is used instead of the 

1063 # Date header. 

1064 if header_key == 'Date': 

1065 query_dict['Expires'] = request.headers['Date'] 

1066 # We only want to include relevant headers in the query string. 

1067 # These can be anything that starts with x-amz, is Content-MD5, 

1068 # or is Content-Type. 

1069 elif lk.startswith('x-amz-') or lk in ( 

1070 'content-md5', 

1071 'content-type', 

1072 ): 

1073 query_dict[lk] = request.headers[lk] 

1074 # Combine all of the identified headers into an encoded 

1075 # query string 

1076 new_query_string = percent_encode_sequence(query_dict) 

1077 

1078 # Create a new url with the presigned url. 

1079 p = urlsplit(request.url) 

1080 if p[3]: 

1081 # If there was a pre-existing query string, we should 

1082 # add that back before injecting the new query string. 

1083 new_query_string = f'{p[3]}&{new_query_string}' 

1084 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

1085 request.url = urlunsplit(new_url_parts) 

1086 

1087 

1088class HmacV1PostAuth(HmacV1Auth): 

1089 """ 

1090 Generates a presigned post for s3. 

1091 

1092 Spec from this document: 

1093 

1094 http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html 

1095 """ 

1096 

1097 def add_auth(self, request): 

1098 fields = {} 

1099 if request.context.get('s3-presign-post-fields', None) is not None: 

1100 fields = request.context['s3-presign-post-fields'] 

1101 

1102 policy = {} 

1103 conditions = [] 

1104 if request.context.get('s3-presign-post-policy', None) is not None: 

1105 policy = request.context['s3-presign-post-policy'] 

1106 if policy.get('conditions', None) is not None: 

1107 conditions = policy['conditions'] 

1108 

1109 policy['conditions'] = conditions 

1110 

1111 fields['AWSAccessKeyId'] = self.credentials.access_key 

1112 

1113 if self.credentials.token is not None: 

1114 fields['x-amz-security-token'] = self.credentials.token 

1115 conditions.append({'x-amz-security-token': self.credentials.token}) 

1116 

1117 # Dump the base64 encoded policy into the fields dictionary. 

1118 fields['policy'] = base64.b64encode( 

1119 json.dumps(policy).encode('utf-8') 

1120 ).decode('utf-8') 

1121 

1122 fields['signature'] = self.sign_string(fields['policy']) 

1123 

1124 request.context['s3-presign-post-fields'] = fields 

1125 request.context['s3-presign-post-policy'] = policy 

1126 

1127 

1128class BearerAuth(TokenSigner): 

1129 """ 

1130 Performs bearer token authorization by placing the bearer token in the 

1131 Authorization header as specified by Section 2.1 of RFC 6750. 

1132 

1133 https://datatracker.ietf.org/doc/html/rfc6750#section-2.1 

1134 """ 

1135 

1136 def add_auth(self, request): 

1137 if self.auth_token is None: 

1138 raise NoAuthTokenError() 

1139 

1140 auth_header = f'Bearer {self.auth_token.token}' 

1141 if 'Authorization' in request.headers: 

1142 del request.headers['Authorization'] 

1143 request.headers['Authorization'] = auth_header 

1144 

1145 

1146def resolve_auth_type(auth_trait): 

1147 for auth_type in auth_trait: 

1148 if auth_type == 'smithy.api#noAuth': 

1149 return AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type] 

1150 elif auth_type in AUTH_TYPE_TO_SIGNATURE_VERSION: 

1151 signature_version = AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type] 

1152 if signature_version in AUTH_TYPE_MAPS: 

1153 return signature_version 

1154 else: 

1155 raise UnknownSignatureVersionError(signature_version=auth_type) 

1156 raise UnsupportedSignatureVersionError(signature_version=auth_trait) 

1157 

1158 

1159def resolve_auth_scheme_preference(preference_list, auth_options): 

1160 service_supported = [scheme.split('#')[-1] for scheme in auth_options] 

1161 

1162 unsupported = [ 

1163 scheme 

1164 for scheme in preference_list 

1165 if scheme not in AUTH_PREF_TO_SIGNATURE_VERSION 

1166 ] 

1167 if unsupported: 

1168 logger.debug( 

1169 "Unsupported auth schemes in preference list: %r", unsupported 

1170 ) 

1171 

1172 combined = preference_list + service_supported 

1173 prioritized_schemes = [ 

1174 scheme 

1175 for scheme in dict.fromkeys(combined) 

1176 if scheme in service_supported 

1177 ] 

1178 

1179 for scheme in prioritized_schemes: 

1180 if scheme == 'noAuth': 

1181 return AUTH_PREF_TO_SIGNATURE_VERSION[scheme] 

1182 sig_version = AUTH_PREF_TO_SIGNATURE_VERSION.get(scheme) 

1183 if sig_version in AUTH_TYPE_MAPS: 

1184 return sig_version 

1185 

1186 raise UnsupportedSignatureVersionError( 

1187 signature_version=', '.join(sorted(service_supported)) 

1188 ) 

1189 

1190 

1191AUTH_TYPE_MAPS = { 

1192 'v2': SigV2Auth, 

1193 'v3': SigV3Auth, 

1194 'v3https': SigV3Auth, 

1195 's3': HmacV1Auth, 

1196 's3-query': HmacV1QueryAuth, 

1197 's3-presign-post': HmacV1PostAuth, 

1198 's3v4-presign-post': S3SigV4PostAuth, 

1199 'v4-s3express': S3ExpressAuth, 

1200 'v4-s3express-query': S3ExpressQueryAuth, 

1201 'v4-s3express-presign-post': S3ExpressPostAuth, 

1202 'bearer': BearerAuth, 

1203} 

1204 

1205# Define v4 signers depending on if CRT is present 

1206if HAS_CRT: 

1207 from botocore.crt.auth import CRT_AUTH_TYPE_MAPS 

1208 

1209 AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS) 

1210else: 

1211 AUTH_TYPE_MAPS.update( 

1212 { 

1213 'v4': SigV4Auth, 

1214 'v4-query': SigV4QueryAuth, 

1215 's3v4': S3SigV4Auth, 

1216 's3v4-query': S3SigV4QueryAuth, 

1217 } 

1218 ) 

1219 

1220AUTH_TYPE_TO_SIGNATURE_VERSION = { 

1221 'aws.auth#sigv4': 'v4', 

1222 'aws.auth#sigv4a': 'v4a', 

1223 'smithy.api#httpBearerAuth': 'bearer', 

1224 'smithy.api#noAuth': 'none', 

1225} 

1226 

1227# Mapping used specifically for resolving user-configured auth scheme preferences. 

1228# This is similar to AUTH_TYPE_TO_SIGNATURE_VERSION, but uses simplified keys by 

1229# stripping the auth trait prefixes ('smithy.api#httpBearerAuth' → 'httpBearerAuth'). 

1230# These simplified keys match what customers are expected to provide in configuration. 

1231AUTH_PREF_TO_SIGNATURE_VERSION = { 

1232 auth_scheme.split('#')[-1]: sig_version 

1233 for auth_scheme, sig_version in AUTH_TYPE_TO_SIGNATURE_VERSION.items() 

1234}