Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/auth.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

601 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import base64 

15import calendar 

16import datetime 

17import functools 

18import hmac 

19import json 

20import logging 

21import time 

22from collections.abc import Mapping 

23from email.utils import formatdate 

24from hashlib import sha1, sha256 

25from operator import itemgetter 

26 

27from botocore.compat import ( 

28 HAS_CRT, 

29 MD5_AVAILABLE, # noqa: F401 

30 HTTPHeaders, 

31 encodebytes, 

32 ensure_unicode, 

33 parse_qs, 

34 quote, 

35 unquote, 

36 urlsplit, 

37 urlunsplit, 

38) 

39from botocore.exceptions import ( 

40 NoAuthTokenError, 

41 NoCredentialsError, 

42 UnknownSignatureVersionError, 

43 UnsupportedSignatureVersionError, 

44) 

45from botocore.utils import ( 

46 is_valid_ipv6_endpoint_url, 

47 normalize_url_path, 

48 percent_encode_sequence, 

49) 

50 

51logger = logging.getLogger(__name__) 

52 

53 

54EMPTY_SHA256_HASH = ( 

55 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' 

56) 

57# This is the buffer size used when calculating sha256 checksums. 

58# Experimenting with various buffer sizes showed that this value generally 

59# gave the best result (in terms of performance). 

60PAYLOAD_BUFFER = 1024 * 1024 

61ISO8601 = '%Y-%m-%dT%H:%M:%SZ' 

62SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ' 

63SIGNED_HEADERS_BLACKLIST = [ 

64 'expect', 

65 'transfer-encoding', 

66 'user-agent', 

67 'x-amzn-trace-id', 

68] 

69UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD' 

70STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER' 

71 

72 

73def _host_from_url(url): 

74 # Given URL, derive value for host header. Ensure that value: 

75 # 1) is lowercase 

76 # 2) excludes port, if it was the default port 

77 # 3) excludes userinfo 

78 url_parts = urlsplit(url) 

79 host = url_parts.hostname # urlsplit's hostname is always lowercase 

80 if is_valid_ipv6_endpoint_url(url): 

81 host = f'[{host}]' 

82 default_ports = { 

83 'http': 80, 

84 'https': 443, 

85 } 

86 if url_parts.port is not None: 

87 if url_parts.port != default_ports.get(url_parts.scheme): 

88 host = f'{host}:{url_parts.port}' 

89 return host 

90 

91 

92def _get_body_as_dict(request): 

93 # For query services, request.data is form-encoded and is already a 

94 # dict, but for other services such as rest-json it could be a json 

95 # string or bytes. In those cases we attempt to load the data as a 

96 # dict. 

97 data = request.data 

98 if isinstance(data, bytes): 

99 data = json.loads(data.decode('utf-8')) 

100 elif isinstance(data, str): 

101 data = json.loads(data) 

102 return data 

103 

104 

105class BaseSigner: 

106 REQUIRES_REGION = False 

107 REQUIRES_TOKEN = False 

108 

109 def add_auth(self, request): 

110 raise NotImplementedError("add_auth") 

111 

112 

113class TokenSigner(BaseSigner): 

114 REQUIRES_TOKEN = True 

115 """ 

116 Signers that expect an authorization token to perform the authorization 

117 """ 

118 

119 def __init__(self, auth_token): 

120 self.auth_token = auth_token 

121 

122 

123class SigV2Auth(BaseSigner): 

124 """ 

125 Sign a request with Signature V2. 

126 """ 

127 

128 def __init__(self, credentials): 

129 self.credentials = credentials 

130 

131 def calc_signature(self, request, params): 

132 logger.debug("Calculating signature using v2 auth.") 

133 split = urlsplit(request.url) 

134 path = split.path 

135 if len(path) == 0: 

136 path = '/' 

137 string_to_sign = f"{request.method}\n{split.netloc}\n{path}\n" 

138 lhmac = hmac.new( 

139 self.credentials.secret_key.encode("utf-8"), digestmod=sha256 

140 ) 

141 pairs = [] 

142 for key in sorted(params): 

143 # Any previous signature should not be a part of this 

144 # one, so we skip that particular key. This prevents 

145 # issues during retries. 

146 if key == 'Signature': 

147 continue 

148 value = str(params[key]) 

149 quoted_key = quote(key.encode('utf-8'), safe='') 

150 quoted_value = quote(value.encode('utf-8'), safe='-_~') 

151 pairs.append(f'{quoted_key}={quoted_value}') 

152 qs = '&'.join(pairs) 

153 string_to_sign += qs 

154 logger.debug('String to sign: %s', string_to_sign) 

155 lhmac.update(string_to_sign.encode('utf-8')) 

156 b64 = base64.b64encode(lhmac.digest()).strip().decode('utf-8') 

157 return (qs, b64) 

158 

159 def add_auth(self, request): 

160 # The auth handler is the last thing called in the 

161 # preparation phase of a prepared request. 

162 # Because of this we have to parse the query params 

163 # from the request body so we can update them with 

164 # the sigv2 auth params. 

165 if self.credentials is None: 

166 raise NoCredentialsError() 

167 if request.data: 

168 # POST 

169 params = request.data 

170 else: 

171 # GET 

172 params = request.params 

173 params['AWSAccessKeyId'] = self.credentials.access_key 

174 params['SignatureVersion'] = '2' 

175 params['SignatureMethod'] = 'HmacSHA256' 

176 params['Timestamp'] = time.strftime(ISO8601, time.gmtime()) 

177 if self.credentials.token: 

178 params['SecurityToken'] = self.credentials.token 

