Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpretty/core.py: 52%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# <HTTPretty - HTTP client mock for Python>
2# Copyright (C) <2011-2021> Gabriel Falcão <gabriel@nacaolivre.org>
3#
4# Permission is hereby granted, free of charge, to any person
5# obtaining a copy of this software and associated documentation
6# files (the "Software"), to deal in the Software without
7# restriction, including without limitation the rights to use,
8# copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the
10# Software is furnished to do so, subject to the following
11# conditions:
12#
13# The above copyright notice and this permission notice shall be
14# included in all copies or substantial portions of the Software.
15#
16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
18# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
21# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23# OTHER DEALINGS IN THE SOFTWARE.
25import io
26import time
27import codecs
28import contextlib
29import functools
30import hashlib
31import inspect
32import logging
33import itertools
34import json
35import types
36import re
37import socket
38import tempfile
39import threading
40import traceback
41import warnings
43from functools import partial
44from typing import Callable
46from .compat import (
47 BaseClass,
48 BaseHTTPRequestHandler,
49 quote,
50 quote_plus,
51 urlencode,
52 encode_obj,
53 urlunsplit,
54 urlsplit,
55 parse_qs,
56 unquote_utf8,
57)
58from .http import (
59 STATUSES,
60 HttpBaseClass,
61 parse_requestline,
62 last_requestline,
63)
65from .utils import (
66 utf8,
67 decode_utf8,
68)
70from .errors import HTTPrettyError, UnmockedError
72from datetime import datetime
73from datetime import timedelta
74from errno import EAGAIN
class __internals__:
    """Module-wide registry of the temp files and threads HTTPretty creates."""

    # https://github.com/gabrielfalcao/HTTPretty/issues/430
    thread_timeout = 0.1
    temp_files = []
    threads = []

    @classmethod
    def create_temp_file(cls):
        """Open an anonymous temporary file and record it in ``temp_files``."""
        handle = tempfile.TemporaryFile()
        cls.temp_files.append(handle)
        return handle

    @classmethod
    def create_thread(cls, *args, **kwargs):
        """Build a :py:class:`threading.Thread`; all arguments are forwarded."""
        return threading.Thread(*args, **kwargs)

    @classmethod
    def cleanup_temp_files(cls):
        """Close every registered temp file and drop it from the registry.

        Errors raised while closing are logged at debug level and ignored.
        """
        # iterate over a snapshot because the registry shrinks as we go
        for handle in list(cls.temp_files):
            try:
                handle.close()
            except Exception as e:
                logger.debug('error closing file {}: {}'.format(handle, e))
            cls.temp_files.remove(handle)

    @classmethod
    def cleanup_threads(cls):
        """Join every registered thread, waiting ``thread_timeout`` for each.

        :raises socket.timeout: if a thread is still alive after the wait
        """
        for worker in cls.threads:
            worker.join(cls.thread_timeout)
            if worker.is_alive():
                raise socket.timeout(cls.thread_timeout)

    @classmethod
    def cleanup_sockets(cls):
        """Release temp files first, then wait for the threads."""
        cls.cleanup_temp_files()
        cls.cleanup_threads()
def set_default_thread_timeout(timeout):
    """Change the timeout stored in ``__internals__.thread_timeout``.

    :param timeout: the new timeout value (the original docs declare it as int)
    """
    __internals__.thread_timeout = timeout
def get_default_thread_timeout():
    """gets the default thread timeout for HTTPretty threads

    :returns: the value currently stored in ``__internals__.thread_timeout``
    """
    return __internals__.thread_timeout
# Sentinel object the stdlib uses to mean "no explicit timeout given".
SOCKET_GLOBAL_DEFAULT_TIMEOUT = socket._GLOBAL_DEFAULT_TIMEOUT

# References to the genuine socket API, captured at import time
# (presumably so the patched functions can be restored later).
old_socket = socket.socket
old_SocketType = socket.SocketType
old_socketpair = getattr(socket, 'socketpair', None)  # tolerate its absence
old_create_connection = socket.create_connection
old_gethostbyname = socket.gethostbyname
old_gethostname = socket.gethostname
old_getaddrinfo = socket.getaddrinfo

# Placeholders for optional socks/ssl originals.
old_socksocket = None
old_ssl_wrap_socket = None
old_sslwrap_simple = None
old_sslsocket = None
old_sslcontext_wrap_socket = None
old_sslcontext = None

MULTILINE_ANY_REGEX = re.compile(r'.*', re.M)
# optional "^", optional http(s) scheme, host, then one ":" or "/" delimiter
hostname_re = re.compile(r'\^?(?:https?://)?[^:/]*[:/]?')

logger = logging.getLogger(__name__)
# PySocks is optional: remember its socket class when it is installed.
try:  # pragma: no cover
    import socks
    old_socksocket = socks.socksocket
except ImportError:
    socks = None

# Keep handles on the real ssl primitives when the ssl module is importable.
try:  # pragma: no cover
    import ssl
    old_sslcontext_class = ssl.SSLContext
    old_sslcontext = ssl.create_default_context()
    old_ssl_wrap_socket = old_sslcontext.wrap_socket
    # only record the unbound method when this ssl build exposes it
    if hasattr(ssl.SSLContext, 'wrap_socket'):
        old_sslcontext_wrap_socket = ssl.SSLContext.wrap_socket
    old_sslsocket = ssl.SSLSocket
except ImportError:  # pragma: no cover
    ssl = None

# The C accelerator module may be missing on some interpreters.
try:
    import _ssl
except ImportError:
    _ssl = None
# used to handle error caused by ndg-httpsclient
#
# The imported helpers are called ``inject_into_urllib3`` and
# ``extract_from_urllib3``. The previous code appended the undefined
# names ``extract_from_urllib`` / ``inject_from_urllib``; the resulting
# NameError was swallowed by ``except Exception`` so both lists were
# always empty.
# NOTE(review): with the names fixed these hooks are actually registered
# whenever pyopenssl support is importable - confirm the code that
# consumes these lists expects that.
pyopenssl_overrides_inject = []
pyopenssl_overrides_extract = []
try:
    from requests.packages.urllib3.contrib.pyopenssl import inject_into_urllib3, extract_from_urllib3
except Exception:
    # optional dependency: importing it can fail in several ways
    pass
else:
    pyopenssl_overrides_extract.append(extract_from_urllib3)
    pyopenssl_overrides_inject.append(inject_into_urllib3)

try:
    from urllib3.contrib.pyopenssl import extract_from_urllib3, inject_into_urllib3
except Exception:
    pass
else:
    # requests may re-export the very same urllib3 functions; skip duplicates
    if extract_from_urllib3 not in pyopenssl_overrides_extract:
        pyopenssl_overrides_extract.append(extract_from_urllib3)
    if inject_into_urllib3 not in pyopenssl_overrides_inject:
        pyopenssl_overrides_inject.append(inject_into_urllib3)
# requests may vendor/alias urllib3; keep a handle on its ssl_wrap_socket.
try:
    import requests.packages.urllib3.connection as requests_urllib3_connection
    old_requests_ssl_wrap_socket = requests_urllib3_connection.ssl_wrap_socket
except ImportError:
    requests_urllib3_connection = None
    old_requests_ssl_wrap_socket = None

# eventlet is optional.
try:
    import eventlet
    import eventlet.green
except ImportError:
    eventlet = None

# The DEFAULT_* sets are immutable; the POTENTIAL_* sets start as copies
# and are extended at runtime (e.g. by fakesock.socket.sendall).
DEFAULT_HTTP_PORTS = frozenset((80,))
DEFAULT_HTTPS_PORTS = frozenset((443,))
POTENTIAL_HTTP_PORTS = set(DEFAULT_HTTP_PORTS)
POTENTIAL_HTTPS_PORTS = set(DEFAULT_HTTPS_PORTS)
def FALLBACK_FUNCTION(x):
    """Identity function: hands back its argument untouched.

    Used as the default "parser" when no body parser matches.
    """
    return x
