Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/auth.py: 20%

589 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14import base64 

15import calendar 

16import datetime 

17import functools 

18import hmac 

19import json 

20import logging 

21import time 

22from collections.abc import Mapping 

23from email.utils import formatdate 

24from hashlib import sha1, sha256 

25from operator import itemgetter 

26 

27from botocore.compat import ( 

28 HAS_CRT, 

29 HTTPHeaders, 

30 encodebytes, 

31 ensure_unicode, 

32 parse_qs, 

33 quote, 

34 unquote, 

35 urlsplit, 

36 urlunsplit, 

37) 

38from botocore.exceptions import NoAuthTokenError, NoCredentialsError 

39from botocore.utils import ( 

40 is_valid_ipv6_endpoint_url, 

41 normalize_url_path, 

42 percent_encode_sequence, 

43) 

44 

45# Imports for backwards compatibility 

46from botocore.compat import MD5_AVAILABLE # noqa 

47 

48 

logger = logging.getLogger(__name__)


# Hex SHA-256 digest used in place of hashing when a request has no body.
EMPTY_SHA256_HASH = (
    'e3b0c44298fc1c149afbf4c8996fb924'
    '27ae41e4649b934ca495991b7852b855'
)
# Number of bytes read per chunk when calculating sha256 checksums of
# file-like bodies.  Experimenting with various buffer sizes showed that
# this value generally gave the best result (in terms of performance).
PAYLOAD_BUFFER = 1024**2
# strftime patterns for the timestamps emitted by the signers below.
ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ'
# Lowercase header names that are left out of the SigV4 signed headers.
SIGNED_HEADERS_BLACKLIST = ['expect', 'user-agent', 'x-amzn-trace-id']
# Static strings used in place of a payload checksum.
UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'
STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER'

68 

69 

def _host_from_url(url):
    """
    Derive the value of the host header from ``url``.

    The returned value:
      1) is lowercase
      2) excludes the port when it is the default port for the scheme
      3) excludes userinfo
    """
    parsed = urlsplit(url)
    # urlsplit's hostname is always lowercase and carries no userinfo.
    hostname = parsed.hostname
    if is_valid_ipv6_endpoint_url(url):
        # The hostname attribute has no brackets, so add them back.
        hostname = f'[{hostname}]'
    port = parsed.port
    if port is None:
        return hostname
    scheme_default = {'http': 80, 'https': 443}.get(parsed.scheme)
    if port == scheme_default:
        return hostname
    return f'{hostname}:{port:d}'

87 

88 

89def _get_body_as_dict(request): 

90 # For query services, request.data is form-encoded and is already a 

91 # dict, but for other services such as rest-json it could be a json 

92 # string or bytes. In those cases we attempt to load the data as a 

93 # dict. 

94 data = request.data 

95 if isinstance(data, bytes): 

96 data = json.loads(data.decode('utf-8')) 

97 elif isinstance(data, str): 

98 data = json.loads(data) 

99 return data 

100 

101 

class BaseSigner:
    """
    Common base for the request signers in this module.

    Subclasses override ``add_auth`` and flip the class flags below as
    needed.
    """

    # Class-level flags; both are off unless a subclass overrides them.
    REQUIRES_TOKEN = False
    REQUIRES_REGION = False

    def add_auth(self, request):
        """Sign ``request``; the base implementation always raises."""
        raise NotImplementedError("add_auth")

108 

109 

class TokenSigner(BaseSigner):
    """
    Signers that expect an authorization token to perform the authorization

    :param auth_token: The token object, stored as ``self.auth_token``.
    """

    # The description above must be the first statement in the class body;
    # placed after this assignment it would be a discarded string
    # expression and ``TokenSigner.__doc__`` would be ``None``.
    REQUIRES_TOKEN = True

    def __init__(self, auth_token):
        self.auth_token = auth_token

118 

119 

