Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/auth.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

600 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import base64 

15import calendar 

16import datetime 

17import functools 

18import hmac 

19import json 

20import logging 

21import time 

22from collections.abc import Mapping 

23from email.utils import formatdate 

24from hashlib import sha1, sha256 

25from operator import itemgetter 

26 

27from botocore.compat import ( 

28 HAS_CRT, 

29 HTTPHeaders, 

30 encodebytes, 

31 ensure_unicode, 

32 parse_qs, 

33 quote, 

34 unquote, 

35 urlsplit, 

36 urlunsplit, 

37) 

38from botocore.exceptions import ( 

39 NoAuthTokenError, 

40 NoCredentialsError, 

41 UnknownSignatureVersionError, 

42 UnsupportedSignatureVersionError, 

43) 

44from botocore.utils import ( 

45 is_valid_ipv6_endpoint_url, 

46 normalize_url_path, 

47 percent_encode_sequence, 

48) 

49 

50# Imports for backwards compatibility 

51from botocore.compat import MD5_AVAILABLE # noqa 

52 

53 

54logger = logging.getLogger(__name__) 

55 

56 

57EMPTY_SHA256_HASH = ( 

58 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' 

59) 

60# This is the buffer size used when calculating sha256 checksums. 

61# Experimenting with various buffer sizes showed that this value generally 

62# gave the best result (in terms of performance). 

63PAYLOAD_BUFFER = 1024 * 1024 

64ISO8601 = '%Y-%m-%dT%H:%M:%SZ' 

65SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ' 

66SIGNED_HEADERS_BLACKLIST = [ 

67 'expect', 

68 'user-agent', 

69 'x-amzn-trace-id', 

70] 

71UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD' 

72STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER' 

73 

74 

75def _host_from_url(url): 

76 # Given URL, derive value for host header. Ensure that value: 

77 # 1) is lowercase 

78 # 2) excludes port, if it was the default port 

79 # 3) excludes userinfo 

80 url_parts = urlsplit(url) 

81 host = url_parts.hostname # urlsplit's hostname is always lowercase 

82 if is_valid_ipv6_endpoint_url(url): 

83 host = f'[{host}]' 

84 default_ports = { 

85 'http': 80, 

86 'https': 443, 

87 } 

88 if url_parts.port is not None: 

89 if url_parts.port != default_ports.get(url_parts.scheme): 

90 host = '%s:%d' % (host, url_parts.port) 

91 return host 

92 

93 

94def _get_body_as_dict(request): 

95 # For query services, request.data is form-encoded and is already a 

96 # dict, but for other services such as rest-json it could be a json 

97 # string or bytes. In those cases we attempt to load the data as a 

98 # dict. 

99 data = request.data 

100 if isinstance(data, bytes): 

101 data = json.loads(data.decode('utf-8')) 

102 elif isinstance(data, str): 

103 data = json.loads(data) 

104 return data 

105 

106 

107class BaseSigner: 

108 REQUIRES_REGION = False 

109 REQUIRES_TOKEN = False 

110 

111 def add_auth(self, request): 

112 raise NotImplementedError("add_auth") 

113 

114 

115class TokenSigner(BaseSigner): 

116 REQUIRES_TOKEN = True 

117 """ 

118 Signers that expect an authorization token to perform the authorization 

119 """ 

120 

121 def __init__(self, auth_token): 

122 self.auth_token = auth_token 

123 

124 

125class SigV2Auth(BaseSigner): 

126 """ 

127 Sign a request with Signature V2. 

128 """ 

129 

130 def __init__(self, credentials): 

131 self.credentials = credentials 

132 

133 def calc_signature(self, request, params): 

134 logger.debug("Calculating signature using v2 auth.") 

135 split = urlsplit(request.url) 

136 path = split.path 

137 if len(path) == 0: 

138 path = '/' 

139 string_to_sign = f"{request.method}\n{split.netloc}\n{path}\n" 

140 lhmac = hmac.new( 

141 self.credentials.secret_key.encode("utf-8"), digestmod=sha256 

142 ) 

143 pairs = [] 

144 for key in sorted(params): 

145 # Any previous signature should not be a part of this 

146 # one, so we skip that particular key. This prevents 

147 # issues during retries. 

148 if key == 'Signature': 

149 continue 

150 value = str(params[key]) 

151 quoted_key = quote(key.encode('utf-8'), safe='') 

152 quoted_value = quote(value.encode('utf-8'), safe='-_~') 

153 pairs.append(f'{quoted_key}={quoted_value}') 

154 qs = '&'.join(pairs) 

155 string_to_sign += qs 

156 logger.debug('String to sign: %s', string_to_sign) 

157 lhmac.update(string_to_sign.encode('utf-8')) 

158 b64 = base64.b64encode(lhmac.digest()).strip().decode('utf-8') 

159 return (qs, b64) 

160 

161 def add_auth(self, request): 

162 # The auth handler is the last thing called in the 

163 # preparation phase of a prepared request. 

164 # Because of this we have to parse the query params 

165 # from the request body so we can update them with 

166 # the sigv2 auth params. 

167 if self.credentials is None: 

168 raise NoCredentialsError() 

169 if request.data: 

170 # POST 

171 params = request.data 

172 else: 

173 # GET 

174 params = request.params 

175 params['AWSAccessKeyId'] = self.credentials.access_key 

176 params['SignatureVersion'] = '2' 

177 params['SignatureMethod'] = 'HmacSHA256' 

178 params['Timestamp'] = time.strftime(ISO8601, time.gmtime()) 

179 if self.credentials.token: 

180 params['SecurityToken'] = self.credentials.token 