class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
    r"""Represents a HTTP request. It takes a valid multi-line,
    ``\r\n`` separated string with HTTP headers and parse them out using
    the internal `parse_request` method.

    It also replaces the `rfile` and `wfile` attributes with :py:class:`io.BytesIO`
    instances so that we guarantee that it won't make any I/O, neither
    for writing nor reading.

    It has some convenience attributes:

    ``headers`` -> a mimetype object that can be cast into a dictionary,
    contains all the request headers

    ``protocol`` -> the protocol of this host, inferred from the port
    of the underlying fake TCP socket.

    ``host`` -> the hostname of this request.

    ``url`` -> the full url of this request.

    ``path`` -> the path of the request.

    ``method`` -> the HTTP method used in this request.

    ``body`` -> the request body as a string.

    ``parsed_body`` -> the request body parsed by ``parse_request_body``.

    ``querystring`` -> a dictionary containing lists with the
    attributes. Please notice that if you need a single value from a
    query string you will need to get it manually like:

    .. testcode::

      >>> request.querystring
      {'name': ['Gabriel Falcao']}
      >>> print(request.querystring['name'][0])
      Gabriel Falcao
    """
    def __init__(self, headers, body='', sock=None, path_encoding='iso-8859-1'):
        """
        :param headers: the raw request line plus headers (str or bytes)
        :param body: the raw request body (str or bytes)
        :param sock: the (fake) socket the request arrived on, if any
        :param path_encoding: codec used to turn the parsed path back into bytes
        """
        # first of all, lets make sure that if headers or body are
        # unicode strings, it must be converted into a utf-8 encoded
        # byte string
        self.created_at = time.time()
        self.raw_headers = utf8(headers.strip())
        self._body = utf8(body)
        self.connection = sock
        # Now let's concatenate the headers with the body, and create
        # `rfile` based on it
        self.rfile = io.BytesIO(b'\r\n\r\n'.join([self.raw_headers, self.body]))

        # Creating `wfile` as an empty BytesIO, just to avoid any
        # real I/O calls
        self.wfile = io.BytesIO()

        # parsing the request line preemptively
        self.raw_requestline = self.rfile.readline()

        # initiating the error attributes with None
        self.error_code = None
        self.error_message = None

        # Parse the request based on the attributes above
        if not self.parse_request():
            return

        # Now the convenient attributes for the HTTPretty API:
        # - `path`
        # - `querystring` holds a dictionary with the parsed query string
        # - `parsed_body` the body parsed according to its content-type
        try:
            self.path = self.path.encode(path_encoding)
        except UnicodeError:
            # ``str.encode`` raises UnicodeEncodeError (never
            # UnicodeDecodeError); UnicodeError covers both, so a path
            # that does not fit ``path_encoding`` is kept as-is.
            pass

        self.path = decode_utf8(self.path)

        # Only what follows the first "?" is the query string; a path
        # without "?" has an empty one (``split("?")[-1]`` used to yield
        # the whole path in that case).
        _, _, qstring = self.path.partition('?')
        self.querystring = self.parse_querystring(qstring)

        # And the body will be attempted to be parsed as
        # `application/json` or `application/x-www-form-urlencoded`.
        # ``parsed_body`` holds the parsed body, or the raw body when
        # HTTPrettyRequest doesn't know how to parse it.
        self.parsed_body = self.parse_request_body(self._body)

    @property
    def method(self):
        """the HTTP method used in this request"""
        return self.command

    @property
    def protocol(self):
        """the protocol used in this request

        ``''`` without a socket, otherwise ``'http'``/``'https'`` derived
        from the socket's ``is_http`` / ``is_secure`` flags.
        """
        proto = ''
        if not self.connection:
            return ''
        elif self.connection.is_http:
            proto = 'http'

        if self.connection.is_secure:
            proto = 'https'

        return proto

    @property
    def body(self):
        """the request body as bytes"""
        return self._body

    @body.setter
    def body(self, value):
        self._body = utf8(value)

        # And the body will be attempted to be parsed as
        # `application/json` or `application/x-www-form-urlencoded`
        self.parsed_body = self.parse_request_body(self._body)

    def __nonzero__(self):
        return bool(self.body) or bool(self.raw_headers)

    # Python 3 consults ``__bool__``; ``__nonzero__`` alone is never called.
    __bool__ = __nonzero__

    @property
    def url(self):
        """the full url of this recorded request"""
        return "{}://{}{}".format(self.protocol, self.host, self.path)

    @property
    def host(self):
        """value of the ``Host`` header, or ``'<unknown>'`` when absent"""
        return self.headers.get('Host') or '<unknown>'

    def __str__(self):
        tmpl = '<HTTPrettyRequest("{}", "{}", headers={}, body={})>'
        return tmpl.format(
            self.method,
            self.url,
            dict(self.headers),
            len(self.body),
        )

    def parse_querystring(self, qs):
        """parses an UTF-8 encoded query string into a dict of string lists

        :param qs: a querystring
        :returns: a dict of lists

        """
        expanded = unquote_utf8(qs)
        parsed = parse_qs(expanded)
        return {
            key: [decode_utf8(value) for value in values]
            for key, values in parsed.items()
        }

    def parse_request_body(self, body):
        """Attempt to parse the post based on the content-type passed.
        Return the regular body if not

        :param body: string
        :returns: a python object such as dict or list in case the deserialization succeeded. Else returns the given param ``body``
        """

        PARSING_FUNCTIONS = {
            'application/json': json.loads,
            'text/json': json.loads,
            'application/x-www-form-urlencoded': self.parse_querystring,
        }

        content_type = self.headers.get('content-type', '')
        # Match on the bare media type so parameters such as
        # "; charset=utf-8" (and case differences) don't defeat the lookup.
        media_type = content_type.split(';', 1)[0].strip().lower()

        do_parse = PARSING_FUNCTIONS.get(media_type, FALLBACK_FUNCTION)
        try:
            body = decode_utf8(body)
            return do_parse(body)
        except Exception:
            # parsing is best-effort: hand back whatever we have
            return body
class EmptyRequestHeaders(dict):
    """Plain ``dict`` subclass that stands in for the headers of an
    empty request.
    """
class HTTPrettyRequestEmpty(object):
    """Placeholder counterpart of :py:class:`~httpretty.core.HTTPrettyRequest`
    whose attributes are all empty or ``None``.
    """

    headers = EmptyRequestHeaders()
    body = ''
    url = None
    method = None
class FakeSockFile(object):
    """Fake socket file descriptor. Under the hood all data is written in
    a temporary file, giving it a real file descriptor number.

    Attributes that are not defined here are delegated to the underlying
    temporary file.
    """
    def __init__(self):
        self.file = None
        self._fileno = None
        self.__closed__ = None
        self.reset()

    def reset(self):
        """Close the current temp file (if any) and start over with a new one."""
        if self.file:
            try:
                self.file.close()
            except Exception as e:
                logger.debug('error closing file {}: {}'.format(self.file, e))
            self.file = None

        self.file = __internals__.create_temp_file()
        self._fileno = self.file.fileno()
        self.__closed__ = False

    def getvalue(self):
        """Return the file contents.

        Uses ``getvalue()`` when the underlying file offers it, otherwise
        reads from the current position; the cursor is then rewound to 0.
        """
        if hasattr(self.file, 'getvalue'):
            value = self.file.getvalue()
        else:
            value = self.file.read()
        self.file.seek(0)
        return value

    def close(self):
        """Mark this object closed (once) and flush the underlying file.

        The temporary file itself is not closed here.
        """
        if self.__closed__:
            return
        self.__closed__ = True
        self.flush()

    def flush(self):
        """Flush the underlying file; failures are logged, never raised."""
        # The previous ``super().flush()`` call always failed (``object``
        # has no ``flush``) and only produced a misleading debug message.
        try:
            self.file.flush()
        except Exception as e:
            logger.debug('error flushing file {}: {}'.format(self.file, e))

    def fileno(self):
        """Return the descriptor number captured when the temp file was created."""
        return self._fileno

    def __getattr__(self, name):
        # Only reached for attributes missing on the instance. Guard
        # ``file`` itself so a half-initialised object fails with
        # AttributeError instead of recursing without bound.
        if name == 'file':
            raise AttributeError(name)
        try:
            return getattr(self.file, name)
        except AttributeError:
            return super().__getattribute__(name)

    def __del__(self):
        try:
            self.close()
        except (ValueError, AttributeError):
            pass

        # Adding the lines below as a potential fix of github issue #426
        # that seems to be a compatible the solution of #413
        file = self.__dict__.get('file')
        if file is not None:
            try:
                file.close()
            except Exception:
                # a finalizer must never raise
                pass
class FakeSSLSocket(object):
    """Thin proxy around a :py:class:`~httpretty.core.fakesock` socket.

    Extra positional/keyword arguments are accepted and ignored.
    """
    def __init__(self, sock, *args, **kw):
        self._httpretty_sock = sock

    def __getattr__(self, attr):
        # anything not found on the proxy is looked up on the wrapped socket
        wrapped = self._httpretty_sock
        return getattr(wrapped, attr)
class FakeAddressTuple(object):
    """Stand-in for a socket address that rejects any item access."""

    def __init__(self, fakesocket):
        self.fakesocket = fakesocket

    def __getitem__(self, *args, **kw):
        # Indexing always fails, naming the real socket in the message.
        real = self.fakesocket.truesock
        message = 'socket {} is not connected'.format(real)
        raise AssertionError(message)
def fake_socketpair(*args, **kw):
    """Call the original ``socket.socketpair`` inside ``restored_libs()``.

    All arguments are forwarded unchanged.
    """
    with restored_libs():
        pair = old_socketpair(*args, **kw)
        return pair