class SigV2Auth(BaseSigner):
    """
    Sign a request with Signature V2.

    The signature is an HMAC-SHA256, keyed with the secret key, over the
    request method, network location, path and the sorted, percent-encoded
    parameters.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def calc_signature(self, request, params):
        """
        Compute the V2 signature of ``request`` over ``params``.

        Returns a ``(query_string, signature)`` tuple, where
        ``query_string`` is the encoded parameter string that was signed
        and ``signature`` is the base64-encoded digest.
        """
        logger.debug("Calculating signature using v2 auth.")
        url_parts = urlsplit(request.url)
        # An empty path is signed as the root path.
        resource_path = url_parts.path or '/'
        string_to_sign = (
            f"{request.method}\n{url_parts.netloc}\n{resource_path}\n"
        )
        signer = hmac.new(
            self.credentials.secret_key.encode("utf-8"), digestmod=sha256
        )
        encoded_pairs = []
        for name in sorted(params):
            # A signature left over from a previous attempt must not be
            # part of the new one; skipping it prevents issues during
            # retries.
            if name != 'Signature':
                raw_value = str(params[name])
                encoded_name = quote(name.encode('utf-8'), safe='')
                encoded_value = quote(raw_value.encode('utf-8'), safe='-_~')
                encoded_pairs.append(f'{encoded_name}={encoded_value}')
        qs = '&'.join(encoded_pairs)
        string_to_sign += qs
        logger.debug('String to sign: %s', string_to_sign)
        signer.update(string_to_sign.encode('utf-8'))
        digest_b64 = base64.b64encode(signer.digest())
        return (qs, digest_b64.strip().decode('utf-8'))

    def add_auth(self, request):
        """
        Add the SigV2 parameters and signature to ``request``.

        The parameters are written into ``request.data`` when it is
        non-empty (POST) and into ``request.params`` otherwise (GET).

        :raises NoCredentialsError: if no credentials were supplied.
        """
        # The auth handler is the last thing called in the preparation
        # phase of a prepared request, so the parameters are updated in
        # place on the request.
        if self.credentials is None:
            raise NoCredentialsError()
        params = request.data if request.data else request.params
        creds = self.credentials
        params['AWSAccessKeyId'] = creds.access_key
        params['SignatureVersion'] = '2'
        params['SignatureMethod'] = 'HmacSHA256'
        params['Timestamp'] = time.strftime(ISO8601, time.gmtime())
        session_token = creds.token
        if session_token:
            params['SecurityToken'] = session_token
        _, signature = self.calc_signature(request, params)
        params['Signature'] = signature
        return request

179 

180 

class SigV3Auth(BaseSigner):
    """
    Sign a request by HMAC-SHA256 signing its ``Date`` header.

    The result is placed in the ``X-Amzn-Authorization`` header.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def add_auth(self, request):
        """
        Refresh the ``Date`` header and attach the signature headers.

        :raises NoCredentialsError: if no credentials were supplied.
        """
        creds = self.credentials
        if creds is None:
            raise NoCredentialsError()
        headers = request.headers

        # Each header is deleted before being assigned so that a previous
        # value is not kept around.
        if 'Date' in headers:
            del headers['Date']
        headers['Date'] = formatdate(usegmt=True)

        session_token = creds.token
        if session_token:
            if 'X-Amz-Security-Token' in headers:
                del headers['X-Amz-Security-Token']
            headers['X-Amz-Security-Token'] = session_token

        # The signed message is the Date header that was just written.
        mac = hmac.new(creds.secret_key.encode('utf-8'), digestmod=sha256)
        mac.update(headers['Date'].encode('utf-8'))
        encoded_signature = encodebytes(mac.digest()).strip()
        signature = (
            f"AWS3-HTTPS AWSAccessKeyId={creds.access_key},"
            f"Algorithm=HmacSHA256,Signature={encoded_signature.decode('utf-8')}"
        )

        if 'X-Amzn-Authorization' in headers:
            del headers['X-Amzn-Authorization']
        headers['X-Amzn-Authorization'] = signature

207 

208 

