1# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# https://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
12
13import io
14import http.client
15import json as jsonutils
16
17from requests.adapters import HTTPAdapter
18from requests.cookies import MockRequest, MockResponse
19from requests.cookies import RequestsCookieJar
20from requests.cookies import merge_cookies, cookiejar_from_dict
21from requests.utils import get_encoding_from_headers
22from urllib3.response import HTTPResponse
23
24from requests_mock import exceptions
25
26_BODY_ARGS = frozenset(['raw', 'body', 'content', 'text', 'json'])
27_HTTP_ARGS = frozenset([
28 'status_code',
29 'reason',
30 'headers',
31 'cookies',
32 'json_encoder',
33])
34
35_DEFAULT_STATUS = 200
36_http_adapter = HTTPAdapter()
37
38
39class CookieJar(RequestsCookieJar):
40
41 def set(self, name, value, **kwargs):
42 """Add a cookie to the Jar.
43
44 :param str name: cookie name/key.
45 :param str value: cookie value.
46 :param int version: Integer or None. Netscape cookies have version 0.
47 RFC 2965 and RFC 2109 cookies have a version cookie-attribute of 1.
48 However, note that cookielib may 'downgrade' RFC 2109 cookies to
49 Netscape cookies, in which case version is 0.
50 :param str port: String representing a port or a set of ports
51 (eg. '80', or '80,8080'),
52 :param str domain: The domain the cookie should apply to.
53 :param str path: Cookie path (a string, eg. '/acme/rocket_launchers').
54 :param bool secure: True if cookie should only be returned over a
55 secure connection.
56 :param int expires: Integer expiry date in seconds since epoch or None.
57 :param bool discard: True if this is a session cookie.
58 :param str comment: String comment from the server explaining the
59 function of this cookie.
60 :param str comment_url: URL linking to a comment from the server
61 explaining the function of this cookie.
62 """
63 # just here to provide the function documentation
64 return super(CookieJar, self).set(name, value, **kwargs)
65
66
67def _check_body_arguments(**kwargs):
68 # mutual exclusion, only 1 body method may be provided
69 provided = [x for x in _BODY_ARGS if kwargs.pop(x, None) is not None]
70
71 if len(provided) > 1:
72 raise RuntimeError('You may only supply one body element. You '
73 'supplied %s' % ', '.join(provided))
74
75 extra = [x for x in kwargs if x not in _HTTP_ARGS]
76
77 if extra:
78 raise TypeError('Too many arguments provided. Unexpected '
79 'arguments %s.' % ', '.join(extra))
80
81
82class _FakeConnection(object):
83 """An object that can mock the necessary parts of a socket interface."""
84
85 def send(self, request, **kwargs):
86 msg = 'This response was created without a connection. You are ' \
87 'therefore unable to make a request directly on that connection.'
88 raise exceptions.InvalidRequest(msg)
89
90 def close(self):
91 pass
92
93
94def _extract_cookies(request, response, cookies):
95 """Add cookies to the response.
96
97 Cookies in requests are extracted from the headers in the original_response
98 httplib.HTTPMessage which we don't create so we have to do this step
99 manually.
100 """
101 # This will add cookies set manually via the Set-Cookie or Set-Cookie2
102 # header but this only allows 1 cookie to be set.
103 response.cookies.extract_cookies(MockResponse(response.raw.headers),
104 MockRequest(request))
105
106 # This allows you to pass either a CookieJar or a dictionary to request_uri
107 # or directly to create_response. To allow more than one cookie to be set.
108 if cookies:
109 merge_cookies(response.cookies, cookies)
110
111
112class _IOReader(io.BytesIO):
113 """A reader that makes a BytesIO look like a HTTPResponse.
114
115 A HTTPResponse will return an empty string when you read from it after
116 the socket has been closed. A BytesIO will raise a ValueError. For
117 compatibility we want to do the same thing a HTTPResponse does.
118 """
119
120 def read(self, *args, **kwargs):
121 if self.closed:
122 return b''
123
124 # if the file is open, but you asked for zero bytes read you should get
125 # back zero without closing the stream.
126 if len(args) > 0 and args[0] == 0:
127 return b''
128
129 result = io.BytesIO.read(self, *args, **kwargs)
130
131 # when using resp.iter_content(None) it'll go through a different
132 # request path in urllib3. This path checks whether the object is
133 # marked closed instead of the return value. see gh124.
134 if result == b'':
135 self.close()
136
137 return result
138
139
140def create_response(request, **kwargs):
141 """
142 :param int status_code: The status code to return upon a successful
143 match. Defaults to 200.
144 :param HTTPResponse raw: A HTTPResponse object to return upon a
145 successful match.
146 :param io.IOBase body: An IO object with a read() method that can
147 return a body on successful match.
148 :param bytes content: A byte string to return upon a successful match.
149 :param unicode text: A text string to return upon a successful match.
150 :param object json: A python object to be converted to a JSON string
151 and returned upon a successful match.
152 :param class json_encoder: Encoder object to use for JOSON.
153 :param dict headers: A dictionary object containing headers that are
154 returned upon a successful match.
155 :param CookieJar cookies: A cookie jar with cookies to set on the
156 response.
157
158 :returns requests.Response: A response object that can
159 be returned to requests.
160 """
161 connection = kwargs.pop('connection', _FakeConnection())
162
163 _check_body_arguments(**kwargs)
164
165 raw = kwargs.pop('raw', None)
166 body = kwargs.pop('body', None)
167 content = kwargs.pop('content', None)
168 text = kwargs.pop('text', None)
169 json = kwargs.pop('json', None)
170 headers = kwargs.pop('headers', {})
171 encoding = None
172
173 if content is not None and not isinstance(content, bytes):
174 raise TypeError('Content should be binary data')
175 if text is not None and not isinstance(text, str):
176 raise TypeError('Text should be string data')
177
178 if json is not None:
179 encoder = kwargs.pop('json_encoder', None) or jsonutils.JSONEncoder
180 text = jsonutils.dumps(json, cls=encoder)
181 if text is not None:
182 encoding = get_encoding_from_headers(headers) or 'utf-8'
183 content = text.encode(encoding)
184 if content is not None:
185 body = _IOReader(content)
186 if not raw:
187 status = kwargs.get('status_code', _DEFAULT_STATUS)
188 reason = kwargs.get('reason', http.client.responses.get(status))
189
190 raw = HTTPResponse(status=status,
191 reason=reason,
192 headers=headers,
193 body=body or _IOReader(b''),
194 decode_content=False,
195 enforce_content_length=False,
196 preload_content=False,
197 original_response=None)
198
199 response = _http_adapter.build_response(request, raw)
200 response.connection = connection
201
202 if encoding and not response.encoding:
203 response.encoding = encoding
204
205 _extract_cookies(request, response, kwargs.get('cookies'))
206
207 return response
208
209
210class _Context(object):
211 """Stores the data being used to process a current URL match."""
212
213 def __init__(self, headers, status_code, reason, cookies):
214 self.headers = headers
215 self.status_code = status_code
216 self.reason = reason
217 self.cookies = cookies
218
219
220class _MatcherResponse(object):
221
222 def __init__(self, **kwargs):
223 self._exc = kwargs.pop('exc', None)
224
225 # If the user is asking for an exception to be thrown then prevent them
226 # specifying any sort of body or status response as it won't be used.
227 # This may be protecting the user too much but can be removed later.
228 if self._exc and kwargs:
229 raise TypeError('Cannot provide other arguments with exc.')
230
231 _check_body_arguments(**kwargs)
232 self._params = kwargs
233
234 # whilst in general you shouldn't do type checking in python this
235 # makes sure we don't end up with differences between the way types
236 # are handled between python 2 and 3.
237 content = self._params.get('content')
238 text = self._params.get('text')
239
240 if content is not None and not (callable(content) or
241 isinstance(content, bytes)):
242 raise TypeError('Content should be a callback or binary data')
243
244 if text is not None and not (callable(text) or
245 isinstance(text, str)):
246 raise TypeError('Text should be a callback or string data')
247
248 def get_response(self, request):
249 # if an error was requested then raise that instead of doing response
250 if self._exc:
251 raise self._exc
252
253 # If a cookie dict is passed convert it into a CookieJar so that the
254 # cookies object available in a callback context is always a jar.
255 cookies = self._params.get('cookies', CookieJar())
256 if isinstance(cookies, dict):
257 cookies = cookiejar_from_dict(cookies, CookieJar())
258
259 context = _Context(self._params.get('headers', {}).copy(),
260 self._params.get('status_code', _DEFAULT_STATUS),
261 self._params.get('reason'),
262 cookies)
263
264 # if a body element is a callback then execute it
265 def _call(f, *args, **kwargs):
266 return f(request, context, *args, **kwargs) if callable(f) else f
267
268 return create_response(request,
269 json=_call(self._params.get('json')),
270 text=_call(self._params.get('text')),
271 content=_call(self._params.get('content')),
272 body=_call(self._params.get('body')),
273 raw=_call(self._params.get('raw')),
274 json_encoder=self._params.get('json_encoder'),
275 status_code=context.status_code,
276 reason=context.reason,
277 headers=context.headers,
278 cookies=context.cookies)