class fakesock(object):
    """
    fake :py:mod:`socket`
    """
    class socket(object):
        """drop-in replacement for :py:class:`socket.socket`

        Data sent through it is parsed as HTTP and matched against the
        registered entries; anything unmatched is forwarded to a real
        socket (``truesock``) when one exists.
        """
        _entry = None
        _read_buf = None

        debuglevel = 0
        # Class-level default only. Each instance gets its own list in
        # ``__init__`` so sockets no longer share (and endlessly grow)
        # one history of sent payloads.
        _sent_data = []
        is_secure = False

        def __init__(
            self,
            family=socket.AF_INET,
            type=socket.SOCK_STREAM,
            proto=0,
            fileno=None
        ):
            self._sent_data = []
            self.socket_family = family
            self.socket_type = type
            self.socket_proto = proto
            if httpretty.allow_net_connect:
                self.truesock = self.create_socket()
            else:
                self.truesock = None

            self._address = FakeAddressTuple(self)
            self.__truesock_is_connected__ = False
            self.fd = FakeSockFile()
            self.fd.socket = fileno or self
            self.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
            self._sock = fileno or self
            self.is_http = False
            self._bufsize = 32 * 1024

        def __repr__(self):
            return '{self.__class__.__module__}.{self.__class__.__name__}("{self.host}")'.format(**locals())

        @property
        def host(self):
            """the address joined with ``":"``"""
            return ":".join(map(str, self._address))

        def create_socket(self, address=None):
            """Create a real socket with this fake socket's family/type/proto."""
            return old_socket(self.socket_family, self.socket_type, self.socket_proto)

        def getpeercert(self, *a, **kw):
            """Return a made-up certificate dict for the connected host."""
            now = datetime.now()
            shift = now + timedelta(days=30 * 12)
            return {
                'notAfter': shift.strftime('%b %d %H:%M:%S GMT'),
                'subjectAltName': (
                    ('DNS', '*.%s' % self._host),
                    ('DNS', self._host),
                    ('DNS', '*'),
                ),
                'subject': (
                    (
                        ('organizationName', '*.%s' % self._host),
                    ),
                    (
                        ('organizationalUnitName',
                         'Domain Control Validated'),
                    ),
                    (
                        ('commonName', '*.%s' % self._host),
                    ),
                ),
            }

        def ssl(self, sock, *args, **kw):
            return sock

        def setsockopt(self, level, optname, value):
            """Forward to the real socket; log and return when there is none."""
            if httpretty.allow_net_connect and not self.truesock:
                self.truesock = self.create_socket()
            elif not self.truesock:
                logger.debug('setsockopt(%s, %s, %s) failed', level, optname, value)
                return

            return self.truesock.setsockopt(level, optname, value)

        def connect(self, address):
            """Record ``address`` and decide whether a real connection is needed."""
            try:
                self._address = (self._host, self._port) = address
            except ValueError:
                # We get here when the address is just a string pointing to a
                # unix socket path/file
                #
                # See issue #206
                self.is_http = False
            else:
                ports_to_check = (
                    POTENTIAL_HTTP_PORTS.union(POTENTIAL_HTTPS_PORTS))
                self.is_http = self._port in ports_to_check
                self.is_secure = self._port in POTENTIAL_HTTPS_PORTS

            if not self.is_http:
                self.connect_truesock(address=address)
            elif self.truesock and not self.real_socket_is_connected():
                # TODO: remove nested if
                matcher = httpretty.match_http_address(self._host, self._port)
                if matcher is None:
                    self.connect_truesock(address=address)

        def bind(self, address):
            """Record ``address``; bind the real socket too when it exists."""
            self._address = (self._host, self._port) = address
            if self.truesock:
                self.bind_truesock(address)

        def bind_truesock(self, address):
            """Bind the real socket.

            :raises UnmockedError: when no real socket exists and none may be created
            """
            if httpretty.allow_net_connect and not self.truesock:
                self.truesock = self.create_socket()
            elif not self.truesock:
                raise UnmockedError('Failed to socket.bind() because because a real socket was never created.', address=address)

            return self.truesock.bind(address)

        def connect_truesock(self, request=None, address=None):
            """Connect the real socket (once) and return it.

            :raises UnmockedError: when no real socket exists and none may be created
            """
            address = address or self._address

            if self.__truesock_is_connected__:
                return self.truesock

            # NOTE: the messages below are rendered from ``locals()``
            if request:
                logger.warning('real call to socket.connect() for {request}'.format(**locals()))
            elif address:
                logger.warning('real call to socket.connect() for {address}'.format(**locals()))
            else:
                logger.warning('real call to socket.connect()')

            if httpretty.allow_net_connect and not self.truesock:
                self.truesock = self.create_socket(address)
            elif not self.truesock:
                raise UnmockedError('Failed to socket.connect() because because a real socket was never created.', request=request, address=address)

            # the patches are lifted while connecting and re-applied afterwards
            undo_patch_socket()
            try:
                hostname = self._address[0]
                port = 80
                if len(self._address) == 2:
                    port = self._address[1]
                if port == 443 and old_sslsocket:
                    self.truesock = old_ssl_wrap_socket(self.truesock, server_hostname=hostname)

                sock = self.truesock

                sock.connect(self._address)
                self.__truesock_is_connected__ = True
                self.truesock = sock
            finally:
                apply_patch_socket()

            return self.truesock

        def real_socket_is_connected(self):
            return self.__truesock_is_connected__

        def fileno(self):
            """Descriptor of the real socket when present, else of the temp file."""
            if self.truesock:
                return self.truesock.fileno()
            return self.fd.fileno()

        def close(self):
            """Close and forget the real socket, if any."""
            if self.truesock:
                self.truesock.close()
                self.truesock = None
                self.__truesock_is_connected__ = False

        def makefile(self, mode='r', bufsize=-1):
            """Returns this fake socket's own tempfile buffer.

            If there is an entry associated with the socket, the file
            descriptor gets filled in with the entry data before being
            returned.

            :raises socket.timeout: when filling the buffer exceeds the timeout
            """
            self._mode = mode
            self._bufsize = bufsize

            if self._entry:
                t = __internals__.create_thread(
                    target=self._entry.fill_filekind, args=(self.fd,)
                )

                # execute body callback and send http response in a
                # thread, wait for thread to finish within the timeout
                # set via socket.settimeout()
                t.start()
                if self.timeout == SOCKET_GLOBAL_DEFAULT_TIMEOUT:
                    timeout = get_default_thread_timeout()
                else:
                    timeout = self.timeout

                # fake socket timeout error by checking if the thread
                # finished in time.
                t.join(timeout)
                if t.is_alive():
                    # For more info check issue https://github.com/gabrielfalcao/HTTPretty/issues/430
                    raise socket.timeout(timeout)

            return self.fd

        def real_sendall(self, data, *args, **kw):
            """Sends data to the remote server. This method is called
            when HTTPretty identifies that someone is trying to send
            non-http data.

            The received bytes are written in this socket's tempfile
            buffer so that HTTPretty can return it accordingly when
            necessary.
            """
            request = kw.pop('request', None)
            if request:
                bytecount = len(data)
                logger.warning('{self}.real_sendall({bytecount} bytes) to {request.url} via {request.method} at {request.created_at}'.format(**locals()))

            if httpretty.allow_net_connect and not self.truesock:
                self.connect_truesock(request=request)
            elif not self.truesock:
                raise UnmockedError(request=request)

            if not self.is_http:
                self.truesock.setblocking(1)
                return self.truesock.sendall(data, *args, **kw)

            sock = self.connect_truesock(request=request)

            sock.setblocking(1)
            sock.sendall(data, *args, **kw)

            # keep reading until a chunk is empty/whitespace-only or a
            # socket error other than EAGAIN occurs
            should_continue = True
            while should_continue:
                try:
                    received = sock.recv(self._bufsize)
                    self.fd.write(received)
                    should_continue = bool(received.strip())

                except socket.error as e:
                    if e.errno == EAGAIN:
                        continue
                    break

            self.fd.seek(0)

        def sendall(self, data, *args, **kw):
            """Record ``data``, try to parse it as an HTTP request and pick
            the registered entry that answers it; unmatched data goes to
            :py:meth:`real_sendall`.
            """
            self._sent_data.append(data)
            self.fd = FakeSockFile()
            self.fd.socket = self
            if isinstance(data, str):
                data = data.encode('utf-8')
            elif not isinstance(data, bytes):
                # the placeholder used to be logged verbatim (never formatted)
                logger.debug('cannot sendall(%r)', data)
                data = bytes(data)

            try:
                requestline, _ = data.split(b'\r\n', 1)
                method, path, version = parse_requestline(
                    decode_utf8(requestline))
                is_parsing_headers = True
            except ValueError:
                path = ''
                is_parsing_headers = False

                if self._entry is None:
                    # If the previous request wasn't mocked, don't
                    # mock the subsequent sending of data
                    return self.real_sendall(data, *args, **kw)
                else:
                    method = self._entry.method
                    path = self._entry.info.path

            self.fd.seek(0)

            if not is_parsing_headers:
                # body continuation of a request whose headers came earlier
                if len(self._sent_data) > 1:
                    headers = utf8(last_requestline(self._sent_data))
                    meta = self._entry.request.headers
                    body = utf8(self._sent_data[-1])
                    if meta.get('transfer-encoding', '') == 'chunked':
                        # skip chunk-size lines and chunk terminators
                        if not body.isdigit() and (body != b'\r\n') and (body != b'0\r\n\r\n'):
                            self._entry.request.body += body
                    else:
                        self._entry.request.body += body

                    httpretty.historify_request(headers, body, sock=self, append=False)
                    return

            if path[:2] == '//':
                path = '//' + path
            # path might come with
            s = urlsplit(path)
            POTENTIAL_HTTP_PORTS.add(int(s.port or 80))
            parts = list(map(utf8, data.split(b'\r\n\r\n', 1)))
            if len(parts) == 2:
                headers, body = parts
            else:
                headers = ''
                body = data

            request = httpretty.historify_request(headers, body, sock=self)

            info = URIInfo(
                hostname=self._host,
                port=self._port,
                path=s.path,
                query=s.query,
                last_request=request
            )

            matcher, entries = httpretty.match_uriinfo(info)

            if not entries:
                logger.debug('no entries matching {}'.format(request))
                self._entry = None
                self._read_buf = None
                self.real_sendall(data, request=request)
                return

            self._entry = matcher.get_next_entry(method, info, request)

        def forward_and_trace(self, function_name, *a, **kw):
            """Call ``function_name`` on the real socket.

            :raises UnmockedError: when there is no real socket
            """
            if not self.truesock:
                raise UnmockedError('Failed to socket.{}() because because a real socket was never created.'.format(function_name))

            callback = getattr(self.truesock, function_name)
            return callback(*a, **kw)

        def settimeout(self, new_timeout):
            """Store the timeout; pass it to the real socket for non-HTTP use."""
            self.timeout = new_timeout
            if not self.is_http:
                if self.truesock:
                    self.truesock.settimeout(new_timeout)

        def send(self, data, *args, **kwargs):
            """Like :py:meth:`sendall`, but returns ``len(data)``."""
            self.sendall(data, *args, **kwargs)
            return len(data)

        def sendto(self, *args, **kwargs):
            return self.forward_and_trace('sendto', *args, **kwargs)

        def recvfrom_into(self, *args, **kwargs):
            return self.forward_and_trace('recvfrom_into', *args, **kwargs)

        def recv_into(self, *args, **kwargs):
            return self.forward_and_trace('recv_into', *args, **kwargs)

        def recvfrom(self, *args, **kwargs):
            return self.forward_and_trace('recvfrom', *args, **kwargs)

        def recv(self, buffersize=0, *args, **kwargs):
            """Read from the in-memory response buffer, filling it from the
            current entry when there is one.
            """
            if not self._read_buf:
                self._read_buf = io.BytesIO()

            if self._entry:
                self._entry.fill_filekind(self._read_buf)

            if not self._read_buf:
                raise UnmockedError('socket cannot recv(): {!r}'.format(self))

            return self._read_buf.read(buffersize)

        def __getattr__(self, name):
            # Fallback for everything not implemented above: delegate to
            # the real socket, creating it when that is permitted.
            if name in ('getsockopt', 'selected_alpn_protocol') and not self.truesock:
                self.truesock = self.create_socket()
            elif httpretty.allow_net_connect and not self.truesock:
                # can't call self.connect_truesock() here because we
                # don't know if user wants to execute server of client
                # calls (or can they?)
                self.truesock = self.create_socket()
            elif not self.truesock:
                # Special case for
                # `hasattr(sock, "version")` call added in urllib3>=1.26.
                if name == 'version':
                    raise AttributeError(
                        "HTTPretty synthesized this error to fix urllib3 compatibility "
                        "(see issue https://github.com/gabrielfalcao/HTTPretty/issues/409). "
                        "Please open an issue if this error causes further unexpected issues."
                    )

                raise UnmockedError('Failed to socket.{} because because a real socket does not exist'.format(name))

            return getattr(self.truesock, name)
