# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import logging
from io import IOBase

from urllib3.exceptions import ProtocolError as URLLib3ProtocolError
from urllib3.exceptions import ReadTimeoutError as URLLib3ReadTimeoutError

from botocore import (
    ScalarTypes,  # noqa: F401
    parsers,
)
from botocore.compat import (
    XMLParseError,  # noqa: F401
    set_socket_timeout,
)
from botocore.exceptions import (
    IncompleteReadError,
    ReadTimeoutError,
    ResponseStreamingError,
)
from botocore.hooks import first_non_none_response  # noqa

logger = logging.getLogger(__name__)


class StreamingBody(IOBase):
    """Wrapper class for an http response body.

    This provides a few additional conveniences that do not exist
    in the urllib3 model:

        * Set the timeout on the socket (i.e. read() timeouts)
        * Auto validation of content length: if the number of bytes
          we read does not match the content length, an exception
          is raised.

    """
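
    # A minimal usage sketch (not executed here): a StreamingBody typically
    # arrives as the body of a streaming operation's parsed response, e.g.
    # the 'Body' value of an S3 GetObject result. The names below are
    # illustrative assumptions, not part of this module:
    #
    #     body = response['Body']            # a StreamingBody instance
    #     for line in body.iter_lines():     # stream the payload line by line
    #         handle(line)                   # 'handle' is a hypothetical callback
    #     body.close()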

    _DEFAULT_CHUNK_SIZE = 1024

    def __init__(self, raw_stream, content_length):
        self._raw_stream = raw_stream
        self._content_length = content_length
        self._amount_read = 0

    def __del__(self):
        # Extending the destructor in order to preserve the underlying
        # raw_stream. The ability to add custom cleanup logic was
        # introduced in Python 3.4.
        # https://www.python.org/dev/peps/pep-0442/
        pass

    def set_socket_timeout(self, timeout):
        """Set the timeout, in seconds, on the socket."""
        # The problem we're trying to solve is to prevent .read() calls from
        # hanging. This can happen in rare cases. What we'd ideally like to
        # do is set a timeout on the .read() call so that callers can retry
        # the request.
        # Unfortunately, this isn't currently possible in requests.
        # See: https://github.com/kennethreitz/requests/issues/1803
        # So what we're going to do is reach into the guts of the stream and
        # grab the socket object, which we can set the timeout on. We're
        # putting in a check here so that if this interface goes away, we'll
        # know.
        try:
            set_socket_timeout(self._raw_stream, timeout)
        except AttributeError:
            logger.exception(
                "Cannot access the socket object of a streaming response. "
                "It's possible the interface has changed."
            )
            raise
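
    # Illustrative sketch (not executed): callers worried about a .read()
    # hanging can set a socket-level timeout before reading, so a stalled
    # read eventually fails instead of blocking forever. 'body' is an
    # assumed StreamingBody instance:
    #
    #     body.set_socket_timeout(60)
    #     data = body.read()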

    def readable(self):
        try:
            return self._raw_stream.readable()
        except AttributeError:
            return False

    def read(self, amt=None):
        """Read at most amt bytes from the stream.

        If the amt argument is omitted, read all data.
        """
        try:
            chunk = self._raw_stream.read(amt)
        except URLLib3ReadTimeoutError as e:
            # TODO: the url will be None as urllib3 isn't setting it yet
            raise ReadTimeoutError(endpoint_url=e.url, error=e)
        except URLLib3ProtocolError as e:
            raise ResponseStreamingError(error=e)
        self._amount_read += len(chunk)
        if amt is None or (not chunk and amt > 0):
            # If the server sends empty contents or
            # we ask to read all of the contents, then we know
            # we need to verify the content length.
            self._verify_content_length()
        return chunk

    def readinto(self, b):
        """Read bytes into a pre-allocated, writable bytes-like object b,
        and return the number of bytes read.
        """
        try:
            amount_read = self._raw_stream.readinto(b)
        except URLLib3ReadTimeoutError as e:
            # TODO: the url will be None as urllib3 isn't setting it yet
            raise ReadTimeoutError(endpoint_url=e.url, error=e)
        except URLLib3ProtocolError as e:
            raise ResponseStreamingError(error=e)
        self._amount_read += amount_read
        if amount_read == 0 and len(b) > 0:
            # If the server sends empty contents then we know we need to
            # verify the content length.
            self._verify_content_length()
        return amount_read
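
    # Illustrative sketch (not executed): readinto() fills a caller-supplied
    # buffer, avoiding a new bytes allocation per read. 'body' is an assumed
    # StreamingBody instance:
    #
    #     buf = bytearray(4096)
    #     n = body.readinto(buf)     # number of bytes actually written to buf
    #     data = bytes(buf[:n])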

    def readlines(self):
        return self._raw_stream.readlines()

    def __iter__(self):
        """Return an iterator to yield 1k chunks from the raw stream."""
        return self.iter_chunks(self._DEFAULT_CHUNK_SIZE)

    def __next__(self):
        """Return the next 1k chunk from the raw stream."""
        current_chunk = self.read(self._DEFAULT_CHUNK_SIZE)
        if current_chunk:
            return current_chunk
        raise StopIteration()

    def __enter__(self):
        return self._raw_stream

    def __exit__(self, type, value, traceback):
        self._raw_stream.close()

    next = __next__

    def iter_lines(self, chunk_size=_DEFAULT_CHUNK_SIZE, keepends=False):
        """Return an iterator to yield lines from the raw stream.

        This is achieved by reading chunks of bytes (of size chunk_size) at
        a time from the raw stream, and then yielding lines from there.
        """
        pending = b''
        for chunk in self.iter_chunks(chunk_size):
            lines = (pending + chunk).splitlines(True)
            for line in lines[:-1]:
                yield line.splitlines(keepends)[0]
            pending = lines[-1]
        if pending:
            yield pending.splitlines(keepends)[0]

    def iter_chunks(self, chunk_size=_DEFAULT_CHUNK_SIZE):
        """Return an iterator to yield chunks of chunk_size bytes from the raw
        stream.
        """
        while True:
            current_chunk = self.read(chunk_size)
            if current_chunk == b"":
                break
            yield current_chunk
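
    # Illustrative sketch (not executed): streaming a large body to disk
    # without loading it all into memory. 'body' and the output path are
    # assumed names for this example only:
    #
    #     with open('download.bin', 'wb') as f:
    #         for chunk in body.iter_chunks(chunk_size=8192):
    #             f.write(chunk)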

    def _verify_content_length(self):
        # See: https://github.com/kennethreitz/requests/issues/1855
        # Basically, our http library doesn't do this for us, so we have
        # to do this ourselves.
        if self._content_length is not None and self._amount_read != int(
            self._content_length
        ):
            raise IncompleteReadError(
                actual_bytes=self._amount_read,
                expected_bytes=int(self._content_length),
            )

    def tell(self):
        return self._raw_stream.tell()

    def close(self):
        """Close the underlying http response stream."""
        self._raw_stream.close()


def get_response(operation_model, http_response):
    protocol = operation_model.service_model.resolved_protocol
    response_dict = {
        'headers': http_response.headers,
        'status_code': http_response.status_code,
    }
    # TODO: Unfortunately, we have to have error logic here.
    # If it looks like an error, in the streaming response case we
    # need to actually grab the contents.
    if response_dict['status_code'] >= 300:
        response_dict['body'] = http_response.content
    elif operation_model.has_streaming_output:
        response_dict['body'] = StreamingBody(
            http_response.raw, response_dict['headers'].get('content-length')
        )
    else:
        response_dict['body'] = http_response.content

    parser = parsers.create_parser(protocol)
    return http_response, parser.parse(
        response_dict, operation_model.output_shape
    )
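
# Illustrative sketch (not executed): get_response() pairs the raw HTTP
# response with its parsed form. The 'operation_model' and 'http_response'
# objects below are assumed to come from botocore internals (a loaded
# service model and an already-sent request):
#
#     http_response, parsed = get_response(operation_model, http_response)
#     # 'parsed' is the dict produced by the protocol parser for this
#     # operation's output shape.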