181 qs, signature = self.calc_signature(request, params) 

182 params['Signature'] = signature 

183 return request 

184 

185 

186class SigV3Auth(BaseSigner): 

187 def __init__(self, credentials): 

188 self.credentials = credentials 

189 

190 def add_auth(self, request): 

191 if self.credentials is None: 

192 raise NoCredentialsError() 

193 if 'Date' in request.headers: 

194 del request.headers['Date'] 

195 request.headers['Date'] = formatdate(usegmt=True) 

196 if self.credentials.token: 

197 if 'X-Amz-Security-Token' in request.headers: 

198 del request.headers['X-Amz-Security-Token'] 

199 request.headers['X-Amz-Security-Token'] = self.credentials.token 

200 new_hmac = hmac.new( 

201 self.credentials.secret_key.encode('utf-8'), digestmod=sha256 

202 ) 

203 new_hmac.update(request.headers['Date'].encode('utf-8')) 

204 encoded_signature = encodebytes(new_hmac.digest()).strip() 

205 signature = ( 

206 f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key}," 

207 f"Algorithm=HmacSHA256,Signature={encoded_signature.decode('utf-8')}" 

208 ) 

209 if 'X-Amzn-Authorization' in request.headers: 

210 del request.headers['X-Amzn-Authorization'] 

211 request.headers['X-Amzn-Authorization'] = signature 

212 

213 

214class SigV4Auth(BaseSigner): 

215 """ 

216 Sign a request with Signature V4. 

217 """ 

218 

219 REQUIRES_REGION = True 

220 

221 def __init__(self, credentials, service_name, region_name): 

222 self.credentials = credentials 

223 # We initialize these value here so the unit tests can have 

224 # valid values. But these will get overriden in ``add_auth`` 

225 # later for real requests. 

226 self._region_name = region_name 

227 self._service_name = service_name 

228 

229 def _sign(self, key, msg, hex=False): 

230 if hex: 

231 sig = hmac.new(key, msg.encode('utf-8'), sha256).hexdigest() 

232 else: 

233 sig = hmac.new(key, msg.encode('utf-8'), sha256).digest() 

234 return sig 

235 

236 def headers_to_sign(self, request): 

237 """ 

238 Select the headers from the request that need to be included 

239 in the StringToSign. 

240 """ 

241 header_map = HTTPHeaders() 

242 for name, value in request.headers.items(): 

243 lname = name.lower() 

244 if lname not in SIGNED_HEADERS_BLACKLIST: 

245 header_map[lname] = value 

246 if 'host' not in header_map: 

247 # TODO: We should set the host ourselves, instead of relying on our 

248 # HTTP client to set it for us. 

249 header_map['host'] = _host_from_url(request.url) 

250 return header_map 

251 

252 def canonical_query_string(self, request): 

253 # The query string can come from two parts. One is the 

254 # params attribute of the request. The other is from the request 

255 # url (in which case we have to re-split the url into its components 

256 # and parse out the query string component). 

257 if request.params: 

258 return self._canonical_query_string_params(request.params) 

259 else: 

260 return self._canonical_query_string_url(urlsplit(request.url)) 

261 

262 def _canonical_query_string_params(self, params): 

263 # [(key, value), (key2, value2)] 

264 key_val_pairs = [] 

265 if isinstance(params, Mapping): 

266 params = params.items() 

267 for key, value in params: 

268 key_val_pairs.append( 

269 (quote(key, safe='-_.~'), quote(str(value), safe='-_.~')) 

270 ) 

271 sorted_key_vals = [] 

272 # Sort by the URI-encoded key names, and in the case of 

273 # repeated keys, then sort by the value. 

274 for key, value in sorted(key_val_pairs): 

275 sorted_key_vals.append(f'{key}={value}') 

276 canonical_query_string = '&'.join(sorted_key_vals) 

277 return canonical_query_string 

278 

279 def _canonical_query_string_url(self, parts): 

280 canonical_query_string = '' 

281 if parts.query: 

282 # [(key, value), (key2, value2)] 

283 key_val_pairs = [] 

284 for pair in parts.query.split('&'): 

285 key, _, value = pair.partition('=') 

286 key_val_pairs.append((key, value)) 

287 sorted_key_vals = [] 

288 # Sort by the URI-encoded key names, and in the case of 

289 # repeated keys, then sort by the value. 

290 for key, value in sorted(key_val_pairs): 

291 sorted_key_vals.append(f'{key}={value}') 

292 canonical_query_string = '&'.join(sorted_key_vals) 

293 return canonical_query_string 

294 

295 def canonical_headers(self, headers_to_sign): 

296 """ 

297 Return the headers that need to be included in the StringToSign 

298 in their canonical form by converting all header keys to lower 

299 case, sorting them in alphabetical order and then joining 

300 them into a string, separated by newlines. 

301 """ 

302 headers = [] 

303 sorted_header_names = sorted(set(headers_to_sign)) 

304 for key in sorted_header_names: 

305 value = ','.join( 

306 self._header_value(v) for v in headers_to_sign.get_all(key) 

307 ) 

308 headers.append(f'{key}:{ensure_unicode(value)}') 

309 return '\n'.join(headers) 

310 

311 def _header_value(self, value): 

312 # From the sigv4 docs: 

313 # Lowercase(HeaderName) + ':' + Trimall(HeaderValue) 

314 # 

315 # The Trimall function removes excess white space before and after 

316 # values, and converts sequential spaces to a single space. 

317 return ' '.join(value.split()) 

318 

319 def signed_headers(self, headers_to_sign): 

320 headers = sorted(n.lower().strip() for n in set(headers_to_sign)) 

