Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/endpoint.py: 23%

183 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import datetime
import logging
import os
import threading
import time
import uuid

from botocore import parsers
from botocore.awsrequest import create_request_object
from botocore.exceptions import HTTPClientError
from botocore.history import get_global_history_recorder
from botocore.hooks import first_non_none_response
from botocore.httpchecksum import handle_checksum_body
from botocore.httpsession import URLLib3Session
from botocore.response import StreamingBody
from botocore.utils import (
    get_environ_proxies,
    is_valid_endpoint_url,
    is_valid_ipv6_endpoint_url,
)

logger = logging.getLogger(__name__)
history_recorder = get_global_history_recorder()
DEFAULT_TIMEOUT = 60
MAX_POOL_CONNECTIONS = 10


def convert_to_response_dict(http_response, operation_model):
    """Convert an HTTP response object to a response dict.

    This converts the HTTP response object returned by the HTTP session
    to a dictionary.

    :type http_response: botocore.awsrequest.AWSResponse
    :param http_response: The HTTP response from an AWS service request.

    :rtype: dict
    :return: A response dictionary which will contain the following keys:
        * headers (dict)
        * status_code (int)
        * body (string or file-like object)

    """
    response_dict = {
        'headers': http_response.headers,
        'status_code': http_response.status_code,
        'context': {
            'operation_name': operation_model.name,
        },
    }
    if response_dict['status_code'] >= 300:
        response_dict['body'] = http_response.content
    elif operation_model.has_event_stream_output:
        response_dict['body'] = http_response.raw
    elif operation_model.has_streaming_output:
        length = response_dict['headers'].get('content-length')
        response_dict['body'] = StreamingBody(http_response.raw, length)
    else:
        response_dict['body'] = http_response.content
    return response_dict
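
# Illustrative example (values are made up, not taken from a real service
# call): for a successful non-streaming operation, the dictionary returned by
# convert_to_response_dict looks roughly like this:
#
#     {
#         'headers': {'content-type': 'application/x-amz-json-1.0', ...},
#         'status_code': 200,
#         'context': {'operation_name': 'ListTables'},
#         'body': b'{"TableNames": []}',
#     }
#
# For operations modeled with streaming or event-stream output, 'body' is
# instead the raw urllib3 stream (wrapped in a StreamingBody for plain
# streaming output).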


class Endpoint:
    """
    Represents an endpoint for a particular service in a specific
    region. Only an endpoint can make requests.

    :ivar service: The Service object that describes this endpoint's
        service.
    :ivar host: The fully qualified endpoint hostname.
    :ivar session: The session object.
    """

    def __init__(
        self,
        host,
        endpoint_prefix,
        event_emitter,
        response_parser_factory=None,
        http_session=None,
    ):
        self._endpoint_prefix = endpoint_prefix
        self._event_emitter = event_emitter
        self.host = host
        self._lock = threading.Lock()
        if response_parser_factory is None:
            response_parser_factory = parsers.ResponseParserFactory()
        self._response_parser_factory = response_parser_factory
        self.http_session = http_session
        if self.http_session is None:
            self.http_session = URLLib3Session()

    def __repr__(self):
        return f'{self._endpoint_prefix}({self.host})'

    def close(self):
        self.http_session.close()

    def make_request(self, operation_model, request_dict):
        logger.debug(
            "Making request for %s with params: %s",
            operation_model,
            request_dict,
        )
        return self._send_request(request_dict, operation_model)

    def create_request(self, params, operation_model=None):
        request = create_request_object(params)
        if operation_model:
            request.stream_output = any(
                [
                    operation_model.has_streaming_output,
                    operation_model.has_event_stream_output,
                ]
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = 'request-created.{service_id}.{op_name}'.format(
                service_id=service_id, op_name=operation_model.name
            )
            self._event_emitter.emit(
                event_name,
                request=request,
                operation_name=operation_model.name,
            )
        prepared_request = self.prepare_request(request)
        return prepared_request

    def _encode_headers(self, headers):
        # In place encoding of headers to utf-8 if they are unicode.
        for key, value in headers.items():
            if isinstance(value, str):
                headers[key] = value.encode('utf-8')

    def prepare_request(self, request):
        self._encode_headers(request.headers)
        return request.prepare()

    def _calculate_ttl(
        self, response_received_timestamp, date_header, read_timeout
    ):
        local_timestamp = datetime.datetime.utcnow()
        date_conversion = datetime.datetime.strptime(
            date_header, "%a, %d %b %Y %H:%M:%S %Z"
        )
        estimated_skew = date_conversion - response_received_timestamp
        ttl = (
            local_timestamp
            + datetime.timedelta(seconds=read_timeout)
            + estimated_skew
        )
        return ttl.strftime('%Y%m%dT%H%M%SZ')
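
    # Worked example for _calculate_ttl (illustrative timestamps only): if the
    # service's Date header is "Fri, 08 Dec 2023 06:51:10 GMT", the response
    # was received locally at 06:51:12 UTC, and the client's read timeout is
    # 60 seconds, then the estimated clock skew is -2 seconds and the TTL is
    # 06:51:12 + 60s + (-2s), formatted as '20231208T065210Z'.  The value ends
    # up in the retries context (see _set_ttl below) for retry handlers to use.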

    def _set_ttl(self, retries_context, read_timeout, success_response):
        response_date_header = success_response[0].headers.get('Date')
        has_streaming_input = retries_context.get('has_streaming_input')
        if response_date_header and not has_streaming_input:
            try:
                response_received_timestamp = datetime.datetime.utcnow()
                retries_context['ttl'] = self._calculate_ttl(
                    response_received_timestamp,
                    response_date_header,
                    read_timeout,
                )
            except Exception:
                logger.debug(
                    "Exception received when updating retries context with TTL",
                    exc_info=True,
                )

    def _update_retries_context(self, context, attempt, success_response=None):
        retries_context = context.setdefault('retries', {})
        retries_context['attempt'] = attempt
        if 'invocation-id' not in retries_context:
            retries_context['invocation-id'] = str(uuid.uuid4())

        if success_response:
            read_timeout = context['client_config'].read_timeout
            self._set_ttl(retries_context, read_timeout, success_response)
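
    # Illustrative shape of the retries context built above (the UUID is made
    # up): after a non-streaming request has been retried once,
    # request_dict['context']['retries'] looks roughly like:
    #
    #     {
    #         'attempt': 2,
    #         'invocation-id': '9d2b8f53-6bd9-4e9e-9a3c-1b0e4f0f5a7e',
    #         'ttl': '20231208T065210Z',
    #     }
    #
    # The invocation-id is generated once and stays stable across retries,
    # while 'attempt' is incremented for every send.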

    def _send_request(self, request_dict, operation_model):
        attempts = 1
        context = request_dict['context']
        self._update_retries_context(context, attempts)
        request = self.create_request(request_dict, operation_model)
        success_response, exception = self._get_response(
            request, operation_model, context
        )
        while self._needs_retry(
            attempts,
            operation_model,
            request_dict,
            success_response,
            exception,
        ):
            attempts += 1
            self._update_retries_context(context, attempts, success_response)
            # If there is a stream associated with the request, we need
            # to reset it before attempting to send the request again.
            # This will ensure that we resend the entire contents of the
            # body.
            request.reset_stream()
            # Create a new request when retried (including a new signature).
            request = self.create_request(request_dict, operation_model)
            success_response, exception = self._get_response(
                request, operation_model, context
            )
        if (
            success_response is not None
            and 'ResponseMetadata' in success_response[1]
        ):
            # We want to share num retries, not num attempts.
            total_retries = attempts - 1
            success_response[1]['ResponseMetadata'][
                'RetryAttempts'
            ] = total_retries
        if exception is not None:
            raise exception
        else:
            return success_response

    def _get_response(self, request, operation_model, context):
        # This will return a tuple of (success_response, exception)
        # and success_response is itself a tuple of
        # (http_response, parsed_dict).
        # If an exception occurs then the success_response is None.
        # If no exception occurs then exception is None.
        success_response, exception = self._do_get_response(
            request, operation_model, context
        )
        kwargs_to_emit = {
            'response_dict': None,
            'parsed_response': None,
            'context': context,
            'exception': exception,
        }
        if success_response is not None:
            http_response, parsed_response = success_response
            kwargs_to_emit['parsed_response'] = parsed_response
            kwargs_to_emit['response_dict'] = convert_to_response_dict(
                http_response, operation_model
            )
        service_id = operation_model.service_model.service_id.hyphenize()
        self._event_emitter.emit(
            f"response-received.{service_id}.{operation_model.name}",
            **kwargs_to_emit,
        )
        return success_response, exception

    def _do_get_response(self, request, operation_model, context):
        try:
            logger.debug("Sending http request: %s", request)
            history_recorder.record(
                'HTTP_REQUEST',
                {
                    'method': request.method,
                    'headers': request.headers,
                    'streaming': operation_model.has_streaming_input,
                    'url': request.url,
                    'body': request.body,
                },
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = f"before-send.{service_id}.{operation_model.name}"
            responses = self._event_emitter.emit(event_name, request=request)
            http_response = first_non_none_response(responses)
            if http_response is None:
                http_response = self._send(request)
        except HTTPClientError as e:
            return (None, e)
        except Exception as e:
            logger.debug(
                "Exception received when sending HTTP request.", exc_info=True
            )
            return (None, e)
        # This returns the http_response and the parsed_data.
        response_dict = convert_to_response_dict(
            http_response, operation_model
        )
        handle_checksum_body(
            http_response,
            response_dict,
            context,
            operation_model,
        )

        http_response_record_dict = response_dict.copy()
        http_response_record_dict[
            'streaming'
        ] = operation_model.has_streaming_output
        history_recorder.record('HTTP_RESPONSE', http_response_record_dict)

        protocol = operation_model.metadata['protocol']
        parser = self._response_parser_factory.create_parser(protocol)
        parsed_response = parser.parse(
            response_dict, operation_model.output_shape
        )
        # Do a second parsing pass to pick up on any modeled error fields
        # NOTE: Ideally, we would push this down into the parser classes but
        # they currently have no reference to the operation or service model
        # The parsers should probably take the operation model instead of
        # output shape but we can't change that now
        if http_response.status_code >= 300:
            self._add_modeled_error_fields(
                response_dict,
                parsed_response,
                operation_model,
                parser,
            )
        history_recorder.record('PARSED_RESPONSE', parsed_response)
        return (http_response, parsed_response), None

    def _add_modeled_error_fields(
        self,
        response_dict,
        parsed_response,
        operation_model,
        parser,
    ):
        error_code = parsed_response.get("Error", {}).get("Code")
        if error_code is None:
            return
        service_model = operation_model.service_model
        error_shape = service_model.shape_for_error_code(error_code)
        if error_shape is None:
            return
        modeled_parse = parser.parse(response_dict, error_shape)
        # TODO: avoid naming conflicts with ResponseMetadata and Error
        parsed_response.update(modeled_parse)
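
    # Illustrative example (the error code and extra field are hypothetical):
    # if a service returns a modeled error such as ResourceNotFoundException,
    # the generic first parse might produce
    #
    #     {'Error': {'Code': 'ResourceNotFoundException', 'Message': '...'},
    #      'ResponseMetadata': {...}}
    #
    # and the second parse against the error shape merges any modeled members
    # (e.g. a 'ResourceType' field) into that same dictionary.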

    def _needs_retry(
        self,
        attempts,
        operation_model,
        request_dict,
        response=None,
        caught_exception=None,
    ):
        service_id = operation_model.service_model.service_id.hyphenize()
        event_name = f"needs-retry.{service_id}.{operation_model.name}"
        responses = self._event_emitter.emit(
            event_name,
            response=response,
            endpoint=self,
            operation=operation_model,
            attempts=attempts,
            caught_exception=caught_exception,
            request_dict=request_dict,
        )
        handler_response = first_non_none_response(responses)
        if handler_response is None:
            return False
        else:
            # The request needs to be retried, and we need to sleep
            # for the number of seconds the retry handler returned.
            logger.debug(
                "Response received to retry, sleeping for %s seconds",
                handler_response,
            )
            time.sleep(handler_response)
            return True

    def _send(self, request):
        return self.http_session.send(request)


class EndpointCreator:
    def __init__(self, event_emitter):
        self._event_emitter = event_emitter

    def create_endpoint(
        self,
        service_model,
        region_name,
        endpoint_url,
        verify=None,
        response_parser_factory=None,
        timeout=DEFAULT_TIMEOUT,
        max_pool_connections=MAX_POOL_CONNECTIONS,
        http_session_cls=URLLib3Session,
        proxies=None,
        socket_options=None,
        client_cert=None,
        proxies_config=None,
    ):
        if not is_valid_endpoint_url(
            endpoint_url
        ) and not is_valid_ipv6_endpoint_url(endpoint_url):
            raise ValueError("Invalid endpoint: %s" % endpoint_url)

        if proxies is None:
            proxies = self._get_proxies(endpoint_url)
        endpoint_prefix = service_model.endpoint_prefix

        logger.debug('Setting %s timeout as %s', endpoint_prefix, timeout)
        http_session = http_session_cls(
            timeout=timeout,
            proxies=proxies,
            verify=self._get_verify_value(verify),
            max_pool_connections=max_pool_connections,
            socket_options=socket_options,
            client_cert=client_cert,
            proxies_config=proxies_config,
        )

        return Endpoint(
            endpoint_url,
            endpoint_prefix=endpoint_prefix,
            event_emitter=self._event_emitter,
            response_parser_factory=response_parser_factory,
            http_session=http_session,
        )

    def _get_proxies(self, url):
        # We could also support getting proxies from a config file,
        # but for now proxy support is taken from the environment.
        return get_environ_proxies(url)

    def _get_verify_value(self, verify):
        # This is to account for:
        # https://github.com/kennethreitz/requests/issues/1436
        # where we need to honor REQUESTS_CA_BUNDLE because we're creating our
        # own request objects.
        # First, if verify is not None, then the user explicitly specified
        # a value so this automatically wins.
        if verify is not None:
            return verify
        # Otherwise use the value from REQUESTS_CA_BUNDLE, or default to
        # True if the env var does not exist.
        return os.environ.get('REQUESTS_CA_BUNDLE', True)
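

# The sketch below is illustrative only and is not part of botocore's source.
# It shows how the pieces in this module fit together: an EndpointCreator
# builds an Endpoint with a configured HTTP session, and make_request drives
# the send/parse/retry loop.  The service model, operation model, and request
# dict are assumed to come from botocore's client machinery (the request dict
# must already contain a 'context' entry and is normally produced by the
# serializer and signed before it gets here); the region and URL below are
# example values.
def _example_endpoint_usage(service_model, operation_model, request_dict):
    from botocore.hooks import HierarchicalEmitter

    creator = EndpointCreator(HierarchicalEmitter())
    endpoint = creator.create_endpoint(
        service_model,
        region_name='us-east-1',  # example region
        endpoint_url='https://dynamodb.us-east-1.amazonaws.com',  # example URL
    )
    try:
        # Returns (http_response, parsed_response); retries happen inside
        # according to whatever needs-retry handlers are registered on the
        # event emitter.
        return endpoint.make_request(operation_model, request_dict)
    finally:
        endpoint.close()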