Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/endpoint.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 

2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"). You 

5# may not use this file except in compliance with the License. A copy of 

6# the License is located at 

7# 

8# http://aws.amazon.com/apache2.0/ 

9# 

10# or in the "license" file accompanying this file. This file is 

11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

12# ANY KIND, either express or implied. See the License for the specific 

13# language governing permissions and limitations under the License. 

14 

15import datetime 

16import logging 

17import os 

18import threading 

19import time 

20import uuid 

21 

22from botocore import parsers 

23from botocore.awsrequest import create_request_object 

24from botocore.exceptions import HTTPClientError 

25from botocore.history import get_global_history_recorder 

26from botocore.hooks import first_non_none_response 

27from botocore.httpchecksum import handle_checksum_body 

28from botocore.httpsession import URLLib3Session 

29from botocore.response import StreamingBody 

30from botocore.utils import ( 

31 get_environ_proxies, 

32 is_valid_endpoint_url, 

33 is_valid_ipv6_endpoint_url, 

34) 

35 

36logger = logging.getLogger(__name__) 

37history_recorder = get_global_history_recorder() 

38DEFAULT_TIMEOUT = 60 

39MAX_POOL_CONNECTIONS = 10 

40 

41 

42def convert_to_response_dict(http_response, operation_model): 

43 """Convert an HTTP response object to a request dict. 

44 

45 This converts the HTTP response object to a dictionary. 

46 

47 :type http_response: botocore.awsrequest.AWSResponse 

48 :param http_response: The HTTP response from an AWS service request. 

49 

50 :rtype: dict 

51 :return: A response dictionary which will contain the following keys: 

52 * headers (dict) 

53 * status_code (int) 

54 * body (string or file-like object) 

55 

56 """ 

57 response_dict = { 

58 'headers': http_response.headers, 

59 'status_code': http_response.status_code, 

60 'context': { 

61 'operation_name': operation_model.name, 

62 }, 

63 } 

64 if response_dict['status_code'] >= 300: 

65 response_dict['body'] = http_response.content 

66 elif operation_model.has_event_stream_output: 

67 response_dict['body'] = http_response.raw 

68 elif operation_model.has_streaming_output: 

69 length = response_dict['headers'].get('content-length') 

70 response_dict['body'] = StreamingBody(http_response.raw, length) 

71 else: 

72 response_dict['body'] = http_response.content 

73 return response_dict 

74 

75 

76class Endpoint: 

77 """ 

78 Represents an endpoint for a particular service in a specific 

79 region. Only an endpoint can make requests. 

80 

81 :ivar service: The Service object that describes this endpoints 

82 service. 

83 :ivar host: The fully qualified endpoint hostname. 

84 :ivar session: The session object. 

85 """ 

86 

87 def __init__( 

88 self, 

89 host, 

90 endpoint_prefix, 

91 event_emitter, 

92 response_parser_factory=None, 

93 http_session=None, 

94 ): 

95 self._endpoint_prefix = endpoint_prefix 

96 self._event_emitter = event_emitter 

97 self.host = host 

98 self._lock = threading.Lock() 

99 if response_parser_factory is None: 

100 response_parser_factory = parsers.ResponseParserFactory() 

101 self._response_parser_factory = response_parser_factory 

102 self.http_session = http_session 

103 if self.http_session is None: 

104 self.http_session = URLLib3Session() 

105 

106 def __repr__(self): 

107 return f'{self._endpoint_prefix}({self.host})' 

108 

109 def close(self): 

110 self.http_session.close() 

111 

112 def make_request(self, operation_model, request_dict): 

113 logger.debug( 

114 "Making request for %s with params: %s", 

115 operation_model, 

116 request_dict, 

117 ) 

118 return self._send_request(request_dict, operation_model) 

119 

120 def create_request(self, params, operation_model=None): 

121 request = create_request_object(params) 

122 if operation_model: 