class SigV4Auth(BaseSigner):
    """
    Sign a request with Signature V4.
    """

    REQUIRES_REGION = True

    def __init__(self, credentials, service_name, region_name):
        self.credentials = credentials
        # The region and service names become part of the credential
        # scope and of the derived signing key.
        self._region_name = region_name
        self._service_name = service_name

    def _sign(self, key, msg, hex=False):
        """
        HMAC-SHA256 ``msg`` (a str, encoded as UTF-8) with ``key``.

        Returns the hex digest when ``hex`` is true, the raw digest bytes
        otherwise.
        """
        mac = hmac.new(key, msg.encode('utf-8'), sha256)
        return mac.hexdigest() if hex else mac.digest()

    def headers_to_sign(self, request):
        """
        Select the headers from the request that need to be included
        in the StringToSign.

        Header names are lowercased, names listed in
        ``SIGNED_HEADERS_BLACKLIST`` are dropped, and a ``host`` entry is
        derived from the request URL when the request has none.
        """
        selected = HTTPHeaders()
        for raw_name, value in request.headers.items():
            lowered = raw_name.lower()
            if lowered in SIGNED_HEADERS_BLACKLIST:
                continue
            selected[lowered] = value
        if 'host' not in selected:
            # TODO: We should set the host ourselves, instead of relying on
            # our HTTP client to set it for us.
            selected['host'] = _host_from_url(request.url)
        return selected

    def canonical_query_string(self, request):
        """
        Return the canonical query string for ``request``.

        The query string can come from two places: the ``params``
        attribute of the request, or the request url itself (which is
        re-split so its query component can be canonicalized).  ``params``
        wins when it is non-empty.
        """
        if not request.params:
            return self._canonical_query_string_url(urlsplit(request.url))
        return self._canonical_query_string_params(request.params)

    def _canonical_query_string_params(self, params):
        """
        Canonicalize ``params`` (a mapping or an iterable of pairs).

        Keys and values are percent-encoded, then sorted by encoded key
        and, for repeated keys, by encoded value.
        """
        if isinstance(params, Mapping):
            params = params.items()
        encoded_pairs = sorted(
            (quote(key, safe='-_.~'), quote(str(value), safe='-_.~'))
            for key, value in params
        )
        return '&'.join(f'{key}={value}' for key, value in encoded_pairs)

    def _canonical_query_string_url(self, parts):
        """
        Canonicalize the query component of the split url ``parts``.

        The pairs are sorted as they appear in the url; no additional
        encoding is applied here.
        """
        if not parts.query:
            return ''
        pairs = []
        for chunk in parts.query.split('&'):
            name, _, value = chunk.partition('=')
            pairs.append((name, value))
        # Sort by key and, in the case of repeated keys, then by value.
        pairs.sort()
        return '&'.join(f'{name}={value}' for name, value in pairs)

    def canonical_headers(self, headers_to_sign):
        """
        Return the headers that need to be included in the StringToSign
        in their canonical form: one ``name:value`` line per distinct
        header name, in sorted order, joined by newlines.  Multiple values
        of the same header are trimmed and joined with commas.
        """

        def render(name):
            values = headers_to_sign.get_all(name)
            joined = ','.join(self._header_value(v) for v in values)
            return f'{name}:{ensure_unicode(joined)}'

        distinct_names = sorted(set(headers_to_sign))
        return '\n'.join(render(name) for name in distinct_names)

    def _header_value(self, value):
        # From the sigv4 docs:
        # Lowercase(HeaderName) + ':' + Trimall(HeaderValue)
        #
        # The Trimall function removes excess white space before and after
        # values, and converts sequential spaces to a single space.
        words = value.split()
        return ' '.join(words)

    def signed_headers(self, headers_to_sign):
        """
        Return the lowercased, stripped, sorted header names joined by
        ``;``.
        """
        names = {n for n in headers_to_sign}
        return ';'.join(sorted(name.lower().strip() for name in names))

    def _is_streaming_checksum_payload(self, request):
        """
        Tell whether the request context asks for a checksum algorithm
        whose ``in`` location is ``trailer``.
        """
        algorithm = request.context.get('checksum', {}).get(
            'request_algorithm'
        )
        if not isinstance(algorithm, dict):
            return False
        return algorithm.get('in') == 'trailer'

    def payload(self, request):
        """
        Return the payload checksum string for the canonical request.

        This is either one of the static placeholder strings or the hex
        SHA-256 of ``request.body``.
        """
        if self._is_streaming_checksum_payload(request):
            return STREAMING_UNSIGNED_PAYLOAD_TRAILER
        if not self._should_sha256_sign_payload(request):
            # When payload signing is disabled, we use this static string
            # in place of the payload checksum.
            return UNSIGNED_PAYLOAD
        body = request.body
        if not body:
            return EMPTY_SHA256_HASH
        if not hasattr(body, 'seek'):
            # The request serialization has ensured that
            # request.body is a bytes() type.
            return sha256(body).hexdigest()
        # Seekable body: hash it in chunks from the current position and
        # then restore that position.
        start = body.tell()
        digest = sha256()
        while True:
            chunk = body.read(PAYLOAD_BUFFER)
            if chunk == b'':
                break
            digest.update(chunk)
        hex_checksum = digest.hexdigest()
        body.seek(start)
        return hex_checksum

    def _should_sha256_sign_payload(self, request):
        """Decide whether the body should be SHA-256 hashed and signed."""
        is_secure = request.url.startswith('https')
        if is_secure:
            # Certain operations may have payload signing disabled by
            # default.  Since we don't have access to the operation model,
            # this bit of metadata is passed in through the request
            # context.
            return request.context.get('payload_signing_enabled', True)
        # Payloads will always be signed over insecure connections.
        return True

    def canonical_request(self, request):
        """
        Build the canonical request string: method, path, query string,
        canonical headers, signed header names and payload checksum,
        joined by newlines.
        """
        method = request.method.upper()
        path = self._normalize_url_path(urlsplit(request.url).path)
        query = self.canonical_query_string(request)
        signable = self.headers_to_sign(request)
        header_block = self.canonical_headers(signable) + '\n'
        signed_names = self.signed_headers(signable)
        # A checksum header already present on the request takes
        # precedence over computing one.
        if 'X-Amz-Content-SHA256' in request.headers:
            body_checksum = request.headers['X-Amz-Content-SHA256']
        else:
            body_checksum = self.payload(request)
        return '\n'.join(
            (method, path, query, header_block, signed_names, body_checksum)
        )

    def _normalize_url_path(self, path):
        """Normalize ``path`` and percent-encode it, keeping ``/`` and ``~``."""
        return quote(normalize_url_path(path), safe='/~')

    def scope(self, request):
        """
        Return ``<access key>/<date>/<region>/<service>/aws4_request``.

        The date is the first eight characters of the request timestamp.
        """
        return '/'.join(
            [
                self.credentials.access_key,
                request.context['timestamp'][0:8],
                self._region_name,
                self._service_name,
                'aws4_request',
            ]
        )

    def credential_scope(self, request):
        """
        Return ``<date>/<region>/<service>/aws4_request``, i.e. the scope
        without the access key.
        """
        return '/'.join(
            [
                request.context['timestamp'][0:8],
                self._region_name,
                self._service_name,
                'aws4_request',
            ]
        )

    def string_to_sign(self, request, canonical_request):
        """
        Return the StringToSign: the algorithm name, the request
        timestamp, the credential scope and the hex SHA-256 of the
        canonical request, joined by newlines.
        """
        hashed_request = sha256(canonical_request.encode('utf-8'))
        return '\n'.join(
            [
                'AWS4-HMAC-SHA256',
                request.context['timestamp'],
                self.credential_scope(request),
                hashed_request.hexdigest(),
            ]
        )

    def signature(self, string_to_sign, request):
        """
        Derive the signing key and return the hex signature of
        ``string_to_sign``.

        The key is derived by chaining HMACs over the date, region,
        service and the literal ``aws4_request``, starting from
        ``AWS4`` + secret key.
        """
        secret = self.credentials.secret_key
        signing_key = (f"AWS4{secret}").encode()
        derivation_steps = (
            request.context["timestamp"][0:8],
            self._region_name,
            self._service_name,
            'aws4_request',
        )
        for step in derivation_steps:
            signing_key = self._sign(signing_key, step)
        return self._sign(signing_key, string_to_sign, hex=True)

    def add_auth(self, request):
        """
        Sign ``request`` in place.

        :raises NoCredentialsError: if no credentials were supplied.
        """
        if self.credentials is None:
            raise NoCredentialsError()
        request.context['timestamp'] = datetime.datetime.utcnow().strftime(
            SIGV4_TIMESTAMP
        )
        # This could be a retry.  Make sure the previous
        # authorization header is removed first.
        self._modify_request_before_signing(request)
        canonical = self.canonical_request(request)
        logger.debug("Calculating signature using v4 auth.")
        logger.debug('CanonicalRequest:\n%s', canonical)
        to_sign = self.string_to_sign(request, canonical)
        logger.debug('StringToSign:\n%s', to_sign)
        computed_signature = self.signature(to_sign, request)
        logger.debug('Signature:\n%s', computed_signature)

        self._inject_signature_to_request(request, computed_signature)

    def _inject_signature_to_request(self, request, signature):
        """Write the ``Authorization`` header and return the request."""
        credential_part = 'AWS4-HMAC-SHA256 Credential=%s' % self.scope(
            request
        )
        signable = self.headers_to_sign(request)
        signed_part = f"SignedHeaders={self.signed_headers(signable)}"
        signature_part = 'Signature=%s' % signature
        request.headers['Authorization'] = ', '.join(
            [credential_part, signed_part, signature_part]
        )
        return request

    def _modify_request_before_signing(self, request):
        """
        Reset the headers that take part in signing: drop any previous
        ``Authorization``, refresh the date headers, and set the security
        token and unsigned-payload headers when applicable.
        """
        headers = request.headers
        if 'Authorization' in headers:
            del headers['Authorization']
        self._set_necessary_date_headers(request)

        session_token = self.credentials.token
        if session_token:
            if 'X-Amz-Security-Token' in headers:
                del headers['X-Amz-Security-Token']
            headers['X-Amz-Security-Token'] = session_token

        if not request.context.get('payload_signing_enabled', True):
            if 'X-Amz-Content-SHA256' in headers:
                del headers['X-Amz-Content-SHA256']
            headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD

    def _set_necessary_date_headers(self, request):
        # The spec allows for either the Date _or_ the X-Amz-Date value to
        # be used so we check both.  If there's a Date header, we rewrite
        # the Date header from the request timestamp.  Otherwise we set the
        # X-Amz-Date header.  Any previous X-Amz-Date is removed either
        # way.
        headers = request.headers
        had_date_header = 'Date' in headers
        if had_date_header:
            del headers['Date']
            parsed_timestamp = datetime.datetime.strptime(
                request.context['timestamp'], SIGV4_TIMESTAMP
            )
            epoch_seconds = int(calendar.timegm(parsed_timestamp.timetuple()))
            headers['Date'] = formatdate(epoch_seconds)
        if 'X-Amz-Date' in headers:
            del headers['X-Amz-Date']
        if not had_date_header:
            headers['X-Amz-Date'] = request.context['timestamp']