179 qs, signature = self.calc_signature(request, params) 

180 params['Signature'] = signature 

181 return request 

182 

183 

184class SigV3Auth(BaseSigner): 

185 def __init__(self, credentials): 

186 self.credentials = credentials 

187 

188 def add_auth(self, request): 

189 if self.credentials is None: 

190 raise NoCredentialsError() 

191 if 'Date' in request.headers: 

192 del request.headers['Date'] 

193 request.headers['Date'] = formatdate(usegmt=True) 

194 if self.credentials.token: 

195 if 'X-Amz-Security-Token' in request.headers: 

196 del request.headers['X-Amz-Security-Token'] 

197 request.headers['X-Amz-Security-Token'] = self.credentials.token 

198 new_hmac = hmac.new( 

199 self.credentials.secret_key.encode('utf-8'), digestmod=sha256 

200 ) 

201 new_hmac.update(request.headers['Date'].encode('utf-8')) 

202 encoded_signature = encodebytes(new_hmac.digest()).strip() 

203 signature = ( 

204 f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key}," 

205 f"Algorithm=HmacSHA256,Signature={encoded_signature.decode('utf-8')}" 

206 ) 

207 if 'X-Amzn-Authorization' in request.headers: 

208 del request.headers['X-Amzn-Authorization'] 

209 request.headers['X-Amzn-Authorization'] = signature 

210 

211 

212class SigV4Auth(BaseSigner): 

213 """ 

214 Sign a request with Signature V4. 

215 """ 

216 

217 REQUIRES_REGION = True 

218 

219 def __init__(self, credentials, service_name, region_name): 

220 self.credentials = credentials 

221 # We initialize these value here so the unit tests can have 

222 # valid values. But these will get overriden in ``add_auth`` 

223 # later for real requests. 

224 self._region_name = region_name 

225 self._service_name = service_name 

226 

227 def _sign(self, key, msg, hex=False): 

228 if hex: 

229 sig = hmac.new(key, msg.encode('utf-8'), sha256).hexdigest() 

230 else: 

231 sig = hmac.new(key, msg.encode('utf-8'), sha256).digest() 

232 return sig 

233 

234 def headers_to_sign(self, request): 

235 """ 

236 Select the headers from the request that need to be included 

237 in the StringToSign. 

238 """ 

239 header_map = HTTPHeaders() 

240 for name, value in request.headers.items(): 

241 lname = name.lower() 

242 if lname not in SIGNED_HEADERS_BLACKLIST: 

243 header_map[lname] = value 

244 if 'host' not in header_map: 

245 # TODO: We should set the host ourselves, instead of relying on our 

246 # HTTP client to set it for us. 

247 header_map['host'] = _host_from_url(request.url) 

248 return header_map 

249 

250 def canonical_query_string(self, request): 

251 # The query string can come from two parts. One is the 

252 # params attribute of the request. The other is from the request 

253 # url (in which case we have to re-split the url into its components 

254 # and parse out the query string component). 

255 if request.params: 

256 return self._canonical_query_string_params(request.params) 

257 else: 

258 return self._canonical_query_string_url(urlsplit(request.url)) 

259 

260 def _canonical_query_string_params(self, params): 

261 # [(key, value), (key2, value2)] 

262 key_val_pairs = [] 

263 if isinstance(params, Mapping): 

264 params = params.items() 

265 for key, value in params: 

266 key_val_pairs.append( 

267 (quote(key, safe='-_.~'), quote(str(value), safe='-_.~')) 

268 ) 

269 sorted_key_vals = [] 

270 # Sort by the URI-encoded key names, and in the case of 

271 # repeated keys, then sort by the value. 

272 for key, value in sorted(key_val_pairs): 

273 sorted_key_vals.append(f'{key}={value}') 

274 canonical_query_string = '&'.join(sorted_key_vals) 

275 return canonical_query_string 

276 

277 def _canonical_query_string_url(self, parts): 

278 canonical_query_string = '' 

279 if parts.query: 

280 # [(key, value), (key2, value2)] 

281 key_val_pairs = [] 

282 for pair in parts.query.split('&'): 

283 key, _, value = pair.partition('=') 

284 key_val_pairs.append((key, value)) 

285 sorted_key_vals = [] 

286 # Sort by the URI-encoded key names, and in the case of 

287 # repeated keys, then sort by the value. 

288 for key, value in sorted(key_val_pairs): 

289 sorted_key_vals.append(f'{key}={value}') 

290 canonical_query_string = '&'.join(sorted_key_vals) 

291 return canonical_query_string 

292 

293 def canonical_headers(self, headers_to_sign): 

294 """ 

295 Return the headers that need to be included in the StringToSign 

296 in their canonical form by converting all header keys to lower 

297 case, sorting them in alphabetical order and then joining 

298 them into a string, separated by newlines. 

299 """ 

300 headers = [] 

301 sorted_header_names = sorted(set(headers_to_sign)) 

302 for key in sorted_header_names: 

303 value = ','.join( 

304 self._header_value(v) for v in headers_to_sign.get_all(key) 

305 ) 

306 headers.append(f'{key}:{ensure_unicode(value)}') 

307 return '\n'.join(headers) 

308 

309 def _header_value(self, value): 

310 # From the sigv4 docs: 

311 # Lowercase(HeaderName) + ':' + Trimall(HeaderValue) 

312 # 

313 # The Trimall function removes excess white space before and after 

314 # values, and converts sequential spaces to a single space. 

315 return ' '.join(value.split()) 

316 

317 def signed_headers(self, headers_to_sign): 

318 headers = sorted(n.lower().strip() for n in set(headers_to_sign)) 