def with_socket_is_secure(sock, kw):
    """Flag ``sock`` as secure and copy every item of ``kw`` onto it.

    :param sock: object to decorate (modified in place)
    :param kw: dict stored as ``sock.kwargs`` and mirrored as attributes
    :returns: the same ``sock``
    """
    sock.is_secure = True
    sock.kwargs = kw
    for attr_name in kw:
        setattr(sock, attr_name, kw[attr_name])
    return sock
def fake_wrap_socket(orig_wrap_socket_fn, *args, **kw):
    """drop-in replacement for py:func:`ssl.wrap_socket`

    The socket is taken from the ``sock`` keyword or, failing that, the
    first positional argument. It is returned marked as secure; a debug
    message is logged when ``server_hostname`` matches no registered host.
    """
    sock = kw['sock'] if 'sock' in kw else args[0]

    server_hostname = kw.get('server_hostname')
    if server_hostname is not None and httpretty.match_https_hostname(server_hostname) is None:
        logger.debug('no requests registered for hostname: "{}"'.format(server_hostname))

    return with_socket_is_secure(sock, kw)
def create_fake_connection(
        address,
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        source_address=None):
    """drop-in replacement for :py:func:`socket.create_connection`

    :param address: the address handed to ``connect()``
    :param timeout: applied via ``settimeout()`` unless it is the global default sentinel
    :param source_address: optional address to ``bind()`` before connecting
    :returns: a connected :py:class:`fakesock.socket`
    """
    s = fakesock.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
    if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
        s.settimeout(timeout)

    if isinstance(source_address, tuple) and len(source_address) == 2:
        # tuples are immutable: build a new pair with the port coerced to
        # int (item assignment here used to raise TypeError)
        source_address = (source_address[0], int(source_address[1]))

    if source_address:
        s.bind(source_address)
    s.connect(address)
    return s
def fake_gethostbyname(host):
    """drop-in replacement for :py:func:`socket.gethostbyname`

    Ignores ``host`` and always answers with the loopback address.
    """
    loopback = '127.0.0.1'
    return loopback
def fake_gethostname():
    """drop-in replacement for :py:func:`socket.gethostname`

    Always answers ``'localhost'``.
    """
    name = 'localhost'
    return name