476 

477 

class S3SigV4Auth(SigV4Auth):
    """
    SigV4 signer variant for S3: the ``X-Amz-Content-SHA256`` header is
    always set, body signing may be skipped, and the url path is used
    as-is.
    """

    def _modify_request_before_signing(self, request):
        super()._modify_request_before_signing(request)
        headers = request.headers
        # Drop any existing value first, then store the payload checksum.
        if 'X-Amz-Content-SHA256' in headers:
            del headers['X-Amz-Content-SHA256']

        headers['X-Amz-Content-SHA256'] = self.payload(request)

    def _should_sha256_sign_payload(self, request):
        """Decide whether the body should be SHA-256 hashed and signed."""
        # S3 allows optional body signing, so to minimize the performance
        # impact, we opt to not SHA256 sign the body on streaming uploads,
        # provided that we're on https.
        client_config = request.context.get('client_config')
        configured = getattr(client_config, 's3', None)

        # The config could be None if it isn't set, or if the customer
        # sets it to None.
        s3_config = {} if configured is None else configured

        # The explicit configuration takes precedence over any implicit
        # configuration.
        explicit_choice = s3_config.get('payload_signing_enabled', None)
        if explicit_choice is not None:
            return explicit_choice

        # We require that both a checksum be present and https be enabled
        # to implicitly disable body signing.  The combination of TLS and
        # a checksum is sufficiently secure and durable for us to be
        # confident in the request without body signing.
        checksum_header = 'Content-MD5'
        algorithm = request.context.get('checksum', {}).get(
            'request_algorithm'
        )
        if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
            checksum_header = algorithm['name']
        secure_with_checksum = (
            request.url.startswith("https")
            and checksum_header in request.headers
        )
        if not secure_with_checksum:
            return True

        # If the input is streaming we disable body signing by default.
        if request.context.get('has_streaming_input', False):
            return False

        # If the S3-specific checks had no results, delegate to the
        # generic checks.
        return super()._should_sha256_sign_payload(request)

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path

530 

531 