319 return ';'.join(headers) 

320 

321 def _is_streaming_checksum_payload(self, request): 

322 checksum_context = request.context.get('checksum', {}) 

323 algorithm = checksum_context.get('request_algorithm') 

324 return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer' 

325 

326 def payload(self, request): 

327 if self._is_streaming_checksum_payload(request): 

328 return STREAMING_UNSIGNED_PAYLOAD_TRAILER 

329 elif not self._should_sha256_sign_payload(request): 

330 # When payload signing is disabled, we use this static string in 

331 # place of the payload checksum. 

332 return UNSIGNED_PAYLOAD 

333 request_body = request.body 

334 if request_body and hasattr(request_body, 'seek'): 

335 position = request_body.tell() 

336 read_chunksize = functools.partial( 

337 request_body.read, PAYLOAD_BUFFER 

338 ) 

339 checksum = sha256() 

340 for chunk in iter(read_chunksize, b''): 

341 checksum.update(chunk) 

342 hex_checksum = checksum.hexdigest() 

343 request_body.seek(position) 

344 return hex_checksum 

345 elif request_body: 

346 # The request serialization has ensured that 

347 # request.body is a bytes() type. 

348 return sha256(request_body).hexdigest() 

349 else: 

350 return EMPTY_SHA256_HASH 

351 

352 def _should_sha256_sign_payload(self, request): 

353 # Payloads will always be signed over insecure connections. 

354 if not request.url.startswith('https'): 

355 return True 

356 

357 # Certain operations may have payload signing disabled by default. 

358 # Since we don't have access to the operation model, we pass in this 

359 # bit of metadata through the request context. 

360 return request.context.get('payload_signing_enabled', True) 

361 

362 def canonical_request(self, request): 

363 cr = [request.method.upper()] 

364 path = self._normalize_url_path(urlsplit(request.url).path) 

365 cr.append(path) 

366 cr.append(self.canonical_query_string(request)) 

367 headers_to_sign = self.headers_to_sign(request) 

368 cr.append(self.canonical_headers(headers_to_sign) + '\n') 

369 cr.append(self.signed_headers(headers_to_sign)) 

370 if 'X-Amz-Content-SHA256' in request.headers: 

371 body_checksum = request.headers['X-Amz-Content-SHA256'] 

372 else: 

373 body_checksum = self.payload(request) 

374 cr.append(body_checksum) 

375 return '\n'.join(cr) 

376 

377 def _normalize_url_path(self, path): 

378 normalized_path = quote(normalize_url_path(path), safe='/~') 

379 return normalized_path 

380 

381 def scope(self, request): 

382 scope = [self.credentials.access_key] 

383 scope.append(request.context['timestamp'][0:8]) 

384 scope.append(self._region_name) 

385 scope.append(self._service_name) 

386 scope.append('aws4_request') 

387 return '/'.join(scope) 

388 

389 def credential_scope(self, request): 

390 scope = [] 

391 scope.append(request.context['timestamp'][0:8]) 

392 scope.append(self._region_name) 

393 scope.append(self._service_name) 

394 scope.append('aws4_request') 

395 return '/'.join(scope) 

396 

397 def string_to_sign(self, request, canonical_request): 

398 """ 

399 Return the canonical StringToSign as well as a dict 

400 containing the original version of all headers that 

401 were included in the StringToSign. 

402 """ 

403 sts = ['AWS4-HMAC-SHA256'] 

404 sts.append(request.context['timestamp']) 

405 sts.append(self.credential_scope(request)) 

406 sts.append(sha256(canonical_request.encode('utf-8')).hexdigest()) 

407 return '\n'.join(sts) 

408 

409 def signature(self, string_to_sign, request): 

410 key = self.credentials.secret_key 

411 k_date = self._sign( 

412 (f"AWS4{key}").encode(), request.context["timestamp"][0:8] 

413 ) 

414 k_region = self._sign(k_date, self._region_name) 

415 k_service = self._sign(k_region, self._service_name) 

416 k_signing = self._sign(k_service, 'aws4_request') 

417 return self._sign(k_signing, string_to_sign, hex=True) 

418 

419 def add_auth(self, request): 

420 if self.credentials is None: 

421 raise NoCredentialsError() 

422 datetime_now = datetime.datetime.utcnow() 

423 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

424 # This could be a retry. Make sure the previous 

425 # authorization header is removed first. 

426 self._modify_request_before_signing(request) 

427 canonical_request = self.canonical_request(request) 

428 logger.debug("Calculating signature using v4 auth.") 

429 logger.debug('CanonicalRequest:\n%s', canonical_request) 

430 string_to_sign = self.string_to_sign(request, canonical_request) 

431 logger.debug('StringToSign:\n%s', string_to_sign) 

432 signature = self.signature(string_to_sign, request) 

433 logger.debug('Signature:\n%s', signature) 

434 

435 self._inject_signature_to_request(request, signature) 

436 

437 def _inject_signature_to_request(self, request, signature): 

438 auth_str = [f'AWS4-HMAC-SHA256 Credential={self.scope(request)}'] 

439 headers_to_sign = self.headers_to_sign(request) 

440 auth_str.append( 

441 f"SignedHeaders={self.signed_headers(headers_to_sign)}" 

442 ) 

443 auth_str.append(f'Signature={signature}') 

444 request.headers['Authorization'] = ', '.join(auth_str) 

445 return request 

446 

447 def _modify_request_before_signing(self, request): 

448 if 'Authorization' in request.headers: 

449 del request.headers['Authorization'] 

450 self._set_necessary_date_headers(request) 