def fake_getaddrinfo(
        host, port, family=None, socktype=None, proto=None, flags=None):
    """drop-in replacement for :py:func:`socket.getaddrinfo`

    Returns one IPv4 and one IPv6 TCP stream entry, both pointing at
    ``(host, port)``; the remaining arguments are ignored.
    """
    return [
        (address_family, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', (host, port))
        for address_family in (socket.AF_INET, socket.AF_INET6)
    ]
class Entry(BaseClass):
    """Created by :py:meth:`~httpretty.core.httpretty.register_uri` and
    stored in memory as internal representation of a HTTP
    request/response definition.

    Args:
        method (str): One of ``httpretty.GET``, ``httpretty.PUT``, ``httpretty.POST``, ``httpretty.DELETE``, ``httpretty.HEAD``, ``httpretty.PATCH``, ``httpretty.OPTIONS``, ``httpretty.CONNECT``.
        uri (str|re.Pattern): The URL to match
        adding_headers (dict): Extra headers to be added to the response
        forcing_headers (dict): Overwrite response headers.
        status (int): The status code for the response, defaults to ``200``.
        streaming (bool): Whether should stream the response into chunks via generator.
        headers: Headers to inject in the faked response.

    Returns:
        httpretty.Entry: containing the request-matching metadata.


    .. warning:: When using the ``forcing_headers`` option make sure to add the header ``Content-Length`` to match at most the total body length, otherwise some HTTP clients can hang indefinitely.
    """
    def __init__(self, method, uri, body,
                 adding_headers=None,
                 forcing_headers=None,
                 status=200,
                 streaming=False,
                 **headers):

        self.method = method
        self.uri = uri
        self.info = None
        self.request = None

        self.body_is_callable = False
        if callable(body):
            self.callable_body = body
            self.body = None
            self.body_is_callable = True
        elif isinstance(body, str):
            self.body = utf8(body)
        else:
            self.body = body

        self.streaming = streaming
        if not streaming and not self.body_is_callable:
            self.body_length = len(self.body or '')
        else:
            self.body_length = 0

        # Work on private copies so the dicts handed in by the caller are
        # never modified by the keyword-header merge below.
        self.adding_headers = dict(adding_headers or {})
        self.forcing_headers = dict(forcing_headers or {})
        self.status = int(status)

        # keyword headers: ``content_type`` becomes ``Content-Type``
        for k, v in headers.items():
            name = "-".join(k.split("_")).title()
            self.adding_headers[name] = v

        self.validate()

    def validate(self):
        """validates the body size with the value of the ``Content-Length``
        header

        :raises HTTPrettyError: when the declared length exceeds the body length
        """
        content_length_keys = 'Content-Length', 'content-length'
        for key in content_length_keys:
            got = self.adding_headers.get(
                key, self.forcing_headers.get(key, None))

            if got is None:
                continue

            igot = None
            try:
                igot = int(got)
            except (ValueError, TypeError):
                warnings.warn(
                    'HTTPretty got to register the Content-Length header '
                    'with "%r" which is not a number' % got)
                return

            if igot and igot > self.body_length:
                raise HTTPrettyError(
                    'HTTPretty got inconsistent parameters. The header '
                    'Content-Length you registered expects size "%d" but '
                    'the body you registered for that has actually length '
                    '"%d".' % (
                        igot, self.body_length,
                    )
                )

    def __str__(self):
        return r'<Entry {} {} getting {}>'.format(
            self.method,
            self.uri,
            self.status
        )

    def normalize_headers(self, headers):
        """Normalize keys in header names so that ``COntent-tyPe`` becomes ``content-type``

        :param headers: dict

        :returns: dict
        """
        return {name.lower(): headers[name] for name in headers}

    def fill_filekind(self, fk):
        """writes HTTP Response data to a file descriptor

        :parm fk: a file-like object

        .. warning:: **side-effect:** this method moves the cursor of the given file object to zero
        """
        now = datetime.utcnow()

        headers = {
            'status': self.status,
            'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
            'server': 'Python/HTTPretty',
            'connection': 'close',
        }

        if self.forcing_headers:
            # copy: the update below must not leak into self.forcing_headers
            headers = dict(self.forcing_headers)

        if self.adding_headers:
            headers.update(
                self.normalize_headers(
                    self.adding_headers))

        headers = self.normalize_headers(headers)
        status = headers.get('status', self.status)
        if self.body_is_callable:
            status, headers, self.body = self.callable_body(self.request, self.info.full_url(), headers)
            headers = self.normalize_headers(headers)
            # TODO: document this behavior:
            if 'content-length' not in headers:
                # text is written utf-8 encoded, so measure the encoded
                # size rather than the number of characters
                if isinstance(self.body, str):
                    computed_length = len(self.body.encode('utf-8'))
                else:
                    computed_length = len(self.body)
                headers.update({
                    'content-length': computed_length
                })

        string_list = [
            'HTTP/1.1 %d %s' % (status, STATUSES[status]),
        ]

        if 'date' in headers:
            string_list.append('date: %s' % headers.pop('date'))

        if not self.forcing_headers:
            content_type = headers.pop('content-type',
                                       'text/plain; charset=utf-8')

            content_length = headers.pop('content-length',
                                         self.body_length)

            string_list.append('content-type: %s' % content_type)
            if not self.streaming:
                string_list.append('content-length: %s' % content_length)

            server = headers.pop('server', None)
            if server:
                string_list.append('server: %s' % server)

        for k, v in headers.items():
            string_list.append(
                '{}: {}'.format(k, v),
            )

        for item in string_list:
            fk.write(utf8(item) + b'\n')

        fk.write(b'\r\n')

        if self.streaming:
            # tee() keeps the body replayable for later calls
            self.body, body = itertools.tee(self.body)
            for chunk in body:
                fk.write(utf8(chunk))
        else:
            fk.write(utf8(self.body))

        fk.seek(0)
def url_fix(s, charset=None):
    """Percent-escape special characters in the path and query string of ``s``.

    :param s: the URL to escape
    :param charset: deprecated and otherwise unused; a truthy value only
        triggers a ``DeprecationWarning``
    :returns: the re-assembled URL
    """
    if charset:
        warnings.warn("{}.url_fix() charset argument is deprecated".format(__name__), DeprecationWarning)

    parts = urlsplit(s)
    escaped_path = quote(parts.path, b'/%')
    escaped_query = quote_plus(parts.query, b':&=')
    return urlunsplit((
        parts.scheme,
        parts.netloc,
        escaped_path,
        escaped_query,
        parts.fragment,
    ))
class URIInfo(BaseClass):
    """Internal representation of `URIs <https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>`_

    Two instances compare equal (and hash equal) when their port,
    lower-cased hostname and escaped path match; credentials, query string
    and fragment are not part of the comparison.

    .. tip:: all arguments are optional

    :param username:
    :param password:
    :param hostname:
    :param port:
    :param path:
    :param query:
    :param fragment:
    :param scheme:
    :param last_request:
    """
    default_str_attrs = (
        'username',
        'password',
        'hostname',
        'port',
        'path',
    )

    def __init__(self,
                 username='',
                 password='',
                 hostname='',
                 port=80,
                 path='/',
                 query='',
                 fragment='',
                 scheme='',
                 last_request=None):

        self.username = username or ''
        self.password = password or ''
        self.hostname = hostname or ''

        if port:
            port = int(port)

        elif scheme == 'https':
            port = 443

        self.port = port or 80
        self.path = path or ''
        if query:
            # Sort the parameters so that equivalent query strings
            # serialize identically regardless of their original order.
            query_items = sorted(parse_qs(query).items())
            self.query = urlencode(
                encode_obj(query_items),
                doseq=True,
            )
        else:
            self.query = ''
        if scheme:
            self.scheme = scheme
        elif self.port in POTENTIAL_HTTPS_PORTS:
            self.scheme = 'https'
        else:
            self.scheme = 'http'
        self.fragment = fragment or ''
        self.last_request = last_request

    def to_str(self, attrs):
        """
        :param attrs: an iterable of attribute names to include
        :returns: a string in the form ``<httpretty.URIInfo(name="value", ...)>``
        """
        fmt = ", ".join(['%s="%s"' % (k, getattr(self, k, '')) for k in attrs])
        return r'<httpretty.URIInfo(%s)>' % fmt

    def __str__(self):
        return self.to_str(self.default_str_attrs)

    def str_with_query(self):
        """
        :returns: same as ``str(self)`` with the ``query`` attribute appended
        """
        attrs = self.default_str_attrs + ('query',)
        return self.to_str(attrs)

    def _comparison_key(self):
        """Returns the tuple that defines the identity of this URI for
        ``__eq__`` and ``__hash__``."""
        return (
            self.port,
            decode_utf8(self.hostname.lower()),
            url_fix(decode_utf8(self.path)),
        )

    def __hash__(self):
        # Must agree with __eq__: objects that compare equal hash equal.
        # (The previous ``bytes(self, 'ascii')`` raised TypeError because
        # ``bytes`` only accepts an encoding for ``str`` arguments.)
        return hash(self._comparison_key())

    def __eq__(self, other):
        if not isinstance(other, URIInfo):
            # Let Python fall back to the other operand / identity
            # instead of failing with AttributeError.
            return NotImplemented
        return self._comparison_key() == other._comparison_key()

    def full_url(self, use_querystring=True):
        """
        :param use_querystring: bool
        :returns: a string with the full url with the format ``{scheme}://{credentials}{domain}{path}{query}``
        """
        credentials = ""
        # Credentials are only rendered when a password is present.
        if self.password:
            credentials = "{}:{}@".format(
                self.username, self.password)

        query = ""
        if use_querystring and self.query:
            query = "?{}".format(decode_utf8(self.query))

        result = "{scheme}://{credentials}{domain}{path}{query}".format(
            scheme=self.scheme,
            credentials=credentials,
            domain=self.get_full_domain(),
            path=decode_utf8(self.path),
            query=query
        )
        return result

    def get_full_domain(self):
        """
        :returns: a string in the form ``{domain}:{port}`` or just the domain if the port is 80 or 443
        """
        hostname = decode_utf8(self.hostname)
        # Port 80/443 should not be appended to the url
        if self.port not in DEFAULT_HTTP_PORTS | DEFAULT_HTTPS_PORTS:
            return ":".join([hostname, str(self.port)])

        return hostname

    @classmethod
    def from_uri(cls, uri, entry):
        """Parses ``uri`` into a new instance and records its port in the
        module-level set of potential HTTP or HTTPS ports.

        :param uri: string
        :param entry: an instance of :py:class:`~httpretty.core.Entry`
        """
        result = urlsplit(uri)
        if result.scheme == 'https':
            POTENTIAL_HTTPS_PORTS.add(int(result.port or 443))
        else:
            POTENTIAL_HTTP_PORTS.add(int(result.port or 80))
        return cls(result.username,
                   result.password,
                   result.hostname,
                   result.port,
                   result.path,
                   result.query,
                   result.fragment,
                   result.scheme,
                   entry)
class URIMatcher(object):
    """Pairs a registered URI (plain string or compiled regex) with the
    entries that should answer requests matching it.

    Exactly one of ``regex`` / ``info`` is set by the constructor.
    """
    regex = None
    info = None

    def __init__(self, uri, entries, match_querystring=False, priority=0):
        self._match_querystring = match_querystring
        # Compiled patterns are recognized by type name (CPython, Jython)
        pattern_type_names = (
            'SRE_Pattern',
            'org.python.modules.sre.PatternObject',
            'Pattern',
        )
        if type(uri).__name__ in pattern_type_names:
            self.regex = uri
            parts = urlsplit(uri.pattern)
            if parts.scheme == 'https':
                POTENTIAL_HTTPS_PORTS.add(int(parts.port or 443))
            else:
                POTENTIAL_HTTP_PORTS.add(int(parts.port or 80))
        else:
            self.info = URIInfo.from_uri(uri, entries)

        self.entries = entries
        self.priority = priority
        self.uri = uri
        # index of the next entry to serve, keyed by HTTP method
        self.current_entries = {}

    def matches(self, info):
        """Tell whether the request described by ``info`` is handled by
        this matcher; the result is truthy on match."""
        if not self.info:
            target = info.full_url(use_querystring=self._match_querystring)
            return self.regex.search(target)

        # Comparing info objects ignores the query string,
        # so it is checked on its own when requested.
        same_resource = self.info == info
        if not same_resource:
            return same_resource
        if not self._match_querystring:
            return True
        return self.info.query == info.query

    def __str__(self):
        if not self.info:
            described = self.regex.pattern
        elif self._match_querystring:
            described = str(self.info.str_with_query())
        else:
            described = str(self.info)
        return 'URLMatcher({})'.format(described)

    def get_next_entry(self, method, info, request):
        """Walk through the registered responses a single time; once they
        are exhausted every further request gets the last one.

        :raises ValueError: when no entry is registered for ``method``
        """
        cursor = self.current_entries.setdefault(method, 0)

        # only entries registered for the requested method are eligible
        candidates = [item for item in self.entries if item.method == method]

        if cursor >= len(candidates):
            # -1 pins the cursor to the last entry from now on
            cursor = self.current_entries[method] = -1

        if not (self.entries and candidates):
            raise ValueError('I have no entries for method %s: %s'
                             % (method, self))

        chosen = candidates[cursor]
        if cursor != -1:
            self.current_entries[method] = cursor + 1

        # Hand out a copy of the registered entry to make it thread-safe
        if chosen.body_is_callable:
            body = chosen.callable_body
        else:
            body = chosen.body
        clone = Entry(chosen.method, chosen.uri, body,
                      status=chosen.status,
                      streaming=chosen.streaming,
                      adding_headers=chosen.adding_headers,
                      forcing_headers=chosen.forcing_headers)

        # Attach the request context so that callbacks receive the real
        # uri rather than a compiled regex.
        clone.info = info
        clone.request = request
        return clone

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        return str(self) == str(other)
class httpretty(HttpBaseClass):
    """manages HTTPretty's internal request/response registry and request matching.
    """
    # Registry state lives on the class itself and is shared process-wide.
    _entries = {}
    latest_requests = []

    last_request = HTTPrettyRequestEmpty()
    _is_enabled = False
    allow_net_connect = True

    @classmethod
    def _prioritized_matchers(cls):
        """Returns the registered matchers ordered from highest to lowest priority."""
        return sorted(
            cls._entries,
            key=lambda matcher: matcher.priority,
            reverse=True,
        )

    @classmethod
    def match_uriinfo(cls, info):
        """
        :param info: an :py:class:`~httpretty.core.URIInfo`
        :returns: a 2-item tuple: (:py:class:`~httpretty.core.URIMatcher`, :py:class:`~httpretty.core.URIInfo`) or ``(None, [])``
        """
        for matcher in cls._prioritized_matchers():
            if matcher.matches(info):
                return (matcher, info)

        return (None, [])

    @classmethod
    def match_https_hostname(cls, hostname):
        """
        :param hostname: a string
        :returns: an :py:class:`~httpretty.core.URIMatcher` or ``None``
        """
        for matcher in cls._prioritized_matchers():
            if matcher.info is None:
                # Regex matcher: try the hostname part of its pattern
                # against synthetic https URLs for this hostname.
                pattern_with_port = "https://{0}:".format(hostname)
                pattern_without_port = "https://{0}/".format(hostname)
                hostname_pattern = (
                    hostname_re
                    .match(matcher.regex.pattern)
                    .group(0)
                )
                for pattern in [pattern_with_port, pattern_without_port]:
                    if re.match(hostname_pattern, pattern):
                        return matcher

            elif matcher.info.hostname == hostname:
                return matcher
        return None

    @classmethod
    def match_http_address(cls, hostname, port):
        """
        :param hostname: a string
        :param port: an integer
        :returns: an :py:class:`~httpretty.core.URIMatcher` or ``None``
        """
        for matcher in cls._prioritized_matchers():
            if matcher.info is None:
                if port in POTENTIAL_HTTPS_PORTS:
                    scheme = 'https://'
                else:
                    scheme = 'http://'

                pattern_without_port = "{0}{1}/".format(scheme, hostname)
                pattern_with_port = "{0}{1}:{2}/".format(scheme, hostname, port)
                hostname_pattern = (
                    hostname_re
                    .match(matcher.regex.pattern)
                    .group(0)
                )
                for pattern in [pattern_with_port, pattern_without_port]:
                    if re.match(hostname_pattern, pattern):
                        return matcher

            elif matcher.info.hostname == hostname \
                    and matcher.info.port == port:
                return matcher

        return None

    @classmethod
    @contextlib.contextmanager
    def record(cls, filename, indentation=4, encoding='utf-8', verbose=False, allow_net_connect=True, pool_manager_params=None):
        """Forwards every request to the real network through ``urllib3``
        and, when the block exits normally, writes the request/response
        pairs to ``filename`` as JSON. HTTPretty is disabled on exit even
        if the block raises; in that case nothing is written.

        .. testcode::

           import io
           import json
           import requests
           import httpretty

           with httpretty.record('/tmp/ip.json'):
               data = requests.get('https://httpbin.org/ip').json()

           with io.open('/tmp/ip.json') as fd:
               assert data == json.load(fd)

        :param filename: a string
        :param indentation: an integer, defaults to **4**
        :param encoding: a string, defaults to **"utf-8"**
        :param verbose: bool - forwarded to :py:meth:`enable`
        :param allow_net_connect: bool - forwarded to :py:meth:`enable`
        :param pool_manager_params: optional dict of keyword-arguments for ``urllib3.PoolManager``

        :returns: a `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_
        :raises RuntimeError: when ``urllib3`` is not installed
        """
        try:
            import urllib3
        except ImportError:
            msg = (
                'HTTPretty requires urllib3 installed '
                'for recording actual requests.'
            )
            raise RuntimeError(msg)

        http = urllib3.PoolManager(**pool_manager_params or {})

        cls.enable(allow_net_connect, verbose=verbose)
        calls = []

        def record_request(request, uri, headers):
            # The real request must go through the unpatched socket
            # module; re-enable afterwards even when the request fails.
            cls.disable()
            try:
                kw = {}
                kw.setdefault('body', request.body)
                kw.setdefault('headers', dict(request.headers))
                response = http.request(request.method, uri, **kw)
            finally:
                cls.enable(allow_net_connect, verbose=verbose)

            calls.append({
                'request': {
                    'uri': uri,
                    'method': request.method,
                    'headers': dict(request.headers),
                    'body': decode_utf8(request.body),
                    'querystring': request.querystring
                },
                'response': {
                    'status': response.status,
                    'body': decode_utf8(response.data),
                    # urllib3 1.10 had a bug if you just did:
                    #    dict(response.headers)
                    # which would cause all the values to become lists
                    # with the header name as the first item and the
                    # true value as the second item. Workaround that
                    'headers': dict(response.headers.items())
                }
            })
            return response.status, response.headers, response.data

        for method in cls.METHODS:
            cls.register_uri(method, MULTILINE_ANY_REGEX, body=record_request)

        try:
            yield
        finally:
            cls.disable()

        with codecs.open(filename, 'w', encoding) as f:
            f.write(json.dumps(calls, indent=indentation))

    @classmethod
    @contextlib.contextmanager
    def playback(cls, filename, allow_net_connect=True, verbose=False):
        """Registers every request/response pair stored in ``filename``
        (as written by :py:meth:`record`) for the duration of the block.
        HTTPretty is disabled on exit even if the block raises.

        .. testcode::

           import io
           import json
           import requests
           import httpretty

           with httpretty.record('/tmp/ip.json'):
               data = requests.get('https://httpbin.org/ip').json()

           with httpretty.playback('/tmp/ip.json'):
               assert data == requests.get('https://httpbin.org/ip').json()

        :param filename: a string
        :param allow_net_connect: bool - forwarded to :py:meth:`enable`
        :param verbose: bool - forwarded to :py:meth:`enable`
        :returns: a `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_
        """
        cls.enable(allow_net_connect, verbose=verbose)
        try:
            with open(filename) as fd:
                data = json.load(fd)

            for item in data:
                uri = item['request']['uri']
                method = item['request']['method']
                body = item['response']['body']
                headers = item['response']['headers']
                cls.register_uri(method, uri, body=body, forcing_headers=headers)

            yield
        finally:
            cls.disable()

    @classmethod
    def reset(cls):
        """resets the internal state of HTTPretty, unregistering all URLs
        """
        POTENTIAL_HTTP_PORTS.intersection_update(DEFAULT_HTTP_PORTS)
        POTENTIAL_HTTPS_PORTS.intersection_update(DEFAULT_HTTPS_PORTS)
        cls._entries.clear()
        cls.latest_requests = []
        cls.last_request = HTTPrettyRequestEmpty()
        __internals__.cleanup_sockets()

    @classmethod
    def historify_request(cls, headers, body='', sock=None, append=True):
        """appends request to a list for later retrieval

        .. testcode::

           import httpretty

           httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip', body='')
           with httpretty.enabled():
               requests.get('https://httpbin.org/ip')

           assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip'

        :param headers: passed through to :py:class:`HTTPrettyRequest`
        :param body: passed through to :py:class:`HTTPrettyRequest`
        :param sock: passed through to :py:class:`HTTPrettyRequest`
        :param append: bool - when false the most recent item of
            ``latest_requests`` is replaced instead of adding a new one
        :returns: the newly created request object
        """
        request = HTTPrettyRequest(headers, body, sock=sock)
        cls.last_request = request

        if append or not cls.latest_requests:
            cls.latest_requests.append(request)
        else:
            cls.latest_requests[-1] = request

        logger.info("captured: %s", request)
        return request

    @classmethod
    def register_uri(cls, method, uri, body='{"message": "HTTPretty :)"}',
                     adding_headers=None,
                     forcing_headers=None,
                     status=200,
                     responses=None,
                     match_querystring=False,
                     priority=0,
                     **headers):
        """
        .. testcode::

           import json
           import requests
           import httpretty


           def request_callback(request, uri, response_headers):
               content_type = request.headers.get('Content-Type')
               assert request.body == '{"nothing": "here"}', 'unexpected body: {}'.format(request.body)
               assert content_type == 'application/json', 'expected application/json but received Content-Type: {}'.format(content_type)
               return [200, response_headers, json.dumps({"hello": "world"})]

           httpretty.register_uri(
               httpretty.POST, "https://httpretty.example.com/api",
               body=request_callback)


           with httpretty.enabled():
               requests.post('https://httpretty.example.com/api', data='{"nothing": "here"}', headers={'Content-Type': 'application/json'})

           assert httpretty.latest_requests[-1].url == 'https://httpretty.example.com/api'

        :param method: one of ``httpretty.GET``, ``httpretty.PUT``, ``httpretty.POST``, ``httpretty.DELETE``, ``httpretty.HEAD``, ``httpretty.PATCH``, ``httpretty.OPTIONS``, ``httpretty.CONNECT``
        :param uri: a string or regex pattern (e.g.: **"https://httpbin.org/ip"**)
        :param body: a string, defaults to ``{"message": "HTTPretty :)"}``
        :param adding_headers: dict - headers to be added to the response
        :param forcing_headers: dict - headers to be forcefully set in the response
        :param status: an integer, defaults to **200**
        :param responses: a list of entries, ideally each created with :py:meth:`~httpretty.core.httpretty.Response`
        :param priority: an integer, useful for setting higher priority over previously registered urls. defaults to zero
        :param match_querystring: bool - whether to take the querystring into account when matching an URL
        :param headers: headers to be added to the response

        .. warning:: When using a port in the request, add a trailing slash if no path is provided otherwise Httpretty will not catch the request. Ex: ``httpretty.register_uri(httpretty.GET, 'http://fakeuri.com:8080/', body='{"hello":"world"}')``
        """
        uri_is_string = isinstance(uri, str)

        # A bare "scheme://host.tld[:port]" gets a trailing slash appended.
        if uri_is_string and re.search(r'^\w+://[^/]+[.]\w{2,}(:[0-9]+)?$', uri):
            uri += '/'

        if isinstance(responses, list) and responses:
            for response in responses:
                response.uri = uri
                response.method = method
            entries_for_this_uri = responses
        else:
            headers['body'] = body
            headers['adding_headers'] = adding_headers
            headers['forcing_headers'] = forcing_headers
            headers['status'] = status

            entries_for_this_uri = [
                cls.Response(method=method, uri=uri, **headers),
            ]

        matcher = URIMatcher(uri, entries_for_this_uri,
                             match_querystring, priority)
        if matcher in cls._entries:
            # Re-registering an URI keeps the previously registered
            # entries, placed after the new ones.
            matcher.entries.extend(cls._entries[matcher])
            del cls._entries[matcher]

        cls._entries[matcher] = entries_for_this_uri

    def __str__(self):
        return '<HTTPretty with %d URI entries>' % len(self._entries)

    @classmethod
    def Response(
            cls, body,
            method=None,
            uri=None,
            adding_headers=None,
            forcing_headers=None,
            status=200,
            streaming=False,
            **kw):
        """Shortcut to create an :py:class:`~httpretty.core.Entry` that takes
        the body as first positional argument.

        .. seealso:: the parameters of this function match those of
                     the :py:class:`~httpretty.core.Entry` constructor.

        Args:
            body (str): The body to return as response.
            method (str): One of ``httpretty.GET``, ``httpretty.PUT``, ``httpretty.POST``, ``httpretty.DELETE``, ``httpretty.HEAD``, ``httpretty.PATCH``, ``httpretty.OPTIONS``, ``httpretty.CONNECT``.
            uri (str|re.Pattern): The URL to match
            adding_headers (dict): Extra headers to be added to the response
            forcing_headers (dict): Overwrite **any** response headers, even "Content-Length".
            status (int): The status code for the response, defaults to ``200``.
            streaming (bool): Whether should stream the response into chunks via generator.
            kwargs: Keyword-arguments are forwarded to :py:class:`~httpretty.core.Entry`

        Returns:
            httpretty.Entry: containing the request-matching metadata.
        """
        kw['body'] = body
        kw['adding_headers'] = adding_headers
        kw['forcing_headers'] = forcing_headers
        kw['status'] = int(status)
        kw['streaming'] = streaming
        return Entry(method, uri, **kw)

    @classmethod
    def disable(cls):
        """Disables HTTPretty entirely, putting the original :py:mod:`socket`
        module back in its place.


        .. code::

           import re, json
           import httpretty

           httpretty.enable()
           # request passes through fake socket
           response = requests.get('https://httpbin.org')

           httpretty.disable()
           # request uses real python socket module
           response = requests.get('https://httpbin.org')

        .. note:: This method does not call :py:meth:`httpretty.core.reset` automatically.
        """
        undo_patch_socket()
        cls._is_enabled = False

    @classmethod
    def is_enabled(cls):
        """Check if HTTPretty is enabled

        :returns: bool

        .. testcode::

           import httpretty

           httpretty.enable()
           assert httpretty.is_enabled() == True

           httpretty.disable()
           assert httpretty.is_enabled() == False
        """
        return cls._is_enabled

    @classmethod
    def enable(cls, allow_net_connect=True, verbose=False):
        """Enables HTTPretty.

        :param allow_net_connect: boolean to determine if unmatched requests are forwarded to a real network connection OR throw :py:class:`httpretty.errors.UnmockedError`.
        :param verbose: boolean to set HTTPretty's logging level to DEBUG

        .. testcode::

           import re, json
           import httpretty

           httpretty.enable(allow_net_connect=True, verbose=True)

           httpretty.register_uri(
               httpretty.GET,
               re.compile(r'http://.*'),
               body=json.dumps({'man': 'in', 'the': 'middle'})
           )

           response = requests.get('https://foo.bar/foo/bar')

           response.json().should.equal({
               "man": "in",
               "the": "middle",
           })

        .. warning:: after calling this method the original :py:mod:`socket` is replaced with :py:class:`httpretty.core.fakesock`. Make sure to call :py:meth:`~httpretty.disable` after done with your tests or use the :py:class:`httpretty.enabled` as decorator or `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_
        """
        httpretty.allow_net_connect = allow_net_connect
        apply_patch_socket()
        cls._is_enabled = True
        if verbose:
            logger.setLevel(logging.DEBUG)
        else:
            # Follow the root logger's level, falling back to WARNING
            # when it is unset (0).
            logger.setLevel(logging.getLogger().level or logging.WARNING)
def apply_patch_socket():
    """Replaces the networking entry points of :py:mod:`socket` (and of
    ``ssl``, ``socks``, ``eventlet`` and the urllib3 bundled with requests,
    when available) with HTTPretty's fakes.

    Every attribute is written both through normal attribute assignment
    and directly into the module ``__dict__``.
    """
    # Some versions of python internally shadowed the
    # SocketType variable incorrectly https://bugs.python.org/issue20386
    bad_socket_shadow = (socket.socket != socket.SocketType)

    socket.socket = fakesock.socket
    socket.socketpair = fake_socketpair
    socket._socketobject = fakesock.socket
    if not bad_socket_shadow:
        socket.SocketType = fakesock.socket

    socket.create_connection = create_fake_connection
    socket.gethostname = fake_gethostname
    socket.gethostbyname = fake_gethostbyname
    socket.getaddrinfo = fake_getaddrinfo

    socket.__dict__['socket'] = fakesock.socket
    socket.__dict__['socketpair'] = fake_socketpair
    socket.__dict__['_socketobject'] = fakesock.socket
    if not bad_socket_shadow:
        socket.__dict__['SocketType'] = fakesock.socket

    socket.__dict__['create_connection'] = create_fake_connection
    socket.__dict__['gethostname'] = fake_gethostname
    socket.__dict__['gethostbyname'] = fake_gethostbyname
    socket.__dict__['getaddrinfo'] = fake_getaddrinfo

    # Take out the pyopenssl version - use the default implementation.
    # Call each registered override itself rather than a fixed global name.
    for extract_from_urllib3 in pyopenssl_overrides_extract:
        extract_from_urllib3()

    if requests_urllib3_connection is not None:
        urllib3_wrap = partial(fake_wrap_socket, old_requests_ssl_wrap_socket)
        requests_urllib3_connection.ssl_wrap_socket = urllib3_wrap
        requests_urllib3_connection.__dict__['ssl_wrap_socket'] = urllib3_wrap

    if eventlet:
        eventlet.green.ssl.GreenSSLContext = old_sslcontext_class
        eventlet.green.ssl.__dict__['GreenSSLContext'] = old_sslcontext_class
        eventlet.green.ssl.SSLContext = old_sslcontext_class
        eventlet.green.ssl.__dict__['SSLContext'] = old_sslcontext_class

    if socks:
        socks.socksocket = fakesock.socket
        socks.__dict__['socksocket'] = fakesock.socket

    if ssl:
        new_wrap = partial(fake_wrap_socket, old_ssl_wrap_socket)
        ssl.wrap_socket = new_wrap
        ssl.SSLSocket = FakeSSLSocket
        ssl.SSLContext = old_sslcontext_class
        try:
            ssl.SSLContext.wrap_socket = partial(fake_wrap_socket, old_ssl_wrap_socket)
        except AttributeError:
            # best-effort: the attribute cannot be replaced on this class
            pass

        ssl.__dict__['wrap_socket'] = new_wrap
        ssl.__dict__['SSLSocket'] = FakeSSLSocket
        ssl.__dict__['SSLContext'] = old_sslcontext_class
def undo_patch_socket():
    """Puts back the original objects that :py:func:`apply_patch_socket`
    replaced in :py:mod:`socket`, ``socks``, ``ssl`` and the urllib3
    bundled with requests.
    """
    socket.socket = old_socket
    socket.socketpair = old_socketpair
    socket.SocketType = old_SocketType
    socket._socketobject = old_socket

    socket.create_connection = old_create_connection
    socket.gethostname = old_gethostname
    socket.gethostbyname = old_gethostbyname
    socket.getaddrinfo = old_getaddrinfo

    socket.__dict__['socket'] = old_socket
    socket.__dict__['socketpair'] = old_socketpair
    socket.__dict__['_socketobject'] = old_socket
    socket.__dict__['SocketType'] = old_SocketType

    socket.__dict__['create_connection'] = old_create_connection
    socket.__dict__['gethostname'] = old_gethostname
    socket.__dict__['gethostbyname'] = old_gethostbyname
    socket.__dict__['getaddrinfo'] = old_getaddrinfo

    if socks:
        socks.socksocket = old_socksocket
        socks.__dict__['socksocket'] = old_socksocket

    if ssl:
        ssl.wrap_socket = old_ssl_wrap_socket
        ssl.SSLSocket = old_sslsocket
        try:
            ssl.SSLContext.wrap_socket = old_sslcontext_wrap_socket
        except AttributeError:
            # best-effort: the attribute cannot be replaced on this class
            pass
        ssl.__dict__['wrap_socket'] = old_ssl_wrap_socket
        ssl.__dict__['SSLSocket'] = old_sslsocket

    if requests_urllib3_connection is not None:
        requests_urllib3_connection.ssl_wrap_socket = \
            old_requests_ssl_wrap_socket
        requests_urllib3_connection.__dict__['ssl_wrap_socket'] = \
            old_requests_ssl_wrap_socket

    # Put the pyopenssl version back in place.
    # Call each registered override itself rather than a fixed global name.
    for inject_from_urllib3 in pyopenssl_overrides_inject:
        inject_from_urllib3()
@contextlib.contextmanager
def restored_libs():
    """Context-manager that runs its block with the original (unpatched)
    libraries and re-applies HTTPretty's patches afterwards, including
    when the block raises.
    """
    undo_patch_socket()
    try:
        yield
    finally:
        apply_patch_socket()
class httprettized(object):
    """`context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_ for enabling HTTPretty.

    The registry is reset on entry, before enabling, and again on exit,
    after disabling.

    .. tip:: Also available under the alias :py:func:`httpretty.enabled`

    .. testcode::

       import json
       import httpretty

       httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip', body=json.dumps({'origin': '42.42.42.42'}))
       with httpretty.enabled():
           response = requests.get('https://httpbin.org/ip')

       assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip'
       assert response.json() == {'origin': '42.42.42.42'}
    """
    def __init__(self, allow_net_connect=True, verbose=False):
        self.allow_net_connect, self.verbose = allow_net_connect, verbose

    def __enter__(self):
        httpretty.reset()
        httpretty.enable(
            allow_net_connect=self.allow_net_connect,
            verbose=self.verbose,
        )

    def __exit__(self, exc_type, exc_value, db):
        httpretty.disable()
        httpretty.reset()
def httprettified(test=None, allow_net_connect=True, verbose=False):
    """decorator for test functions and test classes

    Can be applied bare (``@httprettified``) or with keyword arguments
    (``@httprettified(allow_net_connect=False)``); both forms accept
    functions as well as classes. For ``unittest.TestCase`` subclasses
    ``setUp`` is wrapped; for any other class every callable attribute
    whose name starts with ``test_`` is wrapped.

    .. tip:: Also available under the alias :py:func:`httpretty.activate`

    :param test: a callable
    :param allow_net_connect: bool - forwarded to :py:meth:`httpretty.enable`
    :param verbose: bool - forwarded to :py:meth:`httpretty.enable`


    example usage with `nosetests <https://nose.readthedocs.io/en/latest/>`_

    .. testcode::

       import sure
       from httpretty import httprettified

       @httprettified
       def test_using_nosetests():
           httpretty.register_uri(
               httpretty.GET,
               'https://httpbin.org/ip'
           )

           response = requests.get('https://httpbin.org/ip')

           response.json().should.equal({
               "message": "HTTPretty :)"
           })

    example usage with `unittest module <https://docs.python.org/3/library/unittest.html>`_

    .. testcode::

       import unittest
       from sure import expect
       from httpretty import httprettified

       @httprettified
       class TestWithPyUnit(unittest.TestCase):
           def test_httpbin(self):
               httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip')
               response = requests.get('https://httpbin.org/ip')
               expect(response.json()).to.equal({
                   "message": "HTTPretty :)"
               })

    """
    def decorate_unittest_TestCase_setUp(klass):

        # Prefer addCleanup (added in python 2.7), but fall back
        # to using tearDown if it isn't available
        use_addCleanup = hasattr(klass, 'addCleanup')

        original_setUp = getattr(klass, 'setUp', None)

        def new_setUp(self):
            httpretty.reset()
            httpretty.enable(allow_net_connect, verbose=verbose)
            if use_addCleanup:
                self.addCleanup(httpretty.disable)
            if original_setUp:
                original_setUp(self)
        klass.setUp = new_setUp

        if not use_addCleanup:
            # Chain to the class' own tearDown (not setUp) after cleanup.
            original_tearDown = getattr(klass, 'tearDown', None)

            def new_tearDown(self):
                httpretty.disable()
                httpretty.reset()
                if original_tearDown:
                    original_tearDown(self)
            klass.tearDown = new_tearDown

        return klass

    def decorate_test_methods(klass):
        for attr in dir(klass):
            if not attr.startswith('test_'):
                continue

            attr_value = getattr(klass, attr)
            if not callable(attr_value):
                continue

            setattr(klass, attr, decorate_callable(attr_value))
        return klass

    def is_unittest_TestCase(klass):
        try:
            import unittest
            return issubclass(klass, unittest.TestCase)
        except ImportError:
            return False

    def decorate_class(klass):
        if is_unittest_TestCase(klass):
            return decorate_unittest_TestCase_setUp(klass)
        return decorate_test_methods(klass)

    def decorate_callable(test):
        @functools.wraps(test)
        def wrapper(*args, **kw):
            with httprettized(allow_net_connect, verbose=verbose):
                return test(*args, **kw)
        return wrapper

    def decorate(target):
        # Classes are callable too, so they must be checked for first.
        if isinstance(target, type):
            return decorate_class(target)
        return decorate_callable(target)

    if callable(test):
        return decorate(test)
    # Used as ``@httprettified(...)``: hand back the actual decorator.
    return decorate