123 request.stream_output = any( 

124 [ 

125 operation_model.has_streaming_output, 

126 operation_model.has_event_stream_output, 

127 ] 

128 ) 

129 service_id = operation_model.service_model.service_id.hyphenize() 

130 event_name = f'request-created.{service_id}.{operation_model.name}' 

131 self._event_emitter.emit( 

132 event_name, 

133 request=request, 

134 operation_name=operation_model.name, 

135 ) 

136 prepared_request = self.prepare_request(request) 

137 return prepared_request 

138 

139 def _encode_headers(self, headers): 

140 # In place encoding of headers to utf-8 if they are unicode. 

141 for key, value in headers.items(): 

142 if isinstance(value, str): 

143 headers[key] = value.encode('utf-8') 

144 

145 def prepare_request(self, request): 

146 self._encode_headers(request.headers) 

147 return request.prepare() 

148 

149 def _calculate_ttl( 

150 self, response_received_timestamp, date_header, read_timeout 

151 ): 

152 local_timestamp = datetime.datetime.utcnow() 

153 date_conversion = datetime.datetime.strptime( 

154 date_header, "%a, %d %b %Y %H:%M:%S %Z" 

155 ) 

156 estimated_skew = date_conversion - response_received_timestamp 

157 ttl = ( 

158 local_timestamp 

159 + datetime.timedelta(seconds=read_timeout) 

160 + estimated_skew 

161 ) 

162 return ttl.strftime('%Y%m%dT%H%M%SZ') 

163 

164 def _set_ttl(self, retries_context, read_timeout, success_response): 

165 response_date_header = success_response[0].headers.get('Date') 

166 has_streaming_input = retries_context.get('has_streaming_input') 

167 if response_date_header and not has_streaming_input: 

168 try: 

169 response_received_timestamp = datetime.datetime.utcnow() 

170 retries_context['ttl'] = self._calculate_ttl( 

171 response_received_timestamp, 

172 response_date_header, 

173 read_timeout, 

174 ) 

175 except Exception: 

176 logger.debug( 

177 "Exception received when updating retries context with TTL", 

178 exc_info=True, 

179 ) 

180 

181 def _update_retries_context(self, context, attempt, success_response=None): 

182 retries_context = context.setdefault('retries', {}) 

183 retries_context['attempt'] = attempt 

184 if 'invocation-id' not in retries_context: 

185 retries_context['invocation-id'] = str(uuid.uuid4()) 

186 

187 if success_response: 

188 read_timeout = context['client_config'].read_timeout 

189 self._set_ttl(retries_context, read_timeout, success_response) 

190 

191 def _send_request(self, request_dict, operation_model): 

192 attempts = 1 

193 context = request_dict['context'] 

194 self._update_retries_context(context, attempts) 

195 request = self.create_request(request_dict, operation_model) 

196 success_response, exception = self._get_response( 

197 request, operation_model, context 

198 ) 

199 while self._needs_retry( 

200 attempts, 

201 operation_model, 

202 request_dict, 

203 success_response, 

204 exception, 

205 ): 

206 attempts += 1 

207 self._update_retries_context(context, attempts, success_response) 

208 # If there is a stream associated with the request, we need 

209 # to reset it before attempting to send the request again. 

210 # This will ensure that we resend the entire contents of the 

211 # body. 

212 request.reset_stream() 

213 # Create a new request when retried (including a new signature). 

214 request = self.create_request(request_dict, operation_model) 

215 success_response, exception = self._get_response( 

216 request, operation_model, context 

217 ) 

218 if ( 

219 success_response is not None 

220 and 'ResponseMetadata' in success_response[1] 

221 ): 

222 # We want to share num retries, not num attempts. 

223 total_retries = attempts - 1 

224 success_response[1]['ResponseMetadata']['RetryAttempts'] = ( 

225 total_retries 

226 ) 

227 if exception is not None: 

228 raise exception 

229 else: 

230 return success_response 