451 if self.credentials.token: 

452 if 'X-Amz-Security-Token' in request.headers: 

453 del request.headers['X-Amz-Security-Token'] 

454 request.headers['X-Amz-Security-Token'] = self.credentials.token 

455 

456 if not request.context.get('payload_signing_enabled', True): 

457 if 'X-Amz-Content-SHA256' in request.headers: 

458 del request.headers['X-Amz-Content-SHA256'] 

459 request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD 

460 

461 def _set_necessary_date_headers(self, request): 

462 # The spec allows for either the Date _or_ the X-Amz-Date value to be 

463 # used so we check both. If there's a Date header, we use the date 

464 # header. Otherwise we use the X-Amz-Date header. 

465 if 'Date' in request.headers: 

466 del request.headers['Date'] 

467 datetime_timestamp = datetime.datetime.strptime( 

468 request.context['timestamp'], SIGV4_TIMESTAMP 

469 ) 

470 request.headers['Date'] = formatdate( 

471 int(calendar.timegm(datetime_timestamp.timetuple())) 

472 ) 

473 if 'X-Amz-Date' in request.headers: 

474 del request.headers['X-Amz-Date'] 

475 else: 

476 if 'X-Amz-Date' in request.headers: 

477 del request.headers['X-Amz-Date'] 

478 request.headers['X-Amz-Date'] = request.context['timestamp'] 

479 

480 

481class S3SigV4Auth(SigV4Auth): 

482 def _modify_request_before_signing(self, request): 

483 super()._modify_request_before_signing(request) 

484 if 'X-Amz-Content-SHA256' in request.headers: 

485 del request.headers['X-Amz-Content-SHA256'] 

486 

487 request.headers['X-Amz-Content-SHA256'] = self.payload(request) 

488 

489 def _should_sha256_sign_payload(self, request): 

490 # S3 allows optional body signing, so to minimize the performance 

491 # impact, we opt to not SHA256 sign the body on streaming uploads, 

492 # provided that we're on https. 

493 client_config = request.context.get('client_config') 

494 s3_config = getattr(client_config, 's3', None) 

495 

496 # The config could be None if it isn't set, or if the customer sets it 

497 # to None. 

498 if s3_config is None: 

499 s3_config = {} 

500 

501 # The explicit configuration takes precedence over any implicit 

502 # configuration. 

503 sign_payload = s3_config.get('payload_signing_enabled', None) 

504 if sign_payload is not None: 

505 return sign_payload 

506 

507 # We require that both a checksum be present and https be enabled 

508 # to implicitly disable body signing. The combination of TLS and 

509 # a checksum is sufficiently secure and durable for us to be 

510 # confident in the request without body signing. 

511 checksum_header = 'Content-MD5' 

512 checksum_context = request.context.get('checksum', {}) 

513 algorithm = checksum_context.get('request_algorithm') 

514 if isinstance(algorithm, dict) and algorithm.get('in') == 'header': 

515 checksum_header = algorithm['name'] 

516 if ( 

517 not request.url.startswith("https") 

518 or checksum_header not in request.headers 

519 ): 

520 return True 

521 

522 # If the input is streaming we disable body signing by default. 

523 if request.context.get('has_streaming_input', False): 

524 return False 

525 

526 # If the S3-specific checks had no results, delegate to the generic 

527 # checks. 

528 return super()._should_sha256_sign_payload(request) 

529 

530 def _normalize_url_path(self, path): 

531 # For S3, we do not normalize the path. 

532 return path 

533 

534 

535class S3ExpressAuth(S3SigV4Auth): 

536 REQUIRES_IDENTITY_CACHE = True 

537 

538 def __init__( 

539 self, credentials, service_name, region_name, *, identity_cache 

540 ): 

541 super().__init__(credentials, service_name, region_name) 

542 self._identity_cache = identity_cache 

543 

544 def add_auth(self, request): 

545 super().add_auth(request) 

546 

547 def _modify_request_before_signing(self, request): 

548 super()._modify_request_before_signing(request) 

549 if 'x-amz-s3session-token' not in request.headers: 

550 request.headers['x-amz-s3session-token'] = self.credentials.token 

551 # S3Express does not support STS' X-Amz-Security-Token 

552 if 'X-Amz-Security-Token' in request.headers: 

553 del request.headers['X-Amz-Security-Token'] 

554 

555 

556class S3ExpressPostAuth(S3ExpressAuth): 

557 REQUIRES_IDENTITY_CACHE = True 

558 

559 def add_auth(self, request): 

560 datetime_now = datetime.datetime.utcnow() 

561 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

562 

563 fields = {} 

564 if request.context.get('s3-presign-post-fields', None) is not None: 

565 fields = request.context['s3-presign-post-fields'] 

566 

567 policy = {} 

568 conditions = [] 

569 if request.context.get('s3-presign-post-policy', None) is not None: 

570 policy = request.context['s3-presign-post-policy'] 

571 if policy.get('conditions', None) is not None: 

572 conditions = policy['conditions'] 

573 

574 policy['conditions'] = conditions 

575 

576 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256' 

577 fields['x-amz-credential'] = self.scope(request) 

578 fields['x-amz-date'] = request.context['timestamp'] 

579 

580 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'}) 

581 conditions.append({'x-amz-credential': self.scope(request)}) 

582 conditions.append({'x-amz-date': request.context['timestamp']}) 

583 

584 if self.credentials.token is not None: 

585 fields['X-Amz-S3session-Token'] = self.credentials.token 

586 conditions.append( 

587 {'X-Amz-S3session-Token': self.credentials.token} 

588 ) 

589 