321 return ';'.join(headers) 

322 

323 def _is_streaming_checksum_payload(self, request): 

324 checksum_context = request.context.get('checksum', {}) 

325 algorithm = checksum_context.get('request_algorithm') 

326 return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer' 

327 

328 def payload(self, request): 

329 if self._is_streaming_checksum_payload(request): 

330 return STREAMING_UNSIGNED_PAYLOAD_TRAILER 

331 elif not self._should_sha256_sign_payload(request): 

332 # When payload signing is disabled, we use this static string in 

333 # place of the payload checksum. 

334 return UNSIGNED_PAYLOAD 

335 request_body = request.body 

336 if request_body and hasattr(request_body, 'seek'): 

337 position = request_body.tell() 

338 read_chunksize = functools.partial( 

339 request_body.read, PAYLOAD_BUFFER 

340 ) 

341 checksum = sha256() 

342 for chunk in iter(read_chunksize, b''): 

343 checksum.update(chunk) 

344 hex_checksum = checksum.hexdigest() 

345 request_body.seek(position) 

346 return hex_checksum 

347 elif request_body: 

348 # The request serialization has ensured that 

349 # request.body is a bytes() type. 

350 return sha256(request_body).hexdigest() 

351 else: 

352 return EMPTY_SHA256_HASH 

353 

354 def _should_sha256_sign_payload(self, request): 

355 # Payloads will always be signed over insecure connections. 

356 if not request.url.startswith('https'): 

357 return True 

358 

359 # Certain operations may have payload signing disabled by default. 

360 # Since we don't have access to the operation model, we pass in this 

361 # bit of metadata through the request context. 

362 return request.context.get('payload_signing_enabled', True) 

363 

364 def canonical_request(self, request): 

365 cr = [request.method.upper()] 

366 path = self._normalize_url_path(urlsplit(request.url).path) 

367 cr.append(path) 

368 cr.append(self.canonical_query_string(request)) 

369 headers_to_sign = self.headers_to_sign(request) 

370 cr.append(self.canonical_headers(headers_to_sign) + '\n') 

371 cr.append(self.signed_headers(headers_to_sign)) 

372 if 'X-Amz-Content-SHA256' in request.headers: 

373 body_checksum = request.headers['X-Amz-Content-SHA256'] 

374 else: 

375 body_checksum = self.payload(request) 

376 cr.append(body_checksum) 

377 return '\n'.join(cr) 

378 

379 def _normalize_url_path(self, path): 

380 normalized_path = quote(normalize_url_path(path), safe='/~') 

381 return normalized_path 

382 

383 def scope(self, request): 

384 scope = [self.credentials.access_key] 

385 scope.append(request.context['timestamp'][0:8]) 

386 scope.append(self._region_name) 

387 scope.append(self._service_name) 

388 scope.append('aws4_request') 

389 return '/'.join(scope) 

390 

391 def credential_scope(self, request): 

392 scope = [] 

393 scope.append(request.context['timestamp'][0:8]) 

394 scope.append(self._region_name) 

395 scope.append(self._service_name) 

396 scope.append('aws4_request') 

397 return '/'.join(scope) 

398 

399 def string_to_sign(self, request, canonical_request): 

400 """ 

401 Return the canonical StringToSign as well as a dict 

402 containing the original version of all headers that 

403 were included in the StringToSign. 

404 """ 

405 sts = ['AWS4-HMAC-SHA256'] 

406 sts.append(request.context['timestamp']) 

407 sts.append(self.credential_scope(request)) 

408 sts.append(sha256(canonical_request.encode('utf-8')).hexdigest()) 

409 return '\n'.join(sts) 

410 

411 def signature(self, string_to_sign, request): 

412 key = self.credentials.secret_key 

413 k_date = self._sign( 

414 (f"AWS4{key}").encode(), request.context["timestamp"][0:8] 

415 ) 

416 k_region = self._sign(k_date, self._region_name) 

417 k_service = self._sign(k_region, self._service_name) 

418 k_signing = self._sign(k_service, 'aws4_request') 

419 return self._sign(k_signing, string_to_sign, hex=True) 

420 

421 def add_auth(self, request): 

422 if self.credentials is None: 

423 raise NoCredentialsError() 

424 datetime_now = datetime.datetime.utcnow() 

425 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

426 # This could be a retry. Make sure the previous 

427 # authorization header is removed first. 

428 self._modify_request_before_signing(request) 

429 canonical_request = self.canonical_request(request) 

430 logger.debug("Calculating signature using v4 auth.") 

431 logger.debug('CanonicalRequest:\n%s', canonical_request) 

432 string_to_sign = self.string_to_sign(request, canonical_request) 

433 logger.debug('StringToSign:\n%s', string_to_sign) 

434 signature = self.signature(string_to_sign, request) 

435 logger.debug('Signature:\n%s', signature) 

436 

437 self._inject_signature_to_request(request, signature) 

438 

439 def _inject_signature_to_request(self, request, signature): 

440 auth_str = [f'AWS4-HMAC-SHA256 Credential={self.scope(request)}'] 

441 headers_to_sign = self.headers_to_sign(request) 

442 auth_str.append( 

443 f"SignedHeaders={self.signed_headers(headers_to_sign)}" 

444 ) 

445 auth_str.append(f'Signature={signature}') 

446 request.headers['Authorization'] = ', '.join(auth_str) 

447 return request 

448 

449 def _modify_request_before_signing(self, request): 

450 if 'Authorization' in request.headers: 

451 del request.headers['Authorization'] 

452 self._set_necessary_date_headers(request) 

453 if self.credentials.token: 