231 

232 def _get_response(self, request, operation_model, context): 

233 # This will return a tuple of (success_response, exception) 

234 # and success_response is itself a tuple of 

235 # (http_response, parsed_dict). 

236 # If an exception occurs then the success_response is None. 

237 # If no exception occurs then exception is None. 

238 success_response, exception = self._do_get_response( 

239 request, operation_model, context 

240 ) 

241 kwargs_to_emit = { 

242 'response_dict': None, 

243 'parsed_response': None, 

244 'context': context, 

245 'exception': exception, 

246 } 

247 if success_response is not None: 

248 http_response, parsed_response = success_response 

249 kwargs_to_emit['parsed_response'] = parsed_response 

250 kwargs_to_emit['response_dict'] = convert_to_response_dict( 

251 http_response, operation_model 

252 ) 

253 service_id = operation_model.service_model.service_id.hyphenize() 

254 self._event_emitter.emit( 

255 f"response-received.{service_id}.{operation_model.name}", 

256 **kwargs_to_emit, 

257 ) 

258 return success_response, exception 

259 

260 def _do_get_response(self, request, operation_model, context): 

261 try: 

262 logger.debug("Sending http request: %s", request) 

263 history_recorder.record( 

264 'HTTP_REQUEST', 

265 { 

266 'method': request.method, 

267 'headers': request.headers, 

268 'streaming': operation_model.has_streaming_input, 

269 'url': request.url, 

270 'body': request.body, 

271 }, 

272 ) 

273 service_id = operation_model.service_model.service_id.hyphenize() 

274 event_name = f"before-send.{service_id}.{operation_model.name}" 

275 responses = self._event_emitter.emit(event_name, request=request) 

276 http_response = first_non_none_response(responses) 

277 if http_response is None: 

278 http_response = self._send(request) 

279 except HTTPClientError as e: 

280 return (None, e) 

281 except Exception as e: 

282 logger.debug( 

283 "Exception received when sending HTTP request.", exc_info=True 

284 ) 

285 return (None, e) 

286 # This returns the http_response and the parsed_data. 

287 response_dict = convert_to_response_dict( 

288 http_response, operation_model 

289 ) 

290 handle_checksum_body( 

291 http_response, 

292 response_dict, 

293 context, 

294 operation_model, 

295 ) 

296 

297 http_response_record_dict = response_dict.copy() 

298 http_response_record_dict['streaming'] = ( 

299 operation_model.has_streaming_output 

300 ) 

301 history_recorder.record('HTTP_RESPONSE', http_response_record_dict) 

302 

303 protocol = operation_model.service_model.resolved_protocol 

304 customized_response_dict = {} 

305 self._event_emitter.emit( 

306 f"before-parse.{service_id}.{operation_model.name}", 

307 operation_model=operation_model, 

308 response_dict=response_dict, 

309 customized_response_dict=customized_response_dict, 

310 ) 

311 parser = self._response_parser_factory.create_parser(protocol) 

312 parsed_response = parser.parse( 

313 response_dict, operation_model.output_shape 

314 ) 

315 parsed_response.update(customized_response_dict) 

316 # Do a second parsing pass to pick up on any modeled error fields 

317 # NOTE: Ideally, we would push this down into the parser classes but 

318 # they currently have no reference to the operation or service model 

319 # The parsers should probably take the operation model instead of 

320 # output shape but we can't change that now 

321 if http_response.status_code >= 300: 

322 self._add_modeled_error_fields( 

323 response_dict, 

324 parsed_response, 

325 operation_model, 

326 parser, 

327 ) 

328 history_recorder.record('PARSED_RESPONSE', parsed_response) 

329 return (http_response, parsed_response), None 

330 

331 def _add_modeled_error_fields( 

332 self, 

333 response_dict, 

334 parsed_response, 

335 operation_model, 

336 parser, 

337 ): 

338 error_code = parsed_response.get("Error", {}).get("Code") 