590 # Dump the base64 encoded policy into the fields dictionary. 

591 fields['policy'] = base64.b64encode( 

592 json.dumps(policy).encode('utf-8') 

593 ).decode('utf-8') 

594 

595 fields['x-amz-signature'] = self.signature(fields['policy'], request) 

596 

597 request.context['s3-presign-post-fields'] = fields 

598 request.context['s3-presign-post-policy'] = policy 

599 

600 

601class S3ExpressQueryAuth(S3ExpressAuth): 

602 DEFAULT_EXPIRES = 300 

603 REQUIRES_IDENTITY_CACHE = True 

604 

605 def __init__( 

606 self, 

607 credentials, 

608 service_name, 

609 region_name, 

610 *, 

611 identity_cache, 

612 expires=DEFAULT_EXPIRES, 

613 ): 

614 super().__init__( 

615 credentials, 

616 service_name, 

617 region_name, 

618 identity_cache=identity_cache, 

619 ) 

620 self._expires = expires 

621 

622 def _modify_request_before_signing(self, request): 

623 # We automatically set this header, so if it's the auto-set value we 

624 # want to get rid of it since it doesn't make sense for presigned urls. 

625 content_type = request.headers.get('content-type') 

626 blocklisted_content_type = ( 

627 'application/x-www-form-urlencoded; charset=utf-8' 

628 ) 

629 if content_type == blocklisted_content_type: 

630 del request.headers['content-type'] 

631 

632 # Note that we're not including X-Amz-Signature. 

633 # From the docs: "The Canonical Query String must include all the query 

634 # parameters from the preceding table except for X-Amz-Signature. 

635 signed_headers = self.signed_headers(self.headers_to_sign(request)) 

636 

637 auth_params = { 

638 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 

639 'X-Amz-Credential': self.scope(request), 

640 'X-Amz-Date': request.context['timestamp'], 

641 'X-Amz-Expires': self._expires, 

642 'X-Amz-SignedHeaders': signed_headers, 

643 } 

644 if self.credentials.token is not None: 

645 auth_params['X-Amz-S3session-Token'] = self.credentials.token 

646 # Now parse the original query string to a dict, inject our new query 

647 # params, and serialize back to a query string. 

648 url_parts = urlsplit(request.url) 

649 # parse_qs makes each value a list, but in our case we know we won't 

650 # have repeated keys so we know we have single element lists which we 

651 # can convert back to scalar values. 

652 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True) 

653 query_dict = {k: v[0] for k, v in query_string_parts.items()} 

654 

655 if request.params: 

656 query_dict.update(request.params) 

657 request.params = {} 

658 # The spec is particular about this. It *has* to be: 

659 # https://<endpoint>?<operation params>&<auth params> 

660 # You can't mix the two types of params together, i.e just keep doing 

661 # new_query_params.update(op_params) 

662 # new_query_params.update(auth_params) 

663 # percent_encode_sequence(new_query_params) 

664 operation_params = '' 

665 if request.data: 

666 # We also need to move the body params into the query string. To 

667 # do this, we first have to convert it to a dict. 

668 query_dict.update(_get_body_as_dict(request)) 

669 request.data = '' 

670 if query_dict: 

671 operation_params = percent_encode_sequence(query_dict) + '&' 

672 new_query_string = ( 

673 f"{operation_params}{percent_encode_sequence(auth_params)}" 

674 ) 

675 # url_parts is a tuple (and therefore immutable) so we need to create 

676 # a new url_parts with the new query string. 

677 # <part> - <index> 

678 # scheme - 0 

679 # netloc - 1 

680 # path - 2 

681 # query - 3 <-- we're replacing this. 

682 # fragment - 4 

683 p = url_parts 

684 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

685 request.url = urlunsplit(new_url_parts) 

686 

687 def _inject_signature_to_request(self, request, signature): 

688 # Rather than calculating an "Authorization" header, for the query 

689 # param quth, we just append an 'X-Amz-Signature' param to the end 

690 # of the query string. 

691 request.url += f'&X-Amz-Signature={signature}' 

692 

693 def _normalize_url_path(self, path): 

694 # For S3, we do not normalize the path. 

695 return path 

696 

697 def payload(self, request): 

698 # From the doc link above: 

699 # "You don't include a payload hash in the Canonical Request, because 

700 # when you create a presigned URL, you don't know anything about the 

701 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD". 

702 return UNSIGNED_PAYLOAD 

703 

704 

705class SigV4QueryAuth(SigV4Auth): 

706 DEFAULT_EXPIRES = 3600 

707 

708 def __init__( 

709 self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES 

710 ): 

711 super().__init__(credentials, service_name, region_name) 

712 self._expires = expires 

713 

714 def _modify_request_before_signing(self, request): 

715 # We automatically set this header, so if it's the auto-set value we 

716 # want to get rid of it since it doesn't make sense for presigned urls. 

717 content_type = request.headers.get('content-type') 

718 blacklisted_content_type = ( 

719 'application/x-www-form-urlencoded; charset=utf-8' 

720 ) 

721 if content_type == blacklisted_content_type: 

722 del request.headers['content-type'] 

723 

724 # Note that we're not including X-Amz-Signature. 

725 # From the docs: "The Canonical Query String must include all the query 

726 # parameters from the preceding table except for X-Amz-Signature. 

727 signed_headers = self.signed_headers(self.headers_to_sign(request)) 

728 

729 auth_params = { 

730 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 

731 'X-Amz-Credential': self.scope(request), 

732 'X-Amz-Date': request.context['timestamp'], 

733 'X-Amz-Expires': self._expires, 

734 'X-Amz-SignedHeaders': signed_headers, 

735 } 

736 if self.credentials.token is not None: 

737 auth_params['X-Amz-Security-Token'] = self.credentials.token 

738 # Now parse the original query string to a dict, inject our new query 

739 # params, and serialize back to a query string. 

740 url_parts = urlsplit(request.url) 

741 # parse_qs makes each value a list, but in our case we know we won't 

742 # have repeated keys so we know we have single element lists which we 

743 # can convert back to scalar values. 

744 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True) 

745 query_dict = {k: v[0] for k, v in query_string_parts.items()} 

746 

747 if request.params: 

748 query_dict.update(request.params) 

749 request.params = {} 

750 # The spec is particular about this. It *has* to be: 

751 # https://<endpoint>?<operation params>&<auth params> 

752 # You can't mix the two types of params together, i.e just keep doing 

753 # new_query_params.update(op_params) 

754 # new_query_params.update(auth_params) 

755 # percent_encode_sequence(new_query_params) 

756 operation_params = '' 

757 if request.data: 

758 # We also need to move the body params into the query string. To 

759 # do this, we first have to convert it to a dict. 

760 query_dict.update(_get_body_as_dict(request)) 

761 request.data = '' 

762 if query_dict: 

763 operation_params = percent_encode_sequence(query_dict) + '&' 

764 new_query_string = ( 

765 f"{operation_params}{percent_encode_sequence(auth_params)}" 

766 ) 

767 # url_parts is a tuple (and therefore immutable) so we need to create 

768 # a new url_parts with the new query string. 

769 # <part> - <index> 

770 # scheme - 0 

771 # netloc - 1 

772 # path - 2 

773 # query - 3 <-- we're replacing this. 

774 # fragment - 4 

775 p = url_parts 

776 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

777 request.url = urlunsplit(new_url_parts) 

778 

779 def _inject_signature_to_request(self, request, signature): 

780 # Rather than calculating an "Authorization" header, for the query 

781 # param quth, we just append an 'X-Amz-Signature' param to the end 

782 # of the query string. 

783 request.url += f'&X-Amz-Signature={signature}' 

784 

785 

786class S3SigV4QueryAuth(SigV4QueryAuth): 

787 """S3 SigV4 auth using query parameters. 

788 

789 This signer will sign a request using query parameters and signature 

790 version 4, i.e a "presigned url" signer. 

791 

792 Based off of: 

793 

794 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html 

795 

796 """ 

797 

798 def _normalize_url_path(self, path): 

799 # For S3, we do not normalize the path. 

800 return path 

801 

802 def payload(self, request): 

803 # From the doc link above: 

804 # "You don't include a payload hash in the Canonical Request, because 

805 # when you create a presigned URL, you don't know anything about the 

806 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD". 

807 return UNSIGNED_PAYLOAD 

808 

809 

810class S3SigV4PostAuth(SigV4Auth): 

811 """ 

812 Presigns a s3 post 

813 

814 Implementation doc here: 

815 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html 

816 """ 

817 

818 def add_auth(self, request): 

819 datetime_now = datetime.datetime.utcnow() 

820 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

821 

822 fields = {} 

823 if request.context.get('s3-presign-post-fields', None) is not None: 

824 fields = request.context['s3-presign-post-fields'] 

825 

826 policy = {} 

827 conditions = [] 

828 if request.context.get('s3-presign-post-policy', None) is not None: 

829 policy = request.context['s3-presign-post-policy'] 

830 if policy.get('conditions', None) is not None: 

831 conditions = policy['conditions'] 

832 

833 policy['conditions'] = conditions 

834 

835 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256' 

836 fields['x-amz-credential'] = self.scope(request) 

837 fields['x-amz-date'] = request.context['timestamp'] 

838 

839 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'}) 

840 conditions.append({'x-amz-credential': self.scope(request)}) 

841 conditions.append({'x-amz-date': request.context['timestamp']}) 

842 

843 if self.credentials.token is not None: 

844 fields['x-amz-security-token'] = self.credentials.token 

845 conditions.append({'x-amz-security-token': self.credentials.token}) 

846 

847 # Dump the base64 encoded policy into the fields dictionary. 

848 fields['policy'] = base64.b64encode( 

849 json.dumps(policy).encode('utf-8') 

850 ).decode('utf-8') 

851 

852 fields['x-amz-signature'] = self.signature(fields['policy'], request) 

853 

854 request.context['s3-presign-post-fields'] = fields 

855 request.context['s3-presign-post-policy'] = policy 

856 

857 

858class HmacV1Auth(BaseSigner): 

859 # List of Query String Arguments of Interest 

860 QSAOfInterest = [ 

861 'accelerate', 

862 'acl', 

863 'cors', 

864 'defaultObjectAcl', 

865 'location', 

866 'logging', 

867 'partNumber', 

868 'policy', 

869 'requestPayment', 

870 'torrent', 

871 'versioning', 

872 'versionId', 

873 'versions', 

874 'website', 

875 'uploads', 

876 'uploadId', 

877 'response-content-type', 

878 'response-content-language', 

879 'response-expires', 

880 'response-cache-control', 

881 'response-content-disposition', 

882 'response-content-encoding', 

883 'delete', 

884 'lifecycle', 

885 'tagging', 

886 'restore', 

887 'storageClass', 

888 'notification', 

889 'replication', 

890 'requestPayment', 

891 'analytics', 

892 'metrics', 

893 'inventory', 

894 'select', 

895 'select-type', 

896 'object-lock', 

897 ] 

898 

899 def __init__(self, credentials, service_name=None, region_name=None): 