class S3ExpressAuth(S3SigV4Auth):
    """
    S3 SigV4 signer that sends the credential token in the
    ``x-amz-s3session-token`` header instead of ``X-Amz-Security-Token``.
    """

    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self, credentials, service_name, region_name, *, identity_cache
    ):
        super().__init__(credentials, service_name, region_name)
        # Stored only; nothing in this class reads it back.
        self._identity_cache = identity_cache

    def add_auth(self, request):
        """Sign ``request`` using the inherited SigV4 flow."""
        super().add_auth(request)

    def _modify_request_before_signing(self, request):
        super()._modify_request_before_signing(request)
        headers = request.headers
        # An already-present session token header is left untouched.
        if 'x-amz-s3session-token' not in headers:
            headers['x-amz-s3session-token'] = self.credentials.token
        # S3Express does not support STS' X-Amz-Security-Token
        if 'X-Amz-Security-Token' in headers:
            del headers['X-Amz-Security-Token']

551 

552 

class S3ExpressPostAuth(S3ExpressAuth):
    """
    Presigns an S3 Express POST by signing a base64-encoded policy
    document and storing the resulting form fields in the request context.
    """

    REQUIRES_IDENTITY_CACHE = True

    def add_auth(self, request):
        """
        Populate ``request.context['s3-presign-post-fields']`` and
        ``request.context['s3-presign-post-policy']`` with the signing
        fields, the policy and its signature.

        Fields and policy already present in the context are extended in
        place.
        """
        context = request.context
        timestamp = datetime.datetime.utcnow().strftime(SIGV4_TIMESTAMP)
        context['timestamp'] = timestamp

        # Reuse caller-supplied fields/policy/conditions when present.
        fields = context.get('s3-presign-post-fields', None)
        if fields is None:
            fields = {}
        policy = context.get('s3-presign-post-policy', None)
        if policy is None:
            policy = {}
        conditions = policy.get('conditions', None)
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        # Every signing value is both a form field and a policy condition.
        credential = self.scope(request)
        signing_values = (
            ('x-amz-algorithm', 'AWS4-HMAC-SHA256'),
            ('x-amz-credential', credential),
            ('x-amz-date', timestamp),
        )
        for name, value in signing_values:
            fields[name] = value
        for name, value in signing_values:
            conditions.append({name: value})

        session_token = self.credentials.token
        if session_token is not None:
            fields['X-Amz-S3session-Token'] = session_token
            conditions.append({'X-Amz-S3session-Token': session_token})

        # Dump the base64 encoded policy into the fields dictionary.
        policy_json = json.dumps(policy).encode('utf-8')
        fields['policy'] = base64.b64encode(policy_json).decode('utf-8')

        # The encoded policy itself is the string that gets signed.
        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy

596 

597 

class S3ExpressQueryAuth(S3ExpressAuth):
    """
    S3 Express signer that places the auth parameters and signature in the
    query string, i.e. a "presigned url" signer.
    """

    DEFAULT_EXPIRES = 300
    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self,
        credentials,
        service_name,
        region_name,
        *,
        identity_cache,
        expires=DEFAULT_EXPIRES,
    ):
        super().__init__(
            credentials,
            service_name,
            region_name,
            identity_cache=identity_cache,
        )
        # Sent as the ``X-Amz-Expires`` query parameter.
        self._expires = expires

    def _modify_request_before_signing(self, request):
        """
        Move the operation parameters and the auth parameters into the
        query string of ``request.url``, clearing ``request.params`` and
        ``request.data`` when they contributed parameters.
        """
        # We automatically set this header, so if it's the auto-set value
        # we want to get rid of it since it doesn't make sense for
        # presigned urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that we're not including X-Amz-Signature.
        # From the docs: "The Canonical Query String must include all the
        # query parameters from the preceding table except for
        # X-Amz-Signature."
        signed_headers = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_headers,
        }
        session_token = self.credentials.token
        if session_token is not None:
            auth_params['X-Amz-S3session-Token'] = session_token

        # Now parse the original query string to a dict, inject our new
        # query params, and serialize back to a query string.
        url_parts = urlsplit(request.url)
        # parse_qs makes each value a list, but in our case we know we
        # won't have repeated keys so we know we have single element lists
        # which we can convert back to scalar values.
        parsed_query = parse_qs(url_parts.query, keep_blank_values=True)
        query_dict = {name: values[0] for name, values in parsed_query.items()}

        if request.params:
            query_dict.update(request.params)
            request.params = {}
        if request.data:
            # We also need to move the body params into the query string.
            # To do this, we first have to convert it to a dict.
            query_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about this.  It *has* to be:
        #     https://<endpoint>?<operation params>&<auth params>
        # You can't mix the two types of params together, i.e. merge both
        # into one dict and encode that.
        operation_params = ''
        if query_dict:
            operation_params = percent_encode_sequence(query_dict) + '&'
        new_query_string = (
            f"{operation_params}{percent_encode_sequence(auth_params)}"
        )

        # url_parts is a tuple (and therefore immutable), so rebuild the
        # url from its components with only the query replaced.
        scheme, netloc, path, _, fragment = url_parts
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for the query
        # param auth, we just append an 'X-Amz-Signature' param to the end
        # of the query string.
        request.url += '&X-Amz-Signature=%s' % signature

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path

    def payload(self, request):
        # When creating a presigned URL nothing is known about the
        # payload, so no payload hash is included in the canonical
        # request.  The constant string "UNSIGNED-PAYLOAD" is used instead.
        return UNSIGNED_PAYLOAD

700 

701 