454 if 'X-Amz-Security-Token' in request.headers: 

455 del request.headers['X-Amz-Security-Token'] 

456 request.headers['X-Amz-Security-Token'] = self.credentials.token 

457 

458 if not request.context.get('payload_signing_enabled', True): 

459 if 'X-Amz-Content-SHA256' in request.headers: 

460 del request.headers['X-Amz-Content-SHA256'] 

461 request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD 

462 

463 def _set_necessary_date_headers(self, request): 

464 # The spec allows for either the Date _or_ the X-Amz-Date value to be 

465 # used so we check both. If there's a Date header, we use the date 

466 # header. Otherwise we use the X-Amz-Date header. 

467 if 'Date' in request.headers: 

468 del request.headers['Date'] 

469 datetime_timestamp = datetime.datetime.strptime( 

470 request.context['timestamp'], SIGV4_TIMESTAMP 

471 ) 

472 request.headers['Date'] = formatdate( 

473 int(calendar.timegm(datetime_timestamp.timetuple())) 

474 ) 

475 if 'X-Amz-Date' in request.headers: 

476 del request.headers['X-Amz-Date'] 

477 else: 

478 if 'X-Amz-Date' in request.headers: 

479 del request.headers['X-Amz-Date'] 

480 request.headers['X-Amz-Date'] = request.context['timestamp'] 

481 

482 

483class S3SigV4Auth(SigV4Auth): 

484 def _modify_request_before_signing(self, request): 

485 super()._modify_request_before_signing(request) 

486 if 'X-Amz-Content-SHA256' in request.headers: 

487 del request.headers['X-Amz-Content-SHA256'] 

488 

489 request.headers['X-Amz-Content-SHA256'] = self.payload(request) 

490 

491 def _should_sha256_sign_payload(self, request): 

492 # S3 allows optional body signing, so to minimize the performance 

493 # impact, we opt to not SHA256 sign the body on streaming uploads, 

494 # provided that we're on https. 

495 client_config = request.context.get('client_config') 

496 s3_config = getattr(client_config, 's3', None) 

497 

498 # The config could be None if it isn't set, or if the customer sets it 

499 # to None. 

500 if s3_config is None: 

501 s3_config = {} 

502 

503 # The explicit configuration takes precedence over any implicit 

504 # configuration. 

505 sign_payload = s3_config.get('payload_signing_enabled', None) 

506 if sign_payload is not None: 

507 return sign_payload 

508 

509 # We require that both a checksum be present and https be enabled 

510 # to implicitly disable body signing. The combination of TLS and 

511 # a checksum is sufficiently secure and durable for us to be 

512 # confident in the request without body signing. 

513 checksum_header = 'Content-MD5' 

514 checksum_context = request.context.get('checksum', {}) 

515 algorithm = checksum_context.get('request_algorithm') 

516 if isinstance(algorithm, dict) and algorithm.get('in') == 'header': 

517 checksum_header = algorithm['name'] 

518 if ( 

519 not request.url.startswith("https") 

520 or checksum_header not in request.headers 

521 ): 

522 return True 

523 

524 # If the input is streaming we disable body signing by default. 

525 if request.context.get('has_streaming_input', False): 

526 return False 

527 

528 # If the S3-specific checks had no results, delegate to the generic 

529 # checks. 

530 return super()._should_sha256_sign_payload(request) 

531 

532 def _normalize_url_path(self, path): 

533 # For S3, we do not normalize the path. 

534 return path 

535 

536 

537class S3ExpressAuth(S3SigV4Auth): 

538 REQUIRES_IDENTITY_CACHE = True 

539 

540 def __init__( 

541 self, credentials, service_name, region_name, *, identity_cache 

542 ): 

543 super().__init__(credentials, service_name, region_name) 

544 self._identity_cache = identity_cache 

545 

546 def add_auth(self, request): 

547 super().add_auth(request) 

548 

549 def _modify_request_before_signing(self, request): 

550 super()._modify_request_before_signing(request) 

551 if 'x-amz-s3session-token' not in request.headers: 

552 request.headers['x-amz-s3session-token'] = self.credentials.token 

553 # S3Express does not support STS' X-Amz-Security-Token 

554 if 'X-Amz-Security-Token' in request.headers: 

555 del request.headers['X-Amz-Security-Token'] 

556 

557 

558class S3ExpressPostAuth(S3ExpressAuth): 

559 REQUIRES_IDENTITY_CACHE = True 

560 

561 def add_auth(self, request): 

562 datetime_now = datetime.datetime.utcnow() 

563 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

564 

565 fields = {} 

566 if request.context.get('s3-presign-post-fields', None) is not None: 

567 fields = request.context['s3-presign-post-fields'] 

568 

569 policy = {} 

570 conditions = [] 

571 if request.context.get('s3-presign-post-policy', None) is not None: 

572 policy = request.context['s3-presign-post-policy'] 

573 if policy.get('conditions', None) is not None: 

574 conditions = policy['conditions'] 

575 

576 policy['conditions'] = conditions 

577 

578 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256' 

579 fields['x-amz-credential'] = self.scope(request) 

580 fields['x-amz-date'] = request.context['timestamp'] 

581 

582 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'}) 

583 conditions.append({'x-amz-credential': self.scope(request)}) 

584 conditions.append({'x-amz-date': request.context['timestamp']}) 

585 

586 if self.credentials.token is not None: 

587 fields['X-Amz-S3session-Token'] = self.credentials.token 

588 conditions.append( 

589 {'X-Amz-S3session-Token': self.credentials.token} 

590 ) 

591 

592 # Dump the base64 encoded policy into the fields dictionary. 