900 self.credentials = credentials 

901 

902 def sign_string(self, string_to_sign): 

903 new_hmac = hmac.new( 

904 self.credentials.secret_key.encode('utf-8'), digestmod=sha1 

905 ) 

906 new_hmac.update(string_to_sign.encode('utf-8')) 

907 return encodebytes(new_hmac.digest()).strip().decode('utf-8') 

908 

909 def canonical_standard_headers(self, headers): 

910 interesting_headers = ['content-md5', 'content-type', 'date'] 

911 hoi = [] 

912 if 'Date' in headers: 

913 del headers['Date'] 

914 headers['Date'] = self._get_date() 

915 for ih in interesting_headers: 

916 found = False 

917 for key in headers: 

918 lk = key.lower() 

919 if headers[key] is not None and lk == ih: 

920 hoi.append(headers[key].strip()) 

921 found = True 

922 if not found: 

923 hoi.append('') 

924 return '\n'.join(hoi) 

925 

926 def canonical_custom_headers(self, headers): 

927 hoi = [] 

928 custom_headers = {} 

929 for key in headers: 

930 lk = key.lower() 

931 if headers[key] is not None: 

932 if lk.startswith('x-amz-'): 

933 custom_headers[lk] = ','.join( 

934 v.strip() for v in headers.get_all(key) 

935 ) 

936 sorted_header_keys = sorted(custom_headers.keys()) 

937 for key in sorted_header_keys: 

938 hoi.append(f"{key}:{custom_headers[key]}") 

939 return '\n'.join(hoi) 

940 

941 def unquote_v(self, nv): 

942 """ 

943 TODO: Do we need this? 

944 """ 

945 if len(nv) == 1: 

946 return nv 

947 else: 

948 return (nv[0], unquote(nv[1])) 

949 

950 def canonical_resource(self, split, auth_path=None): 

951 # don't include anything after the first ? in the resource... 

952 # unless it is one of the QSA of interest, defined above 

953 # NOTE: 

954 # The path in the canonical resource should always be the 

955 # full path including the bucket name, even for virtual-hosting 

956 # style addressing. The ``auth_path`` keeps track of the full 

957 # path for the canonical resource and would be passed in if 

958 # the client was using virtual-hosting style. 

959 if auth_path is not None: 

960 buf = auth_path 

961 else: 

962 buf = split.path 

963 if split.query: 

964 qsa = split.query.split('&') 

965 qsa = [a.split('=', 1) for a in qsa] 

966 qsa = [ 

967 self.unquote_v(a) for a in qsa if a[0] in self.QSAOfInterest 

968 ] 

969 if len(qsa) > 0: 

970 qsa.sort(key=itemgetter(0)) 

971 qsa = ['='.join(a) for a in qsa] 

972 buf += '?' 

973 buf += '&'.join(qsa) 

974 return buf 

975 

976 def canonical_string( 

977 self, method, split, headers, expires=None, auth_path=None 

978 ): 

979 cs = method.upper() + '\n' 

980 cs += self.canonical_standard_headers(headers) + '\n' 

981 custom_headers = self.canonical_custom_headers(headers) 

982 if custom_headers: 

983 cs += custom_headers + '\n' 

984 cs += self.canonical_resource(split, auth_path=auth_path) 

985 return cs 

986 

987 def get_signature( 

988 self, method, split, headers, expires=None, auth_path=None 

989 ): 

990 if self.credentials.token: 

991 del headers['x-amz-security-token'] 

992 headers['x-amz-security-token'] = self.credentials.token 

993 string_to_sign = self.canonical_string( 

994 method, split, headers, auth_path=auth_path 

995 ) 

996 logger.debug(f'StringToSign:\n{string_to_sign}') 

997 return self.sign_string(string_to_sign) 

998 

999 def add_auth(self, request): 

1000 if self.credentials is None: 

1001 raise NoCredentialsError 

1002 logger.debug("Calculating signature using hmacv1 auth.") 

1003 split = urlsplit(request.url) 

1004 logger.debug(f'HTTP request method: {request.method}') 

1005 signature = self.get_signature( 

1006 request.method, split, request.headers, auth_path=request.auth_path 

1007 ) 

1008 self._inject_signature(request, signature) 

1009 

1010 def _get_date(self): 

1011 return formatdate(usegmt=True) 

1012 

1013 def _inject_signature(self, request, signature): 

1014 if 'Authorization' in request.headers: 

1015 # We have to do this because request.headers is not 

1016 # normal dictionary. It has the (unintuitive) behavior 

1017 # of aggregating repeated setattr calls for the same 

1018 # key value. For example: 

1019 # headers['foo'] = 'a'; headers['foo'] = 'b' 

1020 # list(headers) will print ['foo', 'foo']. 

1021 del request.headers['Authorization'] 

1022 

1023 auth_header = f"AWS {self.credentials.access_key}:{signature}" 

1024 request.headers['Authorization'] = auth_header 

1025 

1026 

1027class HmacV1QueryAuth(HmacV1Auth): 

1028 """ 

1029 Generates a presigned request for s3. 

1030 

1031 Spec from this document: 

1032 

1033 http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html 

1034 #RESTAuthenticationQueryStringAuth 

1035 

1036 """ 

1037 

1038 DEFAULT_EXPIRES = 3600 

1039 

1040 def __init__(self, credentials, expires=DEFAULT_EXPIRES): 

1041 self.credentials = credentials 

1042 self._expires = expires 

1043 

1044 def _get_date(self): 

1045 return str(int(time.time() + int(self._expires))) 

1046 

1047 def _inject_signature(self, request, signature): 

1048 query_dict = {} 

1049 query_dict['AWSAccessKeyId'] = self.credentials.access_key 

1050 query_dict['Signature'] = signature 

1051 

1052 for header_key in request.headers: 

1053 lk = header_key.lower() 

1054 # For query string requests, Expires is used instead of the 

1055 # Date header. 

1056 if header_key == 'Date': 

1057 query_dict['Expires'] = request.headers['Date'] 

1058 # We only want to include relevant headers in the query string. 

1059 # These can be anything that starts with x-amz, is Content-MD5, 

1060 # or is Content-Type. 

1061 elif lk.startswith('x-amz-') or lk in ( 

1062 'content-md5', 

1063 'content-type', 

1064 ): 

1065 query_dict[lk] = request.headers[lk] 

1066 # Combine all of the identified headers into an encoded 

1067 # query string 

1068 new_query_string = percent_encode_sequence(query_dict) 

1069 

1070 # Create a new url with the presigned url. 

1071 p = urlsplit(request.url) 

1072 if p[3]: 

1073 # If there was a pre-existing query string, we should 

1074 # add that back before injecting the new query string. 

1075 new_query_string = f'{p[3]}&{new_query_string}' 

1076 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

1077 request.url = urlunsplit(new_url_parts) 

1078 

1079 

1080class HmacV1PostAuth(HmacV1Auth): 

1081 """ 

1082 Generates a presigned post for s3. 

1083 

1084 Spec from this document: 

1085 

1086 http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html 

1087 """ 

1088 

1089 def add_auth(self, request): 

1090 fields = {} 

1091 if request.context.get('s3-presign-post-fields', None) is not None: 

1092 fields = request.context['s3-presign-post-fields'] 

1093 

1094 policy = {} 

1095 conditions = [] 

1096 if request.context.get('s3-presign-post-policy', None) is not None: 

1097 policy = request.context['s3-presign-post-policy'] 

1098 if policy.get('conditions', None) is not None: 

1099 conditions = policy['conditions'] 

1100 

1101 policy['conditions'] = conditions 

1102 

1103 fields['AWSAccessKeyId'] = self.credentials.access_key 

1104 

1105 if self.credentials.token is not None: 

1106 fields['x-amz-security-token'] = self.credentials.token 

1107 conditions.append({'x-amz-security-token': self.credentials.token}) 

1108 

1109 # Dump the base64 encoded policy into the fields dictionary. 

1110 fields['policy'] = base64.b64encode( 

1111 json.dumps(policy).encode('utf-8') 

1112 ).decode('utf-8') 

1113 

1114 fields['signature'] = self.sign_string(fields['policy']) 

1115 

1116 request.context['s3-presign-post-fields'] = fields 

1117 request.context['s3-presign-post-policy'] = policy 

1118 

1119 

1120class BearerAuth(TokenSigner): 

1121 """ 

1122 Performs bearer token authorization by placing the bearer token in the 

1123 Authorization header as specified by Section 2.1 of RFC 6750. 

1124 

1125 https://datatracker.ietf.org/doc/html/rfc6750#section-2.1 

1126 """ 

1127 

1128 def add_auth(self, request): 

1129 if self.auth_token is None: 

1130 raise NoAuthTokenError() 

1131 

1132 auth_header = f'Bearer {self.auth_token.token}' 

1133 if 'Authorization' in request.headers: 

1134 del request.headers['Authorization'] 

1135 request.headers['Authorization'] = auth_header 

1136 

1137 

1138def resolve_auth_type(auth_trait): 

1139 for auth_type in auth_trait: 

1140 if auth_type == 'smithy.api#noAuth': 

1141 return AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type] 

1142 elif auth_type in AUTH_TYPE_TO_SIGNATURE_VERSION: 

1143 signature_version = AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type] 

1144 if signature_version in AUTH_TYPE_MAPS: 

1145 return signature_version 

1146 else: 

1147 raise UnknownSignatureVersionError(signature_version=auth_type) 

1148 raise UnsupportedSignatureVersionError(signature_version=auth_trait) 

1149 

1150 

1151AUTH_TYPE_MAPS = { 

1152 'v2': SigV2Auth, 

1153 'v3': SigV3Auth, 

1154 'v3https': SigV3Auth, 

1155 's3': HmacV1Auth, 

1156 's3-query': HmacV1QueryAuth, 

1157 's3-presign-post': HmacV1PostAuth, 

1158 's3v4-presign-post': S3SigV4PostAuth, 

1159 'v4-s3express': S3ExpressAuth, 

1160 'v4-s3express-query': S3ExpressQueryAuth, 

1161 'v4-s3express-presign-post': S3ExpressPostAuth, 

1162 'bearer': BearerAuth, 

1163} 

1164 

1165# Define v4 signers depending on if CRT is present 

1166if HAS_CRT: 

1167 from botocore.crt.auth import CRT_AUTH_TYPE_MAPS 

1168 

1169 AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS) 

1170else: 

1171 AUTH_TYPE_MAPS.update( 

1172 { 

1173 'v4': SigV4Auth, 

1174 'v4-query': SigV4QueryAuth, 

1175 's3v4': S3SigV4Auth, 

1176 's3v4-query': S3SigV4QueryAuth, 

1177 } 

1178 ) 

1179 

1180AUTH_TYPE_TO_SIGNATURE_VERSION = { 

1181 'aws.auth#sigv4': 'v4', 

1182 'aws.auth#sigv4a': 'v4a', 

1183 'smithy.api#httpBearerAuth': 'bearer', 

1184 'smithy.api#noAuth': 'none', 

1185}