class SigV4QueryAuth(SigV4Auth):
    """
    SigV4 signer that places the auth parameters and signature in the
    query string instead of an ``Authorization`` header.
    """

    DEFAULT_EXPIRES = 3600

    def __init__(
        self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
    ):
        super().__init__(credentials, service_name, region_name)
        # Sent as the ``X-Amz-Expires`` query parameter.
        self._expires = expires

    def _modify_request_before_signing(self, request):
        """
        Move the operation parameters and the auth parameters into the
        query string of ``request.url``, clearing ``request.params`` and
        ``request.data`` when they contributed parameters.
        """
        # We automatically set this header, so if it's the auto-set value
        # we want to get rid of it since it doesn't make sense for
        # presigned urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that we're not including X-Amz-Signature.
        # From the docs: "The Canonical Query String must include all the
        # query parameters from the preceding table except for
        # X-Amz-Signature."
        signed_headers = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_headers,
        }
        session_token = self.credentials.token
        if session_token is not None:
            auth_params['X-Amz-Security-Token'] = session_token

        # Now parse the original query string to a dict, inject our new
        # query params, and serialize back to a query string.
        url_parts = urlsplit(request.url)
        # parse_qs makes each value a list, but in our case we know we
        # won't have repeated keys so we know we have single element lists
        # which we can convert back to scalar values.
        parsed_query = parse_qs(url_parts.query, keep_blank_values=True)
        query_dict = {name: values[0] for name, values in parsed_query.items()}

        if request.params:
            query_dict.update(request.params)
            request.params = {}
        if request.data:
            # We also need to move the body params into the query string.
            # To do this, we first have to convert it to a dict.
            query_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about this.  It *has* to be:
        #     https://<endpoint>?<operation params>&<auth params>
        # You can't mix the two types of params together, i.e. merge both
        # into one dict and encode that.
        operation_params = ''
        if query_dict:
            operation_params = percent_encode_sequence(query_dict) + '&'
        new_query_string = (
            f"{operation_params}{percent_encode_sequence(auth_params)}"
        )

        # url_parts is a tuple (and therefore immutable), so rebuild the
        # url from its components with only the query replaced.
        scheme, netloc, path, _, fragment = url_parts
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for the query
        # param auth, we just append an 'X-Amz-Signature' param to the end
        # of the query string.
        request.url += '&X-Amz-Signature=%s' % signature

781 

782 

class S3SigV4QueryAuth(SigV4QueryAuth):
    """S3 SigV4 auth using query parameters.

    This signer will sign a request using query parameters and signature
    version 4, i.e. a "presigned url" signer.  Compared with its parent it
    leaves the url path untouched and never hashes the payload.

    Based off of:

    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html

    """

    def payload(self, request):
        # From the doc link above:
        #     "You don't include a payload hash in the Canonical Request,
        #     because when you create a presigned URL, you don't know
        #     anything about the payload.  Instead, you use a constant
        #     string "UNSIGNED-PAYLOAD".
        return UNSIGNED_PAYLOAD

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path

805 

806 