593 fields['policy'] = base64.b64encode( 

594 json.dumps(policy).encode('utf-8') 

595 ).decode('utf-8') 

596 

597 fields['x-amz-signature'] = self.signature(fields['policy'], request) 

598 

599 request.context['s3-presign-post-fields'] = fields 

600 request.context['s3-presign-post-policy'] = policy 

601 

602 

603class S3ExpressQueryAuth(S3ExpressAuth): 

604 DEFAULT_EXPIRES = 300 

605 REQUIRES_IDENTITY_CACHE = True 

606 

607 def __init__( 

608 self, 

609 credentials, 

610 service_name, 

611 region_name, 

612 *, 

613 identity_cache, 

614 expires=DEFAULT_EXPIRES, 

615 ): 

616 super().__init__( 

617 credentials, 

618 service_name, 

619 region_name, 

620 identity_cache=identity_cache, 

621 ) 

622 self._expires = expires 

623 

624 def _modify_request_before_signing(self, request): 

625 # We automatically set this header, so if it's the auto-set value we 

626 # want to get rid of it since it doesn't make sense for presigned urls. 

627 content_type = request.headers.get('content-type') 

628 blocklisted_content_type = ( 

629 'application/x-www-form-urlencoded; charset=utf-8' 

630 ) 

631 if content_type == blocklisted_content_type: 

632 del request.headers['content-type'] 

633 

634 # Note that we're not including X-Amz-Signature. 

635 # From the docs: "The Canonical Query String must include all the query 

636 # parameters from the preceding table except for X-Amz-Signature. 

637 signed_headers = self.signed_headers(self.headers_to_sign(request)) 

638 

639 auth_params = { 

640 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 

641 'X-Amz-Credential': self.scope(request), 

642 'X-Amz-Date': request.context['timestamp'], 

643 'X-Amz-Expires': self._expires, 

644 'X-Amz-SignedHeaders': signed_headers, 

645 } 

646 if self.credentials.token is not None: 

647 auth_params['X-Amz-S3session-Token'] = self.credentials.token 

648 # Now parse the original query string to a dict, inject our new query 

649 # params, and serialize back to a query string. 

650 url_parts = urlsplit(request.url) 

651 # parse_qs makes each value a list, but in our case we know we won't 

652 # have repeated keys so we know we have single element lists which we 

653 # can convert back to scalar values. 

654 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True) 

655 query_dict = {k: v[0] for k, v in query_string_parts.items()} 

656 

657 if request.params: 

658 query_dict.update(request.params) 

659 request.params = {} 

660 # The spec is particular about this. It *has* to be: 

661 # https://<endpoint>?<operation params>&<auth params> 

662 # You can't mix the two types of params together, i.e just keep doing 

663 # new_query_params.update(op_params) 

664 # new_query_params.update(auth_params) 

665 # percent_encode_sequence(new_query_params) 

666 operation_params = '' 

667 if request.data: 

668 # We also need to move the body params into the query string. To 

669 # do this, we first have to convert it to a dict. 

670 query_dict.update(_get_body_as_dict(request)) 

671 request.data = '' 

672 if query_dict: 

673 operation_params = percent_encode_sequence(query_dict) + '&' 

674 new_query_string = ( 

675 f"{operation_params}{percent_encode_sequence(auth_params)}" 

676 ) 

677 # url_parts is a tuple (and therefore immutable) so we need to create 

678 # a new url_parts with the new query string. 

679 # <part> - <index> 

680 # scheme - 0 

681 # netloc - 1 

682 # path - 2 

683 # query - 3 <-- we're replacing this. 

684 # fragment - 4 

685 p = url_parts 

686 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

687 request.url = urlunsplit(new_url_parts) 

688 

689 def _inject_signature_to_request(self, request, signature): 

690 # Rather than calculating an "Authorization" header, for the query 

691 # param quth, we just append an 'X-Amz-Signature' param to the end 

692 # of the query string. 

693 request.url += f'&X-Amz-Signature={signature}' 

694 

695 def _normalize_url_path(self, path): 

696 # For S3, we do not normalize the path. 

697 return path 

698 

699 def payload(self, request): 

700 # From the doc link above: 

701 # "You don't include a payload hash in the Canonical Request, because 

702 # when you create a presigned URL, you don't know anything about the 

703 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD". 

704 return UNSIGNED_PAYLOAD 

705 

706 

707class SigV4QueryAuth(SigV4Auth): 

708 DEFAULT_EXPIRES = 3600 

709 

710 def __init__( 

711 self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES 

712 ): 

713 super().__init__(credentials, service_name, region_name) 

714 self._expires = expires 

715 

716 def _modify_request_before_signing(self, request): 

717 # We automatically set this header, so if it's the auto-set value we 

718 # want to get rid of it since it doesn't make sense for presigned urls. 

719 content_type = request.headers.get('content-type') 

720 blacklisted_content_type = ( 

721 'application/x-www-form-urlencoded; charset=utf-8' 

722 ) 

723 if content_type == blacklisted_content_type: 

724 del request.headers['content-type'] 

725 

726 # Note that we're not including X-Amz-Signature. 

727 # From the docs: "The Canonical Query String must include all the query 

728 # parameters from the preceding table except for X-Amz-Signature. 

729 signed_headers = self.signed_headers(self.headers_to_sign(request)) 

730 

731 auth_params = { 

732 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 

733 'X-Amz-Credential': self.scope(request), 

734 'X-Amz-Date': request.context['timestamp'], 

735 'X-Amz-Expires': self._expires, 

736 'X-Amz-SignedHeaders': signed_headers, 

737 } 

738 if self.credentials.token is not None: 

739 auth_params['X-Amz-Security-Token'] = self.credentials.token 

740 # Now parse the original query string to a dict, inject our new query 

741 # params, and serialize back to a query string. 

742 url_parts = urlsplit(request.url) 

743 # parse_qs makes each value a list, but in our case we know we won't 

744 # have repeated keys so we know we have single element lists which we 

745 # can convert back to scalar values. 

746 query_string_parts = parse_qs(url_parts.query, keep_blank_values=True) 

747 query_dict = {k: v[0] for k, v in query_string_parts.items()} 

748 

749 if request.params: 

750 query_dict.update(request.params) 

751 request.params = {} 

752 # The spec is particular about this. It *has* to be: 

753 # https://<endpoint>?<operation params>&<auth params> 

754 # You can't mix the two types of params together, i.e just keep doing 

755 # new_query_params.update(op_params) 

756 # new_query_params.update(auth_params) 

757 # percent_encode_sequence(new_query_params) 

758 operation_params = '' 

759 if request.data: 

760 # We also need to move the body params into the query string. To 

761 # do this, we first have to convert it to a dict. 

762 query_dict.update(_get_body_as_dict(request)) 

763 request.data = '' 

764 if query_dict: 

765 operation_params = percent_encode_sequence(query_dict) + '&' 

766 new_query_string = ( 

767 f"{operation_params}{percent_encode_sequence(auth_params)}" 

768 ) 

769 # url_parts is a tuple (and therefore immutable) so we need to create 

770 # a new url_parts with the new query string. 

771 # <part> - <index> 

772 # scheme - 0 

773 # netloc - 1 

774 # path - 2 

775 # query - 3 <-- we're replacing this. 

776 # fragment - 4 

777 p = url_parts 

778 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

779 request.url = urlunsplit(new_url_parts) 

780 

781 def _inject_signature_to_request(self, request, signature): 

782 # Rather than calculating an "Authorization" header, for the query 

783 # param quth, we just append an 'X-Amz-Signature' param to the end 

784 # of the query string. 

785 request.url += f'&X-Amz-Signature={signature}' 

786 

787 

788class S3SigV4QueryAuth(SigV4QueryAuth): 

789 """S3 SigV4 auth using query parameters. 

790 

791 This signer will sign a request using query parameters and signature 

792 version 4, i.e a "presigned url" signer. 

793 

794 Based off of: 

795 

796 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html 

797 

798 """ 

799 

800 def _normalize_url_path(self, path): 

801 # For S3, we do not normalize the path. 

802 return path 

803 

804 def payload(self, request): 

805 # From the doc link above: 

806 # "You don't include a payload hash in the Canonical Request, because 

807 # when you create a presigned URL, you don't know anything about the 

808 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD". 

809 return UNSIGNED_PAYLOAD 

810 

811 

812class S3SigV4PostAuth(SigV4Auth): 

813 """ 

814 Presigns a s3 post 

815 

816 Implementation doc here: 

817 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html 

818 """ 

819 

820 def add_auth(self, request): 

821 datetime_now = datetime.datetime.utcnow() 

822 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP) 

823 

824 fields = {} 

825 if request.context.get('s3-presign-post-fields', None) is not None: 

826 fields = request.context['s3-presign-post-fields'] 

827 

828 policy = {} 

829 conditions = [] 

830 if request.context.get('s3-presign-post-policy', None) is not None: 

831 policy = request.context['s3-presign-post-policy'] 

832 if policy.get('conditions', None) is not None: 

833 conditions = policy['conditions'] 

834 

835 policy['conditions'] = conditions 

836 

837 fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256' 

838 fields['x-amz-credential'] = self.scope(request) 

839 fields['x-amz-date'] = request.context['timestamp'] 

840 

841 conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'}) 

842 conditions.append({'x-amz-credential': self.scope(request)}) 

843 conditions.append({'x-amz-date': request.context['timestamp']}) 

844 

845 if self.credentials.token is not None: 

846 fields['x-amz-security-token'] = self.credentials.token 

847 conditions.append({'x-amz-security-token': self.credentials.token}) 

848 

849 # Dump the base64 encoded policy into the fields dictionary. 

850 fields['policy'] = base64.b64encode( 

851 json.dumps(policy).encode('utf-8') 

852 ).decode('utf-8') 

853 

854 fields['x-amz-signature'] = self.signature(fields['policy'], request) 

855 

856 request.context['s3-presign-post-fields'] = fields 

857 request.context['s3-presign-post-policy'] = policy 

858 

859 

860class HmacV1Auth(BaseSigner): 

861 # List of Query String Arguments of Interest 

862 QSAOfInterest = [ 

863 'accelerate', 

864 'acl', 

865 'cors', 

866 'defaultObjectAcl', 

867 'location', 

868 'logging', 

869 'partNumber', 

870 'policy', 

871 'requestPayment', 

872 'torrent', 

873 'versioning', 

874 'versionId', 

875 'versions', 

876 'website', 

877 'uploads', 

878 'uploadId', 

879 'response-content-type', 

880 'response-content-language', 

881 'response-expires', 

882 'response-cache-control', 

883 'response-content-disposition', 

884 'response-content-encoding', 

885 'delete', 

886 'lifecycle', 

887 'tagging', 

888 'restore', 

889 'storageClass', 

890 'notification', 

891 'replication', 

892 'requestPayment', 

893 'analytics', 

894 'metrics', 

895 'inventory', 

896 'select', 

897 'select-type', 

898 'object-lock', 

899 ] 

900 

901 def __init__(self, credentials, service_name=None, region_name=None): 

902 self.credentials = credentials 

903 

904 def sign_string(self, string_to_sign): 

905 new_hmac = hmac.new( 

906 self.credentials.secret_key.encode('utf-8'), digestmod=sha1 

907 ) 

908 new_hmac.update(string_to_sign.encode('utf-8')) 

909 return encodebytes(new_hmac.digest()).strip().decode('utf-8') 

910 

911 def canonical_standard_headers(self, headers): 

912 interesting_headers = ['content-md5', 'content-type', 'date'] 

913 hoi = [] 

914 if 'Date' in headers: 

915 del headers['Date'] 

916 headers['Date'] = self._get_date() 

917 for ih in interesting_headers: 

918 found = False 

919 for key in headers: 

920 lk = key.lower() 

921 if headers[key] is not None and lk == ih: 

922 hoi.append(headers[key].strip()) 

923 found = True 

924 if not found: 

925 hoi.append('') 

926 return '\n'.join(hoi) 

927 

928 def canonical_custom_headers(self, headers): 

929 hoi = [] 

930 custom_headers = {} 

931 for key in headers: 

932 lk = key.lower() 

933 if headers[key] is not None: 

934 if lk.startswith('x-amz-'): 

935 custom_headers[lk] = ','.join( 

936 v.strip() for v in headers.get_all(key) 

937 ) 

938 sorted_header_keys = sorted(custom_headers.keys()) 

939 for key in sorted_header_keys: 

940 hoi.append(f"{key}:{custom_headers[key]}") 

941 return '\n'.join(hoi) 

942 

943 def unquote_v(self, nv): 

944 """ 

945 TODO: Do we need this? 

946 """ 

947 if len(nv) == 1: 

948 return nv 

949 else: 

950 return (nv[0], unquote(nv[1])) 

951 

952 def canonical_resource(self, split, auth_path=None): 

953 # don't include anything after the first ? in the resource... 

954 # unless it is one of the QSA of interest, defined above 

955 # NOTE: 

956 # The path in the canonical resource should always be the 

957 # full path including the bucket name, even for virtual-hosting 

958 # style addressing. The ``auth_path`` keeps track of the full 

959 # path for the canonical resource and would be passed in if 

960 # the client was using virtual-hosting style. 

961 if auth_path is not None: 

962 buf = auth_path 

963 else: 

964 buf = split.path 

965 if split.query: 

966 qsa = split.query.split('&') 

967 qsa = [a.split('=', 1) for a in qsa] 

968 qsa = [ 

969 self.unquote_v(a) for a in qsa if a[0] in self.QSAOfInterest 

970 ] 

971 if len(qsa) > 0: 

972 qsa.sort(key=itemgetter(0)) 

973 qsa = ['='.join(a) for a in qsa] 

974 buf += '?' 

975 buf += '&'.join(qsa) 

976 return buf 

977 

978 def canonical_string( 

979 self, method, split, headers, expires=None, auth_path=None 

980 ): 

981 cs = method.upper() + '\n' 

982 cs += self.canonical_standard_headers(headers) + '\n' 

983 custom_headers = self.canonical_custom_headers(headers) 

984 if custom_headers: 

985 cs += custom_headers + '\n' 

986 cs += self.canonical_resource(split, auth_path=auth_path) 

987 return cs 

988 

989 def get_signature( 

990 self, method, split, headers, expires=None, auth_path=None 

991 ): 

992 if self.credentials.token: 

993 del headers['x-amz-security-token'] 

994 headers['x-amz-security-token'] = self.credentials.token 

995 string_to_sign = self.canonical_string( 

996 method, split, headers, auth_path=auth_path 

997 ) 

998 logger.debug(f'StringToSign:\n{string_to_sign}') 

999 return self.sign_string(string_to_sign) 

1000 

1001 def add_auth(self, request): 

1002 if self.credentials is None: 

1003 raise NoCredentialsError 

1004 logger.debug("Calculating signature using hmacv1 auth.") 

1005 split = urlsplit(request.url) 

1006 logger.debug(f'HTTP request method: {request.method}') 

1007 signature = self.get_signature( 

1008 request.method, split, request.headers, auth_path=request.auth_path 

1009 ) 

1010 self._inject_signature(request, signature) 

1011 

1012 def _get_date(self): 

1013 return formatdate(usegmt=True) 

1014 

1015 def _inject_signature(self, request, signature): 

1016 if 'Authorization' in request.headers: 

1017 # We have to do this because request.headers is not 

1018 # normal dictionary. It has the (unintuitive) behavior 

1019 # of aggregating repeated setattr calls for the same 

1020 # key value. For example: 

1021 # headers['foo'] = 'a'; headers['foo'] = 'b' 

1022 # list(headers) will print ['foo', 'foo']. 

1023 del request.headers['Authorization'] 

1024 

1025 auth_header = f"AWS {self.credentials.access_key}:{signature}" 

1026 request.headers['Authorization'] = auth_header 

1027 

1028 

1029class HmacV1QueryAuth(HmacV1Auth): 

1030 """ 

1031 Generates a presigned request for s3. 

1032 

1033 Spec from this document: 

1034 

1035 http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html 

1036 #RESTAuthenticationQueryStringAuth 

1037 

1038 """ 

1039 

1040 DEFAULT_EXPIRES = 3600 

1041 

1042 def __init__(self, credentials, expires=DEFAULT_EXPIRES): 

1043 self.credentials = credentials 

1044 self._expires = expires 

1045 

1046 def _get_date(self): 

1047 return str(int(time.time() + int(self._expires))) 

1048 

1049 def _inject_signature(self, request, signature): 

1050 query_dict = {} 

1051 query_dict['AWSAccessKeyId'] = self.credentials.access_key 

1052 query_dict['Signature'] = signature 

1053 

1054 for header_key in request.headers: 

1055 lk = header_key.lower() 

1056 # For query string requests, Expires is used instead of the 

1057 # Date header. 

1058 if header_key == 'Date': 

1059 query_dict['Expires'] = request.headers['Date'] 

1060 # We only want to include relevant headers in the query string. 

1061 # These can be anything that starts with x-amz, is Content-MD5, 

1062 # or is Content-Type. 

1063 elif lk.startswith('x-amz-') or lk in ( 

1064 'content-md5', 

1065 'content-type', 

1066 ): 

1067 query_dict[lk] = request.headers[lk] 

1068 # Combine all of the identified headers into an encoded 

1069 # query string 

1070 new_query_string = percent_encode_sequence(query_dict) 

1071 

1072 # Create a new url with the presigned url. 

1073 p = urlsplit(request.url) 

1074 if p[3]: 

1075 # If there was a pre-existing query string, we should 

1076 # add that back before injecting the new query string. 

1077 new_query_string = f'{p[3]}&{new_query_string}' 

1078 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4]) 

1079 request.url = urlunsplit(new_url_parts) 

1080 

1081 

1082class HmacV1PostAuth(HmacV1Auth): 

1083 """ 

1084 Generates a presigned post for s3. 

1085 

1086 Spec from this document: 

1087 

1088 http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html 

1089 """ 

1090 

1091 def add_auth(self, request): 

1092 fields = {} 

1093 if request.context.get('s3-presign-post-fields', None) is not None: 

1094 fields = request.context['s3-presign-post-fields'] 

1095 

1096 policy = {} 

1097 conditions = [] 

1098 if request.context.get('s3-presign-post-policy', None) is not None: 

1099 policy = request.context['s3-presign-post-policy'] 

1100 if policy.get('conditions', None) is not None: 

1101 conditions = policy['conditions'] 

1102 

1103 policy['conditions'] = conditions 

1104 

1105 fields['AWSAccessKeyId'] = self.credentials.access_key 

1106 

1107 if self.credentials.token is not None: 

1108 fields['x-amz-security-token'] = self.credentials.token 

1109 conditions.append({'x-amz-security-token': self.credentials.token}) 

1110 

1111 # Dump the base64 encoded policy into the fields dictionary. 

1112 fields['policy'] = base64.b64encode( 

1113 json.dumps(policy).encode('utf-8') 

1114 ).decode('utf-8') 

1115 

1116 fields['signature'] = self.sign_string(fields['policy']) 

1117 

1118 request.context['s3-presign-post-fields'] = fields 

1119 request.context['s3-presign-post-policy'] = policy 

1120 

1121 

1122class BearerAuth(TokenSigner): 

1123 """ 

1124 Performs bearer token authorization by placing the bearer token in the 

1125 Authorization header as specified by Section 2.1 of RFC 6750. 

1126 

1127 https://datatracker.ietf.org/doc/html/rfc6750#section-2.1 

1128 """ 

1129 

1130 def add_auth(self, request): 

1131 if self.auth_token is None: 

1132 raise NoAuthTokenError() 

1133 

1134 auth_header = f'Bearer {self.auth_token.token}' 

1135 if 'Authorization' in request.headers: 

1136 del request.headers['Authorization'] 

1137 request.headers['Authorization'] = auth_header 

1138 

1139 

1140def resolve_auth_type(auth_trait): 

1141 for auth_type in auth_trait: 

1142 if auth_type == 'smithy.api#noAuth': 

1143 return AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type] 

1144 elif auth_type in AUTH_TYPE_TO_SIGNATURE_VERSION: 

1145 signature_version = AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type] 

1146 if signature_version in AUTH_TYPE_MAPS: 

1147 return signature_version 

1148 else: 

1149 raise UnknownSignatureVersionError(signature_version=auth_type) 

1150 raise UnsupportedSignatureVersionError(signature_version=auth_trait) 

1151 

1152 

1153AUTH_TYPE_MAPS = { 

1154 'v2': SigV2Auth, 

1155 'v3': SigV3Auth, 

1156 'v3https': SigV3Auth, 

1157 's3': HmacV1Auth, 

1158 's3-query': HmacV1QueryAuth, 

1159 's3-presign-post': HmacV1PostAuth, 

1160 's3v4-presign-post': S3SigV4PostAuth, 

1161 'v4-s3express': S3ExpressAuth, 

1162 'v4-s3express-query': S3ExpressQueryAuth, 

1163 'v4-s3express-presign-post': S3ExpressPostAuth, 

1164 'bearer': BearerAuth, 

1165} 

1166 

1167# Define v4 signers depending on if CRT is present 

1168if HAS_CRT: 

1169 from botocore.crt.auth import CRT_AUTH_TYPE_MAPS 

1170 

1171 AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS) 

1172else: 

1173 AUTH_TYPE_MAPS.update( 

1174 { 

1175 'v4': SigV4Auth, 

1176 'v4-query': SigV4QueryAuth, 

1177 's3v4': S3SigV4Auth, 

1178 's3v4-query': S3SigV4QueryAuth, 

1179 } 

1180 ) 

1181 

1182AUTH_TYPE_TO_SIGNATURE_VERSION = { 

1183 'aws.auth#sigv4': 'v4', 

1184 'aws.auth#sigv4a': 'v4a', 

1185 'smithy.api#httpBearerAuth': 'bearer', 

1186 'smithy.api#noAuth': 'none', 

1187}