339 if error_code is None: 

340 return 

341 service_model = operation_model.service_model 

342 error_shape = service_model.shape_for_error_code(error_code) 

343 if error_shape is None: 

344 return 

345 modeled_parse = parser.parse(response_dict, error_shape) 

346 # TODO: avoid naming conflicts with ResponseMetadata and Error 

347 parsed_response.update(modeled_parse) 

348 

349 def _needs_retry( 

350 self, 

351 attempts, 

352 operation_model, 

353 request_dict, 

354 response=None, 

355 caught_exception=None, 

356 ): 

357 service_id = operation_model.service_model.service_id.hyphenize() 

358 event_name = f"needs-retry.{service_id}.{operation_model.name}" 

359 responses = self._event_emitter.emit( 

360 event_name, 

361 response=response, 

362 endpoint=self, 

363 operation=operation_model, 

364 attempts=attempts, 

365 caught_exception=caught_exception, 

366 request_dict=request_dict, 

367 ) 

368 handler_response = first_non_none_response(responses) 

369 if handler_response is None: 

370 return False 

371 else: 

372 # Request needs to be retried, and we need to sleep 

373 # for the specified number of times. 

374 logger.debug( 

375 "Response received to retry, sleeping for %s seconds", 

376 handler_response, 

377 ) 

378 time.sleep(handler_response) 

379 return True 

380 

381 def _send(self, request): 

382 return self.http_session.send(request) 

383 

384 

385class EndpointCreator: 

386 def __init__(self, event_emitter): 

387 self._event_emitter = event_emitter 

388 

389 def create_endpoint( 

390 self, 

391 service_model, 

392 region_name, 

393 endpoint_url, 

394 verify=None, 

395 response_parser_factory=None, 

396 timeout=DEFAULT_TIMEOUT, 

397 max_pool_connections=MAX_POOL_CONNECTIONS, 

398 http_session_cls=URLLib3Session, 

399 proxies=None, 

400 socket_options=None, 

401 client_cert=None, 

402 proxies_config=None, 

403 ): 

404 if not is_valid_endpoint_url( 

405 endpoint_url 

406 ) and not is_valid_ipv6_endpoint_url(endpoint_url): 

407 raise ValueError(f"Invalid endpoint: {endpoint_url}") 

408 

409 if proxies is None: 

410 proxies = self._get_proxies(endpoint_url) 

411 endpoint_prefix = service_model.endpoint_prefix 

412 

413 logger.debug('Setting %s timeout as %s', endpoint_prefix, timeout) 

414 http_session = http_session_cls( 

415 timeout=timeout, 

416 proxies=proxies, 

417 verify=self._get_verify_value(verify), 

418 max_pool_connections=max_pool_connections, 

419 socket_options=socket_options, 

420 client_cert=client_cert, 

421 proxies_config=proxies_config, 

422 ) 

423 

424 return Endpoint( 

425 endpoint_url, 

426 endpoint_prefix=endpoint_prefix, 

427 event_emitter=self._event_emitter, 

428 response_parser_factory=response_parser_factory, 

429 http_session=http_session, 

430 ) 

431 

432 def _get_proxies(self, url): 

433 # We could also support getting proxies from a config file, 

434 # but for now proxy support is taken from the environment. 

435 return get_environ_proxies(url) 

436 

437 def _get_verify_value(self, verify): 

438 # This is to account for: 

439 # https://github.com/kennethreitz/requests/issues/1436 

440 # where we need to honor REQUESTS_CA_BUNDLE because we're creating our 

441 # own request objects. 

442 # First, if verify is not None, then the user explicitly specified 

443 # a value so this automatically wins. 

444 if verify is not None: 

445 return verify 

446 # Otherwise use the value from REQUESTS_CA_BUNDLE, or default to 

447 # True if the env var does not exist. 

448 return os.environ.get('REQUESTS_CA_BUNDLE', True)