class S3SigV4PostAuth(SigV4Auth):
    """
    Presigns a s3 post

    Implementation doc here:
    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """
        Populate ``request.context['s3-presign-post-fields']`` and
        ``request.context['s3-presign-post-policy']`` with the signing
        fields, the policy and its signature.

        Fields and policy already present in the context are extended in
        place.
        """
        context = request.context
        timestamp = datetime.datetime.utcnow().strftime(SIGV4_TIMESTAMP)
        context['timestamp'] = timestamp

        # Reuse caller-supplied fields/policy/conditions when present.
        fields = context.get('s3-presign-post-fields', None)
        if fields is None:
            fields = {}
        policy = context.get('s3-presign-post-policy', None)
        if policy is None:
            policy = {}
        conditions = policy.get('conditions', None)
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        # Every signing value is both a form field and a policy condition.
        credential = self.scope(request)
        signing_values = (
            ('x-amz-algorithm', 'AWS4-HMAC-SHA256'),
            ('x-amz-credential', credential),
            ('x-amz-date', timestamp),
        )
        for name, value in signing_values:
            fields[name] = value
        for name, value in signing_values:
            conditions.append({name: value})

        session_token = self.credentials.token
        if session_token is not None:
            fields['x-amz-security-token'] = session_token
            conditions.append({'x-amz-security-token': session_token})

        # Dump the base64 encoded policy into the fields dictionary.
        policy_json = json.dumps(policy).encode('utf-8')
        fields['policy'] = base64.b64encode(policy_json).decode('utf-8')

        # The encoded policy itself is the string that gets signed.
        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy

853 

854 

class HmacV1Auth(BaseSigner):
    """Signer that authenticates requests with an HMAC-SHA1 signature.

    The signature covers a canonical string built from the HTTP method,
    a few standard headers, the ``x-amz-*`` headers and the resource, and
    is sent as ``Authorization: AWS <access_key>:<signature>``.
    """

    # List of Query String Arguments of Interest: only these query
    # arguments are kept when building the canonical resource.
    QSAOfInterest = [
        'accelerate',
        'acl',
        'cors',
        'defaultObjectAcl',
        'location',
        'logging',
        'partNumber',
        'policy',
        'requestPayment',
        'torrent',
        'versioning',
        'versionId',
        'versions',
        'website',
        'uploads',
        'uploadId',
        'response-content-type',
        'response-content-language',
        'response-expires',
        'response-cache-control',
        'response-content-disposition',
        'response-content-encoding',
        'delete',
        'lifecycle',
        'tagging',
        'restore',
        'storageClass',
        'notification',
        'replication',
        'requestPayment',
        'analytics',
        'metrics',
        'inventory',
        'select',
        'select-type',
        'object-lock',
    ]

    def __init__(self, credentials, service_name=None, region_name=None):
        """Store ``credentials``; the other arguments are not used here."""
        self.credentials = credentials

    def sign_string(self, string_to_sign):
        """Return the base64 encoded HMAC-SHA1 of ``string_to_sign``.

        The secret key of the stored credentials is the HMAC key.
        """
        signer = hmac.new(
            self.credentials.secret_key.encode('utf-8'), digestmod=sha1
        )
        signer.update(string_to_sign.encode('utf-8'))
        encoded = encodebytes(signer.digest())
        return encoded.strip().decode('utf-8')

    def canonical_standard_headers(self, headers):
        """Return the Content-MD5, Content-Type and Date values, one per line.

        The ``Date`` header of ``headers`` is replaced with the value of
        ``self._get_date()`` first. A header that is absent contributes an
        empty line.
        """
        # Delete before assigning so only one Date header remains.
        if 'Date' in headers:
            del headers['Date']
        headers['Date'] = self._get_date()

        values = []
        for wanted in ('content-md5', 'content-type', 'date'):
            matches = [
                headers[name].strip()
                for name in headers
                if headers[name] is not None and name.lower() == wanted
            ]
            # An empty placeholder keeps the line positions fixed.
            values.extend(matches or [''])
        return '\n'.join(values)

    def canonical_custom_headers(self, headers):
        """Return the ``x-amz-*`` headers as sorted ``name:value`` lines.

        Names are lower-cased and the values of a repeated header are
        joined with commas.
        """
        amz_headers = {}
        for name in headers:
            if headers[name] is None:
                continue
            lowered = name.lower()
            if lowered.startswith('x-amz-'):
                amz_headers[lowered] = ','.join(
                    value.strip() for value in headers.get_all(name)
                )
        return '\n'.join(
            f"{name}:{amz_headers[name]}" for name in sorted(amz_headers)
        )

    def unquote_v(self, nv):
        """
        Unquote the value of a ``[name, value]`` pair; a lone ``[name]``
        is returned unchanged.

        TODO: Do we need this?
        """
        return nv if len(nv) == 1 else (nv[0], unquote(nv[1]))

    def canonical_resource(self, split, auth_path=None):
        """Return the resource portion of the string to sign.

        Nothing after the first ``?`` is included unless it is one of the
        query string arguments listed in ``QSAOfInterest``.

        NOTE: The path in the canonical resource should always be the
        full path including the bucket name, even for virtual-hosting
        style addressing. The ``auth_path`` keeps track of the full path
        for the canonical resource and would be passed in if the client
        was using virtual-hosting style.
        """
        resource = auth_path if auth_path is not None else split.path
        if not split.query:
            return resource

        pairs = [arg.split('=', 1) for arg in split.query.split('&')]
        kept = [
            self.unquote_v(pair)
            for pair in pairs
            if pair[0] in self.QSAOfInterest
        ]
        if not kept:
            return resource

        kept.sort(key=itemgetter(0))
        return resource + '?' + '&'.join('='.join(pair) for pair in kept)

    def canonical_string(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """Build the newline separated string that gets signed.

        It consists of the upper-cased method, the standard headers, the
        custom headers (only when there are any) and the canonical
        resource. ``expires`` is accepted but not used.
        """
        parts = [method.upper(), self.canonical_standard_headers(headers)]
        custom_headers = self.canonical_custom_headers(headers)
        if custom_headers:
            parts.append(custom_headers)
        parts.append(self.canonical_resource(split, auth_path=auth_path))
        return '\n'.join(parts)

    def get_signature(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """Return the signature for the described request.

        When the credentials carry a token, the ``x-amz-security-token``
        header is reset to it before the string to sign is built.
        ``expires`` is accepted but not used.
        """
        if self.credentials.token:
            # Delete first so the header is replaced, not repeated.
            del headers['x-amz-security-token']
            headers['x-amz-security-token'] = self.credentials.token
        to_sign = self.canonical_string(
            method, split, headers, auth_path=auth_path
        )
        logger.debug('StringToSign:\n%s', to_sign)
        return self.sign_string(to_sign)

    def add_auth(self, request):
        """Sign ``request`` and inject the signature into it.

        Raises ``NoCredentialsError`` when no credentials are set.
        """
        if self.credentials is None:
            raise NoCredentialsError
        logger.debug("Calculating signature using hmacv1 auth.")
        url_parts = urlsplit(request.url)
        logger.debug('HTTP request method: %s', request.method)
        self._inject_signature(
            request,
            self.get_signature(
                request.method,
                url_parts,
                request.headers,
                auth_path=request.auth_path,
            ),
        )

    def _get_date(self):
        """Return the current time formatted for the ``Date`` header."""
        return formatdate(usegmt=True)

    def _inject_signature(self, request, signature):
        """Set the ``Authorization`` header of ``request``."""
        headers = request.headers
        if 'Authorization' in headers:
            # We have to do this because request.headers is not
            # normal dictionary. It has the (unintuitive) behavior
            # of aggregating repeated setattr calls for the same
            # key value. For example:
            #     headers['foo'] = 'a'; headers['foo'] = 'b'
            #     list(headers) will print ['foo', 'foo'].
            del headers['Authorization']

        headers['Authorization'] = (
            f"AWS {self.credentials.access_key}:{signature}"
        )

1022 

1023 

class HmacV1QueryAuth(HmacV1Auth):
    """
    Generates a presigned request for s3.

    The signature and the signed headers are carried in the query string
    of the request URL.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
    #RESTAuthenticationQueryStringAuth

    """

    DEFAULT_EXPIRES = 3600

    def __init__(self, credentials, expires=DEFAULT_EXPIRES):
        """Store ``credentials`` and the ``expires`` offset in seconds."""
        self.credentials = credentials
        self._expires = expires

    def _get_date(self):
        """Return the expiry time, in whole seconds, as a string.

        It is the current ``time.time()`` plus the ``expires`` offset.
        """
        expiry = time.time() + int(self._expires)
        return str(int(expiry))

    def _inject_signature(self, request, signature):
        """Rewrite ``request.url`` so its query string carries the signature.

        The access key, signature, ``Expires`` value and the relevant
        headers are percent-encoded and appended after any query string
        the URL already has.
        """
        params = {
            'AWSAccessKeyId': self.credentials.access_key,
            'Signature': signature,
        }

        headers = request.headers
        for name in headers:
            lowered = name.lower()
            if name == 'Date':
                # For query string requests, Expires is used instead of
                # the Date header.
                params['Expires'] = headers['Date']
                continue
            # We only want to include relevant headers in the query string.
            # These can be anything that starts with x-amz, is Content-MD5,
            # or is Content-Type.
            is_relevant = lowered.startswith('x-amz-') or lowered in (
                'content-md5',
                'content-type',
            )
            if is_relevant:
                params[lowered] = headers[lowered]

        # Combine all of the identified headers into an encoded
        # query string
        query = percent_encode_sequence(params)

        # Create a new url with the presigned url.
        parts = urlsplit(request.url)
        if parts.query:
            # If there was a pre-existing query string, we should
            # add that back before injecting the new query string.
            query = f'{parts.query}&{query}'
        request.url = urlunsplit(
            (parts.scheme, parts.netloc, parts.path, query, parts.fragment)
        )

1075 

1076 

class HmacV1PostAuth(HmacV1Auth):
    """
    Generates a presigned post for s3.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """Add the signing form fields and policy conditions to ``request``.

        Reads the optional ``s3-presign-post-fields`` and
        ``s3-presign-post-policy`` entries of ``request.context``, adds the
        access key and (when present) session token, stores the base64
        encoded policy and its signature in the fields, and writes both
        structures back to ``request.context``.
        """
        context = request.context

        # Reuse caller supplied fields/policy when present, else start empty.
        fields = context.get('s3-presign-post-fields')
        if fields is None:
            fields = {}

        policy = context.get('s3-presign-post-policy')
        if policy is None:
            policy = {}
        conditions = policy.get('conditions')
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        fields['AWSAccessKeyId'] = self.credentials.access_key

        if self.credentials.token is not None:
            fields['x-amz-security-token'] = self.credentials.token
            conditions.append({'x-amz-security-token': self.credentials.token})

        # The policy is sent base64 encoded, and that encoded form is
        # what gets signed.
        policy_bytes = json.dumps(policy).encode('utf-8')
        fields['policy'] = base64.b64encode(policy_bytes).decode('utf-8')
        fields['signature'] = self.sign_string(fields['policy'])

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy

1115 

1116 

class BearerAuth(TokenSigner):
    """
    Performs bearer token authorization by placing the bearer token in the
    Authorization header as specified by Section 2.1 of RFC 6750.

    https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
    """

    def add_auth(self, request):
        """Set ``Authorization: Bearer <token>`` on ``request``.

        Raises ``NoAuthTokenError`` when no auth token is set.
        """
        if self.auth_token is None:
            raise NoAuthTokenError()

        bearer_value = f'Bearer {self.auth_token.token}'
        headers = request.headers
        # Drop any existing header so the new one replaces it.
        if 'Authorization' in headers:
            del headers['Authorization']
        headers['Authorization'] = bearer_value

1133 

1134 

# Registry mapping an auth type name to the signer class implementing it.
AUTH_TYPE_MAPS = {
    'v2': SigV2Auth,
    'v3': SigV3Auth,
    'v3https': SigV3Auth,
    's3': HmacV1Auth,
    's3-query': HmacV1QueryAuth,
    's3-presign-post': HmacV1PostAuth,
    's3v4-presign-post': S3SigV4PostAuth,
    'v4-s3express': S3ExpressAuth,
    'v4-s3express-query': S3ExpressQueryAuth,
    'v4-s3express-presign-post': S3ExpressPostAuth,
    'bearer': BearerAuth,
}

# Define v4 signers depending on if CRT is present: without CRT the
# signers from this module are registered, otherwise the entries come
# from botocore.crt.auth.
if not HAS_CRT:
    AUTH_TYPE_MAPS['v4'] = SigV4Auth
    AUTH_TYPE_MAPS['v4-query'] = SigV4QueryAuth
    AUTH_TYPE_MAPS['s3v4'] = S3SigV4Auth
    AUTH_TYPE_MAPS['s3v4-query'] = S3SigV4QueryAuth
else:
    from botocore.crt.auth import CRT_AUTH_TYPE_MAPS

    AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS)