Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpretty/core.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1008 statements  

1# <HTTPretty - HTTP client mock for Python> 

2# Copyright (C) <2011-2021> Gabriel Falcão <gabriel@nacaolivre.org> 

3# 

4# Permission is hereby granted, free of charge, to any person 

5# obtaining a copy of this software and associated documentation 

6# files (the "Software"), to deal in the Software without 

7# restriction, including without limitation the rights to use, 

8# copy, modify, merge, publish, distribute, sublicense, and/or sell 

9# copies of the Software, and to permit persons to whom the 

10# Software is furnished to do so, subject to the following 

11# conditions: 

12# 

13# The above copyright notice and this permission notice shall be 

14# included in all copies or substantial portions of the Software. 

15# 

16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 

17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 

18# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 

19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 

20# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 

21# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 

23# OTHER DEALINGS IN THE SOFTWARE. 

24 

25import io 

26import time 

27import codecs 

28import contextlib 

29import functools 

30import hashlib 

31import inspect 

32import logging 

33import itertools 

34import json 

35import types 

36import re 

37import socket 

38import tempfile 

39import threading 

40import traceback 

41import warnings 

42 

43from functools import partial 

44from typing import Callable 

45 

46from .compat import ( 

47 BaseClass, 

48 BaseHTTPRequestHandler, 

49 quote, 

50 quote_plus, 

51 urlencode, 

52 encode_obj, 

53 urlunsplit, 

54 urlsplit, 

55 parse_qs, 

56 unquote_utf8, 

57) 

58from .http import ( 

59 STATUSES, 

60 HttpBaseClass, 

61 parse_requestline, 

62 last_requestline, 

63) 

64 

65from .utils import ( 

66 utf8, 

67 decode_utf8, 

68) 

69 

70from .errors import HTTPrettyError, UnmockedError 

71 

72from datetime import datetime 

73from datetime import timedelta 

74from errno import EAGAIN 

75 

76class __internals__: 

77 thread_timeout = 0.1 # https://github.com/gabrielfalcao/HTTPretty/issues/430 

78 temp_files = [] 

79 threads = [] 

80 

81 @classmethod 

82 def cleanup_sockets(cls): 

83 cls.cleanup_temp_files() 

84 cls.cleanup_threads() 

85 

86 @classmethod 

87 def cleanup_threads(cls): 

88 for t in cls.threads: 

89 t.join(cls.thread_timeout) 

90 if t.is_alive(): 

91 raise socket.timeout(cls.thread_timeout) 

92 

93 @classmethod 

94 def create_thread(cls, *args, **kwargs): 

95 return threading.Thread(*args, **kwargs) 

96 

97 @classmethod 

98 def cleanup_temp_files(cls): 

99 for fd in cls.temp_files[:]: 

100 try: 

101 fd.close() 

102 except Exception as e: 

103 logger.debug('error closing file {}: {}'.format(fd, e)) 

104 cls.temp_files.remove(fd) 

105 

106 @classmethod 

107 def create_temp_file(cls): 

108 fd = tempfile.TemporaryFile() 

109 cls.temp_files.append(fd) 

110 return fd 

111 

112def set_default_thread_timeout(timeout): 

113 """sets the default thread timeout for HTTPretty threads 

114 

115 :param timeout: int 

116 """ 

117 __internals__.thread_timeout = timeout 

118 

119def get_default_thread_timeout(): 

120 """sets the default thread timeout for HTTPretty threads 

121 

122 :returns: int 

123 """ 

124 

125 return __internals__.thread_timeout 

126 

127 

128SOCKET_GLOBAL_DEFAULT_TIMEOUT = socket._GLOBAL_DEFAULT_TIMEOUT 

129old_socket = socket.socket 

130old_socketpair = getattr(socket, 'socketpair', None) 

131old_SocketType = socket.SocketType 

132old_create_connection = socket.create_connection 

133old_gethostbyname = socket.gethostbyname 

134old_gethostname = socket.gethostname 

135old_getaddrinfo = socket.getaddrinfo 

136old_socksocket = None 

137old_ssl_wrap_socket = None 

138old_sslwrap_simple = None 

139old_sslsocket = None 

140old_sslcontext_wrap_socket = None 

141old_sslcontext = None 

142 

143MULTILINE_ANY_REGEX = re.compile(r'.*', re.M) 

144hostname_re = re.compile(r'\^?(?:https?://)?[^:/]*[:/]?') 

145 

146 

147logger = logging.getLogger(__name__) 

148 

149try: # pragma: no cover 

150 import socks 

151 old_socksocket = socks.socksocket 

152except ImportError: 

153 socks = None 

154 

155try: # pragma: no cover 

156 import ssl 

157 old_sslcontext_class = ssl.SSLContext 

158 old_sslcontext = ssl.create_default_context() 

159 old_ssl_wrap_socket = old_sslcontext.wrap_socket 

160 try: 

161 old_sslcontext_wrap_socket = ssl.SSLContext.wrap_socket 

162 except AttributeError: 

163 pass 

164 old_sslsocket = ssl.SSLSocket 

165except ImportError: # pragma: no cover 

166 ssl = None 

167 

168try: 

169 import _ssl 

170except ImportError: 

171 _ssl = None 

172# used to handle error caused by ndg-httpsclient 

173pyopenssl_overrides_inject = [] 

174pyopenssl_overrides_extract = [] 

175try: 

176 from requests.packages.urllib3.contrib.pyopenssl import inject_into_urllib3, extract_from_urllib3 

177 pyopenssl_overrides_extract.append(extract_from_urllib) 

178 pyopenssl_overrides_inject.append(inject_from_urllib) 

179except Exception: 

180 pass 

181 

182 

183 

184try: 

185 from urllib3.contrib.pyopenssl import extract_from_urllib3, inject_into_urllib3 

186 pyopenssl_overrides_extract.append(extract_from_urllib) 

187 pyopenssl_overrides_inject.append(inject_from_urllib) 

188except Exception: 

189 pass 

190 

191 

192try: 

193 import requests.packages.urllib3.connection as requests_urllib3_connection 

194 old_requests_ssl_wrap_socket = requests_urllib3_connection.ssl_wrap_socket 

195except ImportError: 

196 requests_urllib3_connection = None 

197 old_requests_ssl_wrap_socket = None 

198 

199try: 

200 import eventlet 

201 import eventlet.green 

202except ImportError: 

203 eventlet = None 

204 

205DEFAULT_HTTP_PORTS = frozenset([80]) 

206POTENTIAL_HTTP_PORTS = set(DEFAULT_HTTP_PORTS) 

207DEFAULT_HTTPS_PORTS = frozenset([443]) 

208POTENTIAL_HTTPS_PORTS = set(DEFAULT_HTTPS_PORTS) 

209 

210 

211 

212def FALLBACK_FUNCTION(x): 

213 return x 

214 

215 

216class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass): 

217 r"""Represents a HTTP request. It takes a valid multi-line, 

218 ``\r\n`` separated string with HTTP headers and parse them out using 

219 the internal `parse_request` method. 

220 

221 It also replaces the `rfile` and `wfile` attributes with :py:class:`io.BytesIO` 

222 instances so that we guarantee that it won't make any I/O, neither 

223 for writing nor reading. 

224 

225 It has some convenience attributes: 

226 

227 ``headers`` -> a mimetype object that can be cast into a dictionary, 

228 contains all the request headers 

229 

230 ``protocol`` -> the protocol of this host, inferred from the port 

231 of the underlying fake TCP socket. 

232 

233 ``host`` -> the hostname of this request. 

234 

235 ``url`` -> the full url of this request. 

236 

237 ``path`` -> the path of the request. 

238 

239 ``method`` -> the HTTP method used in this request. 

240 

241 ``querystring`` -> a dictionary containing lists with the 

242 attributes. Please notice that if you need a single value from a 

243 query string you will need to get it manually like: 

244 

245 ``body`` -> the request body as a string. 

246 

247 ``parsed_body`` -> the request body parsed by ``parse_request_body``. 

248 

249 .. testcode:: 

250 

251 >>> request.querystring 

252 {'name': ['Gabriel Falcao']} 

253 >>> print request.querystring['name'][0] 

254 

255 """ 

256 def __init__(self, headers, body='', sock=None, path_encoding = 'iso-8859-1'): 

257 # first of all, lets make sure that if headers or body are 

258 # unicode strings, it must be converted into a utf-8 encoded 

259 # byte string 

260 self.created_at = time.time() 

261 self.raw_headers = utf8(headers.strip()) 

262 self._body = utf8(body) 

263 self.connection = sock 

264 # Now let's concatenate the headers with the body, and create 

265 # `rfile` based on it 

266 self.rfile = io.BytesIO(b'\r\n\r\n'.join([self.raw_headers, self.body])) 

267 

268 # Creating `wfile` as an empty BytesIO, just to avoid any 

269 # real I/O calls 

270 self.wfile = io.BytesIO() 

271 

272 # parsing the request line preemptively 

273 self.raw_requestline = self.rfile.readline() 

274 

275 # initiating the error attributes with None 

276 self.error_code = None 

277 self.error_message = None 

278 

279 # Parse the request based on the attributes above 

280 if not self.parse_request(): 

281 return 

282 

283 # Now 2 convenient attributes for the HTTPretty API: 

284 

285 # - `path` 

286 # - `querystring` holds a dictionary with the parsed query string 

287 # - `parsed_body` a string 

288 try: 

289 self.path = self.path.encode(path_encoding) 

290 except UnicodeDecodeError: 

291 pass 

292 

293 self.path = decode_utf8(self.path) 

294 

295 qstring = self.path.split("?", 1)[-1] 

296 self.querystring = self.parse_querystring(qstring) 

297 

298 # And the body will be attempted to be parsed as 

299 # `application/json` or `application/x-www-form-urlencoded` 

300 """a dictionary containing parsed request body or None if 

301 HTTPrettyRequest doesn't know how to parse it. It currently 

302 supports parsing body data that was sent under the 

303 ``content`-type` headers values: ``application/json`` or 

304 ``application/x-www-form-urlencoded`` 

305 """ 

306 self.parsed_body = self.parse_request_body(self._body) 

307 

308 @property 

309 def method(self): 

310 """the HTTP method used in this request""" 

311 return self.command 

312 

313 @property 

314 def protocol(self): 

315 """the protocol used in this request""" 

316 proto = '' 

317 if not self.connection: 

318 return '' 

319 elif self.connection.is_http: 

320 proto = 'http' 

321 

322 if self.connection.is_secure: 

323 proto = 'https' 

324 

325 return proto 

326 

327 @property 

328 def body(self): 

329 return self._body 

330 

331 @body.setter 

332 def body(self, value): 

333 self._body = utf8(value) 

334 

335 # And the body will be attempted to be parsed as 

336 # `application/json` or `application/x-www-form-urlencoded` 

337 self.parsed_body = self.parse_request_body(self._body) 

338 

339 def __nonzero__(self): 

340 return bool(self.body) or bool(self.raw_headers) 

341 

342 @property 

343 def url(self): 

344 """the full url of this recorded request""" 

345 return "{}://{}{}".format(self.protocol, self.host, self.path) 

346 

347 @property 

348 def host(self): 

349 return self.headers.get('Host') or '<unknown>' 

350 

351 def __str__(self): 

352 tmpl = '<HTTPrettyRequest("{}", "{}", headers={}, body={})>' 

353 return tmpl.format( 

354 self.method, 

355 self.url, 

356 dict(self.headers), 

357 len(self.body), 

358 ) 

359 

360 def parse_querystring(self, qs): 

361 """parses an UTF-8 encoded query string into a dict of string lists 

362 

363 :param qs: a querystring 

364 :returns: a dict of lists 

365 

366 """ 

367 expanded = unquote_utf8(qs) 

368 parsed = parse_qs(expanded) 

369 result = {} 

370 for k in parsed: 

371 result[k] = list(map(decode_utf8, parsed[k])) 

372 

373 return result 

374 

375 def parse_request_body(self, body): 

376 """Attempt to parse the post based on the content-type passed. 

377 Return the regular body if not 

378 

379 :param body: string 

380 :returns: a python object such as dict or list in case the deserialization suceeded. Else returns the given param ``body`` 

381 """ 

382 

383 PARSING_FUNCTIONS = { 

384 'application/json': json.loads, 

385 'text/json': json.loads, 

386 'application/x-www-form-urlencoded': self.parse_querystring, 

387 } 

388 

389 content_type = self.headers.get('content-type', '') 

390 

391 do_parse = PARSING_FUNCTIONS.get(content_type, FALLBACK_FUNCTION) 

392 try: 

393 body = decode_utf8(body) 

394 return do_parse(body) 

395 except Exception: 

396 return body 

397 

398 

399class EmptyRequestHeaders(dict): 

400 """A dict subclass used as internal representation of empty request 

401 headers 

402 """ 

403 

404 

405class HTTPrettyRequestEmpty(object): 

406 """Represents an empty :py:class:`~httpretty.core.HTTPrettyRequest` 

407 where all its properties are somehow empty or ``None`` 

408 """ 

409 

410 method = None 

411 url = None 

412 body = '' 

413 headers = EmptyRequestHeaders() 

414 

415 

416 

417class FakeSockFile(object): 

418 """Fake socket file descriptor. Under the hood all data is written in 

419 a temporary file, giving it a real file descriptor number. 

420 

421 """ 

422 def __init__(self): 

423 self.file = None 

424 self._fileno = None 

425 self.__closed__ = None 

426 self.reset() 

427 

428 def reset(self): 

429 if self.file: 

430 try: 

431 self.file.close() 

432 except Exception as e: 

433 logger.debug('error closing file {}: {}'.format(self.file, e)) 

434 self.file = None 

435 

436 self.file = __internals__.create_temp_file() 

437 self._fileno = self.file.fileno() 

438 self.__closed__ = False 

439 

440 def getvalue(self): 

441 if hasattr(self.file, 'getvalue'): 

442 value = self.file.getvalue() 

443 else: 

444 value = self.file.read() 

445 self.file.seek(0) 

446 return value 

447 

448 def close(self): 

449 if self.__closed__: 

450 return 

451 self.__closed__ = True 

452 self.flush() 

453 

454 def flush(self): 

455 try: 

456 super().flush() 

457 except Exception as e: 

458 logger.debug('error closing file {}: {}'.format(self, e)) 

459 

460 try: 

461 self.file.flush() 

462 except Exception as e: 

463 logger.debug('error closing file {}: {}'.format(self.file, e)) 

464 

465 

466 

467 def fileno(self): 

468 return self._fileno 

469 

470 def __getattr__(self, name): 

471 try: 

472 return getattr(self.file, name) 

473 except AttributeError: 

474 return super().__getattribute__(name) 

475 

476 def __del__(self): 

477 try: 

478 self.close() 

479 except (ValueError, AttributeError): 

480 pass 

481 

482 # Adding the line below as a potential fix of github issue #426 

483 # that seems to be a compatible the solution of #413 

484 self.file.close() 

485 

486 

487 

488class FakeSSLSocket(object): 

489 """Shorthand for :py:class:`~httpretty.core.fakesock` 

490 """ 

491 def __init__(self, sock, *args, **kw): 

492 self._httpretty_sock = sock 

493 

494 def __getattr__(self, attr): 

495 return getattr(self._httpretty_sock, attr) 

496 

497 

498class FakeAddressTuple(object): 

499 def __init__(self, fakesocket): 

500 self.fakesocket = fakesocket 

501 

502 def __getitem__(self, *args, **kw): 

503 raise AssertionError('socket {} is not connected'.format(self.fakesocket.truesock)) 

504 

505 

506def fake_socketpair(*args, **kw): 

507 with restored_libs(): 

508 return old_socketpair(*args, **kw) 

509 

510class fakesock(object): 

511 """ 

512 fake :py:mod:`socket` 

513 """ 

514 class socket(object): 

515 """drop-in replacement for :py:class:`socket.socket` 

516 """ 

517 _entry = None 

518 _read_buf = None 

519 

520 debuglevel = 0 

521 _sent_data = [] 

522 is_secure = False 

523 def __init__( 

524 self, 

525 family=socket.AF_INET, 

526 type=socket.SOCK_STREAM, 

527 proto=0, 

528 fileno=None 

529 ): 

530 self.socket_family = family 

531 self.socket_type = type 

532 self.socket_proto = proto 

533 if httpretty.allow_net_connect: 

534 self.truesock = self.create_socket() 

535 else: 

536 self.truesock = None 

537 

538 self._address = FakeAddressTuple(self) 

539 self.__truesock_is_connected__ = False 

540 self.fd = FakeSockFile() 

541 self.fd.socket = fileno or self 

542 self.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 

543 self._sock = fileno or self 

544 self.is_http = False 

545 self._bufsize = 32 * 1024 

546 

547 def __repr__(self): 

548 return '{self.__class__.__module__}.{self.__class__.__name__}("{self.host}")'.format(**locals()) 

549 

550 @property 

551 def host(self): 

552 return ":".join(map(str, self._address)) 

553 

554 def create_socket(self, address=None): 

555 return old_socket(self.socket_family, self.socket_type, self.socket_proto) 

556 

557 def getpeercert(self, *a, **kw): 

558 now = datetime.now() 

559 shift = now + timedelta(days=30 * 12) 

560 return { 

561 'notAfter': shift.strftime('%b %d %H:%M:%S GMT'), 

562 'subjectAltName': ( 

563 ('DNS', '*.%s' % self._host), 

564 ('DNS', self._host), 

565 ('DNS', '*'), 

566 ), 

567 'subject': ( 

568 ( 

569 ('organizationName', '*.%s' % self._host), 

570 ), 

571 ( 

572 ('organizationalUnitName', 

573 'Domain Control Validated'), 

574 ), 

575 ( 

576 ('commonName', '*.%s' % self._host), 

577 ), 

578 ), 

579 } 

580 

581 def ssl(self, sock, *args, **kw): 

582 return sock 

583 

584 def setsockopt(self, level, optname, value): 

585 if httpretty.allow_net_connect and not self.truesock: 

586 self.truesock = self.create_socket() 

587 elif not self.truesock: 

588 logger.debug('setsockopt(%s, %s, %s) failed', level, optname, value) 

589 return 

590 

591 return self.truesock.setsockopt(level, optname, value) 

592 

593 def connect(self, address): 

594 try: 

595 self._address = (self._host, self._port) = address 

596 except ValueError: 

597 # We get here when the address is just a string pointing to a 

598 # unix socket path/file 

599 # 

600 # See issue #206 

601 self.is_http = False 

602 else: 

603 ports_to_check = ( 

604 POTENTIAL_HTTP_PORTS.union(POTENTIAL_HTTPS_PORTS)) 

605 self.is_http = self._port in ports_to_check 

606 self.is_secure = self._port in POTENTIAL_HTTPS_PORTS 

607 

608 if not self.is_http: 

609 self.connect_truesock(address=address) 

610 elif self.truesock and not self.real_socket_is_connected(): 

611 # TODO: remove nested if 

612 matcher = httpretty.match_http_address(self._host, self._port) 

613 if matcher is None: 

614 self.connect_truesock(address=address) 

615 

616 def bind(self, address): 

617 self._address = (self._host, self._port) = address 

618 if self.truesock: 

619 self.bind_truesock(address) 

620 

621 def bind_truesock(self, address): 

622 if httpretty.allow_net_connect and not self.truesock: 

623 self.truesock = self.create_socket() 

624 elif not self.truesock: 

625 raise UnmockedError('Failed to socket.bind() because because a real socket was never created.', address=address) 

626 

627 return self.truesock.bind(address) 

628 

629 def connect_truesock(self, request=None, address=None): 

630 address = address or self._address 

631 

632 if self.__truesock_is_connected__: 

633 return self.truesock 

634 

635 if request: 

636 logger.warning('real call to socket.connect() for {request}'.format(**locals())) 

637 elif address: 

638 logger.warning('real call to socket.connect() for {address}'.format(**locals())) 

639 else: 

640 logger.warning('real call to socket.connect()') 

641 

642 if httpretty.allow_net_connect and not self.truesock: 

643 self.truesock = self.create_socket(address) 

644 elif not self.truesock: 

645 raise UnmockedError('Failed to socket.connect() because because a real socket was never created.', request=request, address=address) 

646 

647 undo_patch_socket() 

648 try: 

649 hostname = self._address[0] 

650 port = 80 

651 if len(self._address) == 2: 

652 port = self._address[1] 

653 if port == 443 and old_sslsocket: 

654 self.truesock = old_ssl_wrap_socket(self.truesock, server_hostname=hostname) 

655 

656 sock = self.truesock 

657 

658 sock.connect(self._address) 

659 self.__truesock_is_connected__ = True 

660 self.truesock = sock 

661 finally: 

662 apply_patch_socket() 

663 

664 return self.truesock 

665 

666 def real_socket_is_connected(self): 

667 return self.__truesock_is_connected__ 

668 

669 def fileno(self): 

670 if self.truesock: 

671 return self.truesock.fileno() 

672 return self.fd.fileno() 

673 

674 def close(self): 

675 if self.truesock: 

676 self.truesock.close() 

677 self.truesock = None 

678 self.__truesock_is_connected__ = False 

679 

680 def makefile(self, mode='r', bufsize=-1): 

681 """Returns this fake socket's own tempfile buffer. 

682 

683 If there is an entry associated with the socket, the file 

684 descriptor gets filled in with the entry data before being 

685 returned. 

686 """ 

687 self._mode = mode 

688 self._bufsize = bufsize 

689 

690 if self._entry: 

691 t = __internals__.create_thread( 

692 target=self._entry.fill_filekind, args=(self.fd,) 

693 ) 

694 

695 # execute body callback and send http response in a 

696 # thread, wait for thread to finish within the timeout 

697 # set via socket.settimeout() 

698 t.start() 

699 if self.timeout == SOCKET_GLOBAL_DEFAULT_TIMEOUT: 

700 timeout = get_default_thread_timeout() 

701 else: 

702 timeout = self.timeout 

703 

704 # fake socket timeout error by checking if the thread 

705 # finished in time. 

706 t.join(timeout) 

707 if t.is_alive(): 

708 # For more info check issue https://github.com/gabrielfalcao/HTTPretty/issues/430 

709 raise socket.timeout(timeout) 

710 

711 return self.fd 

712 

713 def real_sendall(self, data, *args, **kw): 

714 """Sends data to the remote server. This method is called 

715 when HTTPretty identifies that someone is trying to send 

716 non-http data. 

717 

718 The received bytes are written in this socket's tempfile 

719 buffer so that HTTPretty can return it accordingly when 

720 necessary. 

721 """ 

722 request = kw.pop('request', None) 

723 if request: 

724 bytecount = len(data) 

725 logger.warning('{self}.real_sendall({bytecount} bytes) to {request.url} via {request.method} at {request.created_at}'.format(**locals())) 

726 

727 if httpretty.allow_net_connect and not self.truesock: 

728 

729 self.connect_truesock(request=request) 

730 elif not self.truesock: 

731 raise UnmockedError(request=request) 

732 

733 if not self.is_http: 

734 self.truesock.setblocking(1) 

735 return self.truesock.sendall(data, *args, **kw) 

736 

737 sock = self.connect_truesock(request=request) 

738 

739 sock.setblocking(1) 

740 sock.sendall(data, *args, **kw) 

741 

742 should_continue = True 

743 while should_continue: 

744 try: 

745 received = sock.recv(self._bufsize) 

746 self.fd.write(received) 

747 should_continue = bool(received.strip()) 

748 

749 except socket.error as e: 

750 if e.errno == EAGAIN: 

751 continue 

752 break 

753 

754 self.fd.seek(0) 

755 

756 def sendall(self, data, *args, **kw): 

757 # if self.__truesock_is_connected__: 

758 # return self.truesock.sendall(data, *args, **kw) 

759 

760 self._sent_data.append(data) 

761 self.fd = FakeSockFile() 

762 self.fd.socket = self 

763 if isinstance(data, str): 

764 data = data.encode('utf-8') 

765 elif not isinstance(data, bytes): 

766 logger.debug('cannot sendall({data!r})') 

767 data = bytes(data) 

768 

769 try: 

770 requestline, _ = data.split(b'\r\n', 1) 

771 method, path, version = parse_requestline( 

772 decode_utf8(requestline)) 

773 is_parsing_headers = True 

774 except ValueError: 

775 path = '' 

776 is_parsing_headers = False 

777 

778 if self._entry is None: 

779 # If the previous request wasn't mocked, don't 

780 # mock the subsequent sending of data 

781 return self.real_sendall(data, *args, **kw) 

782 else: 

783 method = self._entry.method 

784 path = self._entry.info.path 

785 

786 self.fd.seek(0) 

787 

788 if not is_parsing_headers: 

789 if len(self._sent_data) > 1: 

790 headers = utf8(last_requestline(self._sent_data)) 

791 meta = self._entry.request.headers 

792 body = utf8(self._sent_data[-1]) 

793 if meta.get('transfer-encoding', '') == 'chunked': 

794 if not body.isdigit() and (body != b'\r\n') and (body != b'0\r\n\r\n'): 

795 self._entry.request.body += body 

796 else: 

797 self._entry.request.body += body 

798 

799 httpretty.historify_request(headers, body, sock=self, append=False) 

800 return 

801 

802 if path[:2] == '//': 

803 path = '//' + path 

804 # path might come with 

805 s = urlsplit(path) 

806 POTENTIAL_HTTP_PORTS.add(int(s.port or 80)) 

807 parts = list(map(utf8, data.split(b'\r\n\r\n', 1))) 

808 if len(parts) == 2: 

809 headers, body = parts 

810 else: 

811 headers = '' 

812 body = data 

813 

814 request = httpretty.historify_request(headers, body, sock=self) 

815 

816 info = URIInfo( 

817 hostname=self._host, 

818 port=self._port, 

819 path=s.path, 

820 query=s.query, 

821 last_request=request 

822 ) 

823 

824 matcher, entries = httpretty.match_uriinfo(info) 

825 

826 if not entries: 

827 logger.debug('no entries matching {}'.format(request)) 

828 self._entry = None 

829 self._read_buf = None 

830 self.real_sendall(data, request=request) 

831 return 

832 

833 self._entry = matcher.get_next_entry(method, info, request) 

834 

835 def forward_and_trace(self, function_name, *a, **kw): 

836 if not self.truesock: 

837 raise UnmockedError('Failed to socket.{}() because because a real socket was never created.'.format(function_name)) 

838 

839 callback = getattr(self.truesock, function_name) 

840 return callback(*a, **kw) 

841 

842 def settimeout(self, new_timeout): 

843 self.timeout = new_timeout 

844 if not self.is_http: 

845 if self.truesock: 

846 self.truesock.settimeout(new_timeout) 

847 

848 def send(self, data, *args, **kwargs): 

849 self.sendall(data, *args, **kwargs) 

850 return len(data) 

851 

852 def sendto(self, *args, **kwargs): 

853 return self.forward_and_trace('sendto', *args, **kwargs) 

854 

855 def recvfrom_into(self, *args, **kwargs): 

856 return self.forward_and_trace('recvfrom_into', *args, **kwargs) 

857 

858 def recv_into(self, *args, **kwargs): 

859 return self.forward_and_trace('recv_into', *args, **kwargs) 

860 

861 def recvfrom(self, *args, **kwargs): 

862 return self.forward_and_trace('recvfrom', *args, **kwargs) 

863 

864 def recv(self, buffersize=0, *args, **kwargs): 

865 if not self._read_buf: 

866 self._read_buf = io.BytesIO() 

867 

868 if self._entry: 

869 self._entry.fill_filekind(self._read_buf) 

870 

871 if not self._read_buf: 

872 raise UnmockedError('socket cannot recv(): {!r}'.format(self)) 

873 

874 return self._read_buf.read(buffersize) 

875 

876 def __getattr__(self, name): 

877 if name in ('getsockopt', 'selected_alpn_protocol') and not self.truesock: 

878 self.truesock = self.create_socket() 

879 elif httpretty.allow_net_connect and not self.truesock: 

880 # can't call self.connect_truesock() here because we 

881 # don't know if user wants to execute server of client 

882 # calls (or can they?) 

883 self.truesock = self.create_socket() 

884 elif not self.truesock: 

885 # Special case for 

886 # `hasattr(sock, "version")` call added in urllib3>=1.26. 

887 if name == 'version': 

888 raise AttributeError( 

889 "HTTPretty synthesized this error to fix urllib3 compatibility " 

890 "(see issue https://github.com/gabrielfalcao/HTTPretty/issues/409). " 

891 "Please open an issue if this error causes further unexpected issues." 

892 ) 

893 

894 raise UnmockedError('Failed to socket.{} because because a real socket does not exist'.format(name)) 

895 

896 return getattr(self.truesock, name) 

897 

898def with_socket_is_secure(sock, kw): 

899 sock.is_secure = True 

900 sock.kwargs = kw 

901 for k, v in kw.items(): 

902 setattr(sock, k, v) 

903 return sock 

904 

905def fake_wrap_socket(orig_wrap_socket_fn, *args, **kw): 

906 """drop-in replacement for py:func:`ssl.wrap_socket` 

907 """ 

908 if 'sock' in kw: 

909 sock = kw['sock'] 

910 else: 

911 sock = args[0] 

912 

913 server_hostname = kw.get('server_hostname') 

914 if server_hostname is not None: 

915 matcher = httpretty.match_https_hostname(server_hostname) 

916 if matcher is None: 

917 logger.debug('no requests registered for hostname: "{}"'.format(server_hostname)) 

918 return with_socket_is_secure(sock, kw) 

919 

920 return with_socket_is_secure(sock, kw) 

921 

922 

923def create_fake_connection( 

924 address, 

925 timeout=socket._GLOBAL_DEFAULT_TIMEOUT, 

926 source_address=None): 

927 """drop-in replacement for :py:func:`socket.create_connection`""" 

928 s = fakesock.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) 

929 if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT: 

930 s.settimeout(timeout) 

931 

932 if isinstance(source_address, tuple) and len(source_address) == 2: 

933 source_address[1] = int(source_address[1]) 

934 

935 if source_address: 

936 s.bind(source_address) 

937 s.connect(address) 

938 return s 

939 

940 

941def fake_gethostbyname(host): 

942 """drop-in replacement for :py:func:`socket.gethostbyname`""" 

943 return '127.0.0.1' 

944 

945 

946def fake_gethostname(): 

947 """drop-in replacement for :py:func:`socket.gethostname`""" 

948 return 'localhost' 

949 

950 

951def fake_getaddrinfo( 

952 host, port, family=None, socktype=None, proto=None, flags=None): 

953 """drop-in replacement for :py:func:`socket.getaddrinfo`""" 

954 return [(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, 

955 '', (host, port)), 

956 (socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP, 

957 '', (host, port))] 

958 

959 

960class Entry(BaseClass): 

961 """Created by :py:meth:`~httpretty.core.httpretty.register_uri` and 

962 stored in memory as internal representation of a HTTP 

963 request/response definition. 

964 

965 Args: 

966 method (str): One of ``httpretty.GET``, ``httpretty.PUT``, ``httpretty.POST``, ``httpretty.DELETE``, ``httpretty.HEAD``, ``httpretty.PATCH``, ``httpretty.OPTIONS``, ``httpretty.CONNECT``. 

967 uri (str|re.Pattern): The URL to match 

968 adding_headers (dict): Extra headers to be added to the response 

969 forcing_headers (dict): Overwrite response headers. 

970 status (int): The status code for the response, defaults to ``200``. 

971 streaming (bool): Whether should stream the response into chunks via generator. 

972 headers: Headers to inject in the faked response. 

973 

974 Returns: 

975 httpretty.Entry: containing the request-matching metadata. 

976 

977 

978 .. warning:: When using the ``forcing_headers`` option make sure to add the header ``Content-Length`` to match at most the total body length, otherwise some HTTP clients can hang indefinitely. 

979 """ 

980 def __init__(self, method, uri, body, 

981 adding_headers=None, 

982 forcing_headers=None, 

983 status=200, 

984 streaming=False, 

985 **headers): 

986 

987 self.method = method 

988 self.uri = uri 

989 self.info = None 

990 self.request = None 

991 

992 self.body_is_callable = False 

993 if hasattr(body, "__call__"): 

994 self.callable_body = body 

995 self.body = None 

996 self.body_is_callable = True 

997 elif isinstance(body, str): 

998 self.body = utf8(body) 

999 else: 

1000 self.body = body 

1001 

1002 self.streaming = streaming 

1003 if not streaming and not self.body_is_callable: 

1004 self.body_length = len(self.body or '') 

1005 else: 

1006 self.body_length = 0 

1007 

1008 self.adding_headers = adding_headers or {} 

1009 self.forcing_headers = forcing_headers or {} 

1010 self.status = int(status) 

1011 

1012 for k, v in headers.items(): 

1013 name = "-".join(k.split("_")).title() 

1014 self.adding_headers[name] = v 

1015 

1016 self.validate() 

1017 

1018 def validate(self): 

1019 """validates the body size with the value of the ``Content-Length`` 

1020 header 

1021 """ 

1022 content_length_keys = 'Content-Length', 'content-length' 

1023 for key in content_length_keys: 

1024 got = self.adding_headers.get( 

1025 key, self.forcing_headers.get(key, None)) 

1026 

1027 if got is None: 

1028 continue 

1029 

1030 igot = None 

1031 try: 

1032 igot = int(got) 

1033 except (ValueError, TypeError): 

1034 warnings.warn( 

1035 'HTTPretty got to register the Content-Length header ' 

1036 'with "%r" which is not a number' % got) 

1037 return 

1038 

1039 if igot and igot > self.body_length: 

1040 raise HTTPrettyError( 

1041 'HTTPretty got inconsistent parameters. The header ' 

1042 'Content-Length you registered expects size "%d" but ' 

1043 'the body you registered for that has actually length ' 

1044 '"%d".' % ( 

1045 igot, self.body_length, 

1046 ) 

1047 ) 

1048 

1049 def __str__(self): 

1050 return r'<Entry {} {} getting {}>'.format( 

1051 self.method, 

1052 self.uri, 

1053 self.status 

1054 ) 

1055 

1056 def normalize_headers(self, headers): 

1057 """Normalize keys in header names so that ``COntent-tyPe`` becomes ``content-type`` 

1058 

1059 :param headers: dict 

1060 

1061 :returns: dict 

1062 """ 

1063 new = {} 

1064 for k in headers: 

1065 new_k = '-'.join([s.lower() for s in k.split('-')]) 

1066 new[new_k] = headers[k] 

1067 

1068 return new 

1069 

1070 def fill_filekind(self, fk): 

1071 """writes HTTP Response data to a file descriptor 

1072 

1073 :parm fk: a file-like object 

1074 

1075 .. warning:: **side-effect:** this method moves the cursor of the given file object to zero 

1076 """ 

1077 now = datetime.utcnow() 

1078 

1079 headers = { 

1080 'status': self.status, 

1081 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), 

1082 'server': 'Python/HTTPretty', 

1083 'connection': 'close', 

1084 } 

1085 

1086 if self.forcing_headers: 

1087 headers = self.forcing_headers 

1088 

1089 if self.adding_headers: 

1090 headers.update( 

1091 self.normalize_headers( 

1092 self.adding_headers)) 

1093 

1094 headers = self.normalize_headers(headers) 

1095 status = headers.get('status', self.status) 

1096 if self.body_is_callable: 

1097 status, headers, self.body = self.callable_body(self.request, self.info.full_url(), headers) 

1098 headers = self.normalize_headers(headers) 

1099 # TODO: document this behavior: 

1100 if 'content-length' not in headers: 

1101 headers.update({ 

1102 'content-length': len(self.body) 

1103 }) 

1104 

1105 string_list = [ 

1106 'HTTP/1.1 %d %s' % (status, STATUSES[status]), 

1107 ] 

1108 

1109 if 'date' in headers: 

1110 string_list.append('date: %s' % headers.pop('date')) 

1111 

1112 if not self.forcing_headers: 

1113 content_type = headers.pop('content-type', 

1114 'text/plain; charset=utf-8') 

1115 

1116 content_length = headers.pop('content-length', 

1117 self.body_length) 

1118 

1119 string_list.append('content-type: %s' % content_type) 

1120 if not self.streaming: 

1121 string_list.append('content-length: %s' % content_length) 

1122 

1123 server = headers.pop('server', None) 

1124 if server: 

1125 string_list.append('server: %s' % server) 

1126 

1127 for k, v in headers.items(): 

1128 string_list.append( 

1129 '{}: {}'.format(k, v), 

1130 ) 

1131 

1132 for item in string_list: 

1133 fk.write(utf8(item) + b'\n') 

1134 

1135 fk.write(b'\r\n') 

1136 

1137 if self.streaming: 

1138 self.body, body = itertools.tee(self.body) 

1139 for chunk in body: 

1140 fk.write(utf8(chunk)) 

1141 else: 

1142 fk.write(utf8(self.body)) 

1143 

1144 fk.seek(0) 

1145 

1146 

1147def url_fix(s, charset=None): 

1148 """escapes special characters 

1149 """ 

1150 if charset: 

1151 warnings.warn("{}.url_fix() charset argument is deprecated".format(__name__), DeprecationWarning) 

1152 

1153 scheme, netloc, path, querystring, fragment = urlsplit(s) 

1154 path = quote(path, b'/%') 

1155 querystring = quote_plus(querystring, b':&=') 

1156 return urlunsplit((scheme, netloc, path, querystring, fragment)) 

1157 

1158 

1159class URIInfo(BaseClass): 

1160 """Internal representation of `URIs <https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>`_ 

1161 

1162 .. tip:: all arguments are optional 

1163 

1164 :param username: 

1165 :param password: 

1166 :param hostname: 

1167 :param port: 

1168 :param path: 

1169 :param query: 

1170 :param fragment: 

1171 :param scheme: 

1172 :param last_request: 

1173 """ 

1174 default_str_attrs = ( 

1175 'username', 

1176 'password', 

1177 'hostname', 

1178 'port', 

1179 'path', 

1180 ) 

1181 

1182 def __init__(self, 

1183 username='', 

1184 password='', 

1185 hostname='', 

1186 port=80, 

1187 path='/', 

1188 query='', 

1189 fragment='', 

1190 scheme='', 

1191 last_request=None): 

1192 

1193 self.username = username or '' 

1194 self.password = password or '' 

1195 self.hostname = hostname or '' 

1196 

1197 if port: 

1198 port = int(port) 

1199 

1200 elif scheme == 'https': 

1201 port = 443 

1202 

1203 self.port = port or 80 

1204 self.path = path or '' 

1205 if query: 

1206 query_items = sorted(parse_qs(query).items()) 

1207 self.query = urlencode( 

1208 encode_obj(query_items), 

1209 doseq=True, 

1210 ) 

1211 else: 

1212 self.query = '' 

1213 if scheme: 

1214 self.scheme = scheme 

1215 elif self.port in POTENTIAL_HTTPS_PORTS: 

1216 self.scheme = 'https' 

1217 else: 

1218 self.scheme = 'http' 

1219 self.fragment = fragment or '' 

1220 self.last_request = last_request 

1221 

1222 def to_str(self, attrs): 

1223 fmt = ", ".join(['%s="%s"' % (k, getattr(self, k, '')) for k in attrs]) 

1224 return r'<httpretty.URIInfo(%s)>' % fmt 

1225 

1226 def __str__(self): 

1227 return self.to_str(self.default_str_attrs) 

1228 

1229 def str_with_query(self): 

1230 attrs = self.default_str_attrs + ('query',) 

1231 return self.to_str(attrs) 

1232 

1233 def __hash__(self): 

1234 return int(hashlib.sha1(bytes(self, 'ascii')).hexdigest(), 16) 

1235 

1236 def __eq__(self, other): 

1237 self_tuple = ( 

1238 self.port, 

1239 decode_utf8(self.hostname.lower()), 

1240 url_fix(decode_utf8(self.path)), 

1241 ) 

1242 other_tuple = ( 

1243 other.port, 

1244 decode_utf8(other.hostname.lower()), 

1245 url_fix(decode_utf8(other.path)), 

1246 ) 

1247 return self_tuple == other_tuple 

1248 

1249 def full_url(self, use_querystring=True): 

1250 """ 

1251 :param use_querystring: bool 

1252 :returns: a string with the full url with the format ``{scheme}://{credentials}{domain}{path}{query}`` 

1253 """ 

1254 credentials = "" 

1255 if self.password: 

1256 credentials = "{}:{}@".format( 

1257 self.username, self.password) 

1258 

1259 query = "" 

1260 if use_querystring and self.query: 

1261 query = "?{}".format(decode_utf8(self.query)) 

1262 

1263 result = "{scheme}://{credentials}{domain}{path}{query}".format( 

1264 scheme=self.scheme, 

1265 credentials=credentials, 

1266 domain=self.get_full_domain(), 

1267 path=decode_utf8(self.path), 

1268 query=query 

1269 ) 

1270 return result 

1271 

1272 def get_full_domain(self): 

1273 """ 

1274 :returns: a string in the form ``{domain}:{port}`` or just the domain if the port is 80 or 443 

1275 """ 

1276 hostname = decode_utf8(self.hostname) 

1277 # Port 80/443 should not be appended to the url 

1278 if self.port not in DEFAULT_HTTP_PORTS | DEFAULT_HTTPS_PORTS: 

1279 return ":".join([hostname, str(self.port)]) 

1280 

1281 return hostname 

1282 

1283 @classmethod 

1284 def from_uri(cls, uri, entry): 

1285 """ 

1286 :param uri: string 

1287 :param entry: an instance of :py:class:`~httpretty.core.Entry` 

1288 """ 

1289 result = urlsplit(uri) 

1290 if result.scheme == 'https': 

1291 POTENTIAL_HTTPS_PORTS.add(int(result.port or 443)) 

1292 else: 

1293 POTENTIAL_HTTP_PORTS.add(int(result.port or 80)) 

1294 return cls(result.username, 

1295 result.password, 

1296 result.hostname, 

1297 result.port, 

1298 result.path, 

1299 result.query, 

1300 result.fragment, 

1301 result.scheme, 

1302 entry) 

1303 

1304 

1305class URIMatcher(object): 

1306 regex = None 

1307 info = None 

1308 

1309 def __init__(self, uri, entries, match_querystring=False, priority=0): 

1310 self._match_querystring = match_querystring 

1311 # CPython, Jython 

1312 regex_types = ('SRE_Pattern', 'org.python.modules.sre.PatternObject', 

1313 'Pattern') 

1314 is_regex = type(uri).__name__ in regex_types 

1315 if is_regex: 

1316 self.regex = uri 

1317 result = urlsplit(uri.pattern) 

1318 if result.scheme == 'https': 

1319 POTENTIAL_HTTPS_PORTS.add(int(result.port or 443)) 

1320 else: 

1321 POTENTIAL_HTTP_PORTS.add(int(result.port or 80)) 

1322 else: 

1323 self.info = URIInfo.from_uri(uri, entries) 

1324 

1325 self.entries = entries 

1326 self.priority = priority 

1327 self.uri = uri 

1328 # hash of current_entry pointers, per method. 

1329 self.current_entries = {} 

1330 

1331 def matches(self, info): 

1332 if self.info: 

1333 # Query string is not considered when comparing info objects, compare separately 

1334 return self.info == info and (not self._match_querystring or self.info.query == info.query) 

1335 else: 

1336 return self.regex.search(info.full_url( 

1337 use_querystring=self._match_querystring)) 

1338 

1339 def __str__(self): 

1340 wrap = 'URLMatcher({})' 

1341 if self.info: 

1342 if self._match_querystring: 

1343 return wrap.format(str(self.info.str_with_query())) 

1344 else: 

1345 return wrap.format(str(self.info)) 

1346 else: 

1347 return wrap.format(self.regex.pattern) 

1348 

1349 def get_next_entry(self, method, info, request): 

1350 """Cycle through available responses, but only once. 

1351 Any subsequent requests will receive the last response""" 

1352 

1353 if method not in self.current_entries: 

1354 self.current_entries[method] = 0 

1355 

1356 # restrict selection to entries that match the requested 

1357 # method 

1358 entries_for_method = [e for e in self.entries if e.method == method] 

1359 

1360 if self.current_entries[method] >= len(entries_for_method): 

1361 self.current_entries[method] = -1 

1362 

1363 if not self.entries or not entries_for_method: 

1364 raise ValueError('I have no entries for method %s: %s' 

1365 % (method, self)) 

1366 

1367 entry = entries_for_method[self.current_entries[method]] 

1368 if self.current_entries[method] != -1: 

1369 self.current_entries[method] += 1 

1370 

1371 # Create a copy of the original entry to make it thread-safe 

1372 body = entry.callable_body if entry.body_is_callable else entry.body 

1373 new_entry = Entry(entry.method, entry.uri, body, 

1374 status=entry.status, 

1375 streaming=entry.streaming, 

1376 adding_headers=entry.adding_headers, 

1377 forcing_headers=entry.forcing_headers) 

1378 

1379 # Attach more info to the entry 

1380 # So the callback can be more clever about what to do 

1381 # This does also fix the case where the callback 

1382 # would be handed a compiled regex as uri instead of the 

1383 # real uri 

1384 new_entry.info = info 

1385 new_entry.request = request 

1386 return new_entry 

1387 

1388 def __hash__(self): 

1389 return hash(str(self)) 

1390 

1391 def __eq__(self, other): 

1392 return str(self) == str(other) 

1393 

1394 

1395class httpretty(HttpBaseClass): 

1396 """manages HTTPretty's internal request/response registry and request matching. 

1397 """ 

1398 _entries = {} 

1399 latest_requests = [] 

1400 

1401 last_request = HTTPrettyRequestEmpty() 

1402 _is_enabled = False 

1403 allow_net_connect = True 

1404 

1405 @classmethod 

1406 def match_uriinfo(cls, info): 

1407 """ 

1408 :param info: an :py:class:`~httpretty.core.URIInfo` 

1409 :returns: a 2-item tuple: (:py:class:`~httpretty.core.URLMatcher`, :py:class:`~httpretty.core.URIInfo`) or ``(None, [])`` 

1410 """ 

1411 items = sorted( 

1412 cls._entries.items(), 

1413 key=lambda matcher_entries: matcher_entries[0].priority, 

1414 reverse=True, 

1415 ) 

1416 for matcher, value in items: 

1417 if matcher.matches(info): 

1418 return (matcher, info) 

1419 

1420 return (None, []) 

1421 

1422 @classmethod 

1423 def match_https_hostname(cls, hostname): 

1424 """ 

1425 :param hostname: a string 

1426 :returns: an :py:class:`~httpretty.core.URLMatcher` or ``None`` 

1427 """ 

1428 items = sorted( 

1429 cls._entries.items(), 

1430 key=lambda matcher_entries: matcher_entries[0].priority, 

1431 reverse=True, 

1432 ) 

1433 for matcher, value in items: 

1434 if matcher.info is None: 

1435 pattern_with_port = "https://{0}:".format(hostname) 

1436 pattern_without_port = "https://{0}/".format(hostname) 

1437 hostname_pattern = ( 

1438 hostname_re 

1439 .match(matcher.regex.pattern) 

1440 .group(0) 

1441 ) 

1442 for pattern in [pattern_with_port, pattern_without_port]: 

1443 if re.match(hostname_pattern, pattern): 

1444 return matcher 

1445 

1446 elif matcher.info.hostname == hostname: 

1447 return matcher 

1448 return None 

1449 

1450 @classmethod 

1451 def match_http_address(cls, hostname, port): 

1452 """ 

1453 :param hostname: a string 

1454 :param port: an integer 

1455 :returns: an :py:class:`~httpretty.core.URLMatcher` or ``None`` 

1456 """ 

1457 items = sorted( 

1458 cls._entries.items(), 

1459 key=lambda matcher_entries: matcher_entries[0].priority, 

1460 reverse=True, 

1461 ) 

1462 for matcher, value in items: 

1463 if matcher.info is None: 

1464 if port in POTENTIAL_HTTPS_PORTS: 

1465 scheme = 'https://' 

1466 else: 

1467 scheme = 'http://' 

1468 

1469 pattern_without_port = "{0}{1}/".format(scheme, hostname) 

1470 pattern_with_port = "{0}{1}:{2}/".format(scheme, hostname, port) 

1471 hostname_pattern = ( 

1472 hostname_re 

1473 .match(matcher.regex.pattern) 

1474 .group(0) 

1475 ) 

1476 for pattern in [pattern_with_port, pattern_without_port]: 

1477 if re.match(hostname_pattern, pattern): 

1478 return matcher 

1479 

1480 elif matcher.info.hostname == hostname \ 

1481 and matcher.info.port == port: 

1482 return matcher 

1483 

1484 return None 

1485 

1486 @classmethod 

1487 @contextlib.contextmanager 

1488 def record(cls, filename, indentation=4, encoding='utf-8', verbose=False, allow_net_connect=True, pool_manager_params=None): 

1489 """ 

1490 .. testcode:: 

1491 

1492 import io 

1493 import json 

1494 import requests 

1495 import httpretty 

1496 

1497 with httpretty.record('/tmp/ip.json'): 

1498 data = requests.get('https://httpbin.org/ip').json() 

1499 

1500 with io.open('/tmp/ip.json') as fd: 

1501 assert data == json.load(fd) 

1502 

1503 :param filename: a string 

1504 :param indentation: an integer, defaults to **4** 

1505 :param encoding: a string, defaults to **"utf-8"** 

1506 

1507 :returns: a `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_ 

1508 """ 

1509 try: 

1510 import urllib3 

1511 except ImportError: 

1512 msg = ( 

1513 'HTTPretty requires urllib3 installed ' 

1514 'for recording actual requests.' 

1515 ) 

1516 raise RuntimeError(msg) 

1517 

1518 http = urllib3.PoolManager(**pool_manager_params or {}) 

1519 

1520 cls.enable(allow_net_connect, verbose=verbose) 

1521 calls = [] 

1522 

1523 def record_request(request, uri, headers): 

1524 cls.disable() 

1525 

1526 kw = {} 

1527 kw.setdefault('body', request.body) 

1528 kw.setdefault('headers', dict(request.headers)) 

1529 response = http.request(request.method, uri, **kw) 

1530 calls.append({ 

1531 'request': { 

1532 'uri': uri, 

1533 'method': request.method, 

1534 'headers': dict(request.headers), 

1535 'body': decode_utf8(request.body), 

1536 'querystring': request.querystring 

1537 }, 

1538 'response': { 

1539 'status': response.status, 

1540 'body': decode_utf8(response.data), 

1541 # urllib3 1.10 had a bug if you just did: 

1542 # dict(response.headers) 

1543 # which would cause all the values to become lists 

1544 # with the header name as the first item and the 

1545 # true value as the second item. Workaround that 

1546 'headers': dict(response.headers.items()) 

1547 } 

1548 }) 

1549 cls.enable(allow_net_connect, verbose=verbose) 

1550 return response.status, response.headers, response.data 

1551 

1552 for method in cls.METHODS: 

1553 cls.register_uri(method, MULTILINE_ANY_REGEX, body=record_request) 

1554 

1555 yield 

1556 cls.disable() 

1557 with codecs.open(filename, 'w', encoding) as f: 

1558 f.write(json.dumps(calls, indent=indentation)) 

1559 

1560 @classmethod 

1561 @contextlib.contextmanager 

1562 def playback(cls, filename, allow_net_connect=True, verbose=False): 

1563 """ 

1564 .. testcode:: 

1565 

1566 import io 

1567 import json 

1568 import requests 

1569 import httpretty 

1570 

1571 with httpretty.record('/tmp/ip.json'): 

1572 data = requests.get('https://httpbin.org/ip').json() 

1573 

1574 with io.open('/tmp/ip.json') as fd: 

1575 assert data == json.load(fd) 

1576 

1577 :param filename: a string 

1578 :returns: a `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_ 

1579 """ 

1580 cls.enable(allow_net_connect, verbose=verbose) 

1581 

1582 data = json.loads(open(filename).read()) 

1583 for item in data: 

1584 uri = item['request']['uri'] 

1585 method = item['request']['method'] 

1586 body = item['response']['body'] 

1587 headers = item['response']['headers'] 

1588 cls.register_uri(method, uri, body=body, forcing_headers=headers) 

1589 

1590 yield 

1591 cls.disable() 

1592 

1593 @classmethod 

1594 def reset(cls): 

1595 """resets the internal state of HTTPretty, unregistering all URLs 

1596 """ 

1597 POTENTIAL_HTTP_PORTS.intersection_update(DEFAULT_HTTP_PORTS) 

1598 POTENTIAL_HTTPS_PORTS.intersection_update(DEFAULT_HTTPS_PORTS) 

1599 cls._entries.clear() 

1600 cls.latest_requests = [] 

1601 cls.last_request = HTTPrettyRequestEmpty() 

1602 __internals__.cleanup_sockets() 

1603 

1604 @classmethod 

1605 def historify_request(cls, headers, body='', sock=None, append=True): 

1606 """appends request to a list for later retrieval 

1607 

1608 .. testcode:: 

1609 

1610 import httpretty 

1611 

1612 httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip', body='') 

1613 with httpretty.enabled(): 

1614 requests.get('https://httpbin.org/ip') 

1615 

1616 assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip' 

1617 """ 

1618 request = HTTPrettyRequest(headers, body, sock=sock) 

1619 cls.last_request = request 

1620 

1621 if append or not cls.latest_requests: 

1622 cls.latest_requests.append(request) 

1623 else: 

1624 cls.latest_requests[-1] = request 

1625 

1626 logger.info("captured: {}".format(request)) 

1627 return request 

1628 

1629 @classmethod 

1630 def register_uri(cls, method, uri, body='{"message": "HTTPretty :)"}', 

1631 adding_headers=None, 

1632 forcing_headers=None, 

1633 status=200, 

1634 responses=None, 

1635 match_querystring=False, 

1636 priority=0, 

1637 **headers): 

1638 """ 

1639 .. testcode:: 

1640 

1641 import httpretty 

1642 

1643 

1644 def request_callback(request, uri, response_headers): 

1645 content_type = request.headers.get('Content-Type') 

1646 assert request.body == '{"nothing": "here"}', 'unexpected body: {}'.format(request.body) 

1647 assert content_type == 'application/json', 'expected application/json but received Content-Type: {}'.format(content_type) 

1648 return [200, response_headers, json.dumps({"hello": "world"})] 

1649 

1650 httpretty.register_uri( 

1651 HTTPretty.POST, "https://httpretty.example.com/api", 

1652 body=request_callback) 

1653 

1654 

1655 with httpretty.enabled(): 

1656 requests.post('https://httpretty.example.com/api', data='{"nothing": "here"}', headers={'Content-Type': 'application/json'}) 

1657 

1658 assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip' 

1659 

1660 :param method: one of ``httpretty.GET``, ``httpretty.PUT``, ``httpretty.POST``, ``httpretty.DELETE``, ``httpretty.HEAD``, ``httpretty.PATCH``, ``httpretty.OPTIONS``, ``httpretty.CONNECT`` 

1661 :param uri: a string or regex pattern (e.g.: **"https://httpbin.org/ip"**) 

1662 :param body: a string, defaults to ``{"message": "HTTPretty :)"}`` 

1663 :param adding_headers: dict - headers to be added to the response 

1664 :param forcing_headers: dict - headers to be forcefully set in the response 

1665 :param status: an integer, defaults to **200** 

1666 :param responses: a list of entries, ideally each created with :py:meth:`~httpretty.core.httpretty.Response` 

1667 :param priority: an integer, useful for setting higher priority over previously registered urls. defaults to zero 

1668 :param match_querystring: bool - whether to take the querystring into account when matching an URL 

1669 :param headers: headers to be added to the response 

1670 

1671 .. warning:: When using a port in the request, add a trailing slash if no path is provided otherwise Httpretty will not catch the request. Ex: ``httpretty.register_uri(httpretty.GET, 'http://fakeuri.com:8080/', body='{"hello":"world"}')`` 

1672 """ 

1673 uri_is_string = isinstance(uri, str) 

1674 

1675 if uri_is_string and re.search(r'^\w+://[^/]+[.]\w{2,}(:[0-9]+)?$', uri): 

1676 uri += '/' 

1677 

1678 if isinstance(responses, list) and len(responses) > 0: 

1679 for response in responses: 

1680 response.uri = uri 

1681 response.method = method 

1682 entries_for_this_uri = responses 

1683 else: 

1684 headers['body'] = body 

1685 headers['adding_headers'] = adding_headers 

1686 headers['forcing_headers'] = forcing_headers 

1687 headers['status'] = status 

1688 

1689 entries_for_this_uri = [ 

1690 cls.Response(method=method, uri=uri, **headers), 

1691 ] 

1692 

1693 matcher = URIMatcher(uri, entries_for_this_uri, 

1694 match_querystring, priority) 

1695 if matcher in cls._entries: 

1696 matcher.entries.extend(cls._entries[matcher]) 

1697 del cls._entries[matcher] 

1698 

1699 cls._entries[matcher] = entries_for_this_uri 

1700 

1701 def __str__(self): 

1702 return '<HTTPretty with %d URI entries>' % len(self._entries) 

1703 

1704 @classmethod 

1705 def Response( 

1706 cls, body, 

1707 method=None, 

1708 uri=None, 

1709 adding_headers=None, 

1710 forcing_headers=None, 

1711 status=200, 

1712 streaming=False, 

1713 **kw): 

1714 """Shortcut to create an :py:class:`~httpretty.core.Entry` that takes 

1715 the body as first positional argument. 

1716 

1717 .. seealso:: the parameters of this function match those of 

1718 the :py:class:`~httpretty.core.Entry` constructor. 

1719 

1720 Args: 

1721 body (str): The body to return as response.. 

1722 method (str): One of ``httpretty.GET``, ``httpretty.PUT``, ``httpretty.POST``, ``httpretty.DELETE``, ``httpretty.HEAD``, ``httpretty.PATCH``, ``httpretty.OPTIONS``, ``httpretty.CONNECT``. 

1723 uri (str|re.Pattern): The URL to match 

1724 adding_headers (dict): Extra headers to be added to the response 

1725 forcing_headers (dict): Overwrite **any** response headers, even "Content-Length". 

1726 status (int): The status code for the response, defaults to ``200``. 

1727 streaming (bool): Whether should stream the response into chunks via generator. 

1728 kwargs: Keyword-arguments are forwarded to :py:class:`~httpretty.core.Entry` 

1729 

1730 Returns: 

1731 httpretty.Entry: containing the request-matching metadata. 

1732 """ 

1733 kw['body'] = body 

1734 kw['adding_headers'] = adding_headers 

1735 kw['forcing_headers'] = forcing_headers 

1736 kw['status'] = int(status) 

1737 kw['streaming'] = streaming 

1738 return Entry(method, uri, **kw) 

1739 

1740 @classmethod 

1741 def disable(cls): 

1742 """Disables HTTPretty entirely, putting the original :py:mod:`socket` 

1743 module back in its place. 

1744 

1745 

1746 .. code:: 

1747 

1748 import re, json 

1749 import httpretty 

1750 

1751 httpretty.enable() 

1752 # request passes through fake socket 

1753 response = requests.get('https://httpbin.org') 

1754 

1755 httpretty.disable() 

1756 # request uses real python socket module 

1757 response = requests.get('https://httpbin.org') 

1758 

1759 .. note:: This method does not call :py:meth:`httpretty.core.reset` automatically. 

1760 """ 

1761 undo_patch_socket() 

1762 cls._is_enabled = False 

1763 

1764 

1765 @classmethod 

1766 def is_enabled(cls): 

1767 """Check if HTTPretty is enabled 

1768 

1769 :returns: bool 

1770 

1771 .. testcode:: 

1772 

1773 import httpretty 

1774 

1775 httpretty.enable() 

1776 assert httpretty.is_enabled() == True 

1777 

1778 httpretty.disable() 

1779 assert httpretty.is_enabled() == False 

1780 """ 

1781 return cls._is_enabled 

1782 

1783 @classmethod 

1784 def enable(cls, allow_net_connect=True, verbose=False): 

1785 """Enables HTTPretty. 

1786 

1787 :param allow_net_connect: boolean to determine if unmatched requests are forwarded to a real network connection OR throw :py:class:`httpretty.errors.UnmockedError`. 

1788 :param verbose: boolean to set HTTPretty's logging level to DEBUG 

1789 

1790 .. testcode:: 

1791 

1792 import re, json 

1793 import httpretty 

1794 

1795 httpretty.enable(allow_net_connect=True, verbose=True) 

1796 

1797 httpretty.register_uri( 

1798 httpretty.GET, 

1799 re.compile(r'http://.*'), 

1800 body=json.dumps({'man': 'in', 'the': 'middle'}) 

1801 ) 

1802 

1803 response = requests.get('https://foo.bar/foo/bar') 

1804 

1805 response.json().should.equal({ 

1806 "man": "in", 

1807 "the": "middle", 

1808 }) 

1809 

1810 .. warning:: after calling this method the original :py:mod:`socket` is replaced with :py:class:`httpretty.core.fakesock`. Make sure to call :py:meth:`~httpretty.disable` after done with your tests or use the :py:class:`httpretty.enabled` as decorator or `context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_ 

1811 """ 

1812 httpretty.allow_net_connect = allow_net_connect 

1813 apply_patch_socket() 

1814 cls._is_enabled = True 

1815 if verbose: 

1816 logger.setLevel(logging.DEBUG) 

1817 else: 

1818 logger.setLevel(logging.getLogger().level or logging.WARNING) 

1819 

1820 

1821def apply_patch_socket(): 

1822 # Some versions of python internally shadowed the 

1823 # SocketType variable incorrectly https://bugs.python.org/issue20386 

1824 bad_socket_shadow = (socket.socket != socket.SocketType) 

1825 

1826 new_wrap = None 

1827 socket.socket = fakesock.socket 

1828 socket.socketpair = fake_socketpair 

1829 socket._socketobject = fakesock.socket 

1830 if not bad_socket_shadow: 

1831 socket.SocketType = fakesock.socket 

1832 

1833 socket.create_connection = create_fake_connection 

1834 socket.gethostname = fake_gethostname 

1835 socket.gethostbyname = fake_gethostbyname 

1836 socket.getaddrinfo = fake_getaddrinfo 

1837 

1838 socket.__dict__['socket'] = fakesock.socket 

1839 socket.__dict__['socketpair'] = fake_socketpair 

1840 socket.__dict__['_socketobject'] = fakesock.socket 

1841 if not bad_socket_shadow: 

1842 socket.__dict__['SocketType'] = fakesock.socket 

1843 

1844 socket.__dict__['create_connection'] = create_fake_connection 

1845 socket.__dict__['gethostname'] = fake_gethostname 

1846 socket.__dict__['gethostbyname'] = fake_gethostbyname 

1847 socket.__dict__['getaddrinfo'] = fake_getaddrinfo 

1848 

1849 

1850 # Take out the pyopenssl version - use the default implementation 

1851 for extract_from_urllib3 in pyopenssl_overrides_extract: 

1852 extract_into_urllib3() 

1853 

1854 if requests_urllib3_connection is not None: 

1855 urllib3_wrap = partial(fake_wrap_socket, old_requests_ssl_wrap_socket) 

1856 requests_urllib3_connection.ssl_wrap_socket = urllib3_wrap 

1857 requests_urllib3_connection.__dict__['ssl_wrap_socket'] = urllib3_wrap 

1858 

1859 if eventlet: 

1860 eventlet.green.ssl.GreenSSLContext = old_sslcontext_class 

1861 eventlet.green.ssl.__dict__['GreenSSLContext'] = old_sslcontext_class 

1862 eventlet.green.ssl.SSLContext = old_sslcontext_class 

1863 eventlet.green.ssl.__dict__['SSLContext'] = old_sslcontext_class 

1864 

1865 if socks: 

1866 socks.socksocket = fakesock.socket 

1867 socks.__dict__['socksocket'] = fakesock.socket 

1868 

1869 if ssl: 

1870 new_wrap = partial(fake_wrap_socket, old_ssl_wrap_socket) 

1871 ssl.wrap_socket = new_wrap 

1872 ssl.SSLSocket = FakeSSLSocket 

1873 ssl.SSLContext = old_sslcontext_class 

1874 try: 

1875 ssl.SSLContext.wrap_socket = partial(fake_wrap_socket, old_ssl_wrap_socket) 

1876 except AttributeError: 

1877 pass 

1878 

1879 ssl.__dict__['wrap_socket'] = new_wrap 

1880 ssl.__dict__['SSLSocket'] = FakeSSLSocket 

1881 ssl.__dict__['SSLContext'] = old_sslcontext_class 

1882 

1883 

1884def undo_patch_socket(): 

1885 socket.socket = old_socket 

1886 socket.socketpair = old_socketpair 

1887 socket.SocketType = old_SocketType 

1888 socket._socketobject = old_socket 

1889 

1890 socket.create_connection = old_create_connection 

1891 socket.gethostname = old_gethostname 

1892 socket.gethostbyname = old_gethostbyname 

1893 socket.getaddrinfo = old_getaddrinfo 

1894 

1895 socket.__dict__['socket'] = old_socket 

1896 socket.__dict__['socketpair'] = old_socketpair 

1897 socket.__dict__['_socketobject'] = old_socket 

1898 socket.__dict__['SocketType'] = old_SocketType 

1899 

1900 socket.__dict__['create_connection'] = old_create_connection 

1901 socket.__dict__['gethostname'] = old_gethostname 

1902 socket.__dict__['gethostbyname'] = old_gethostbyname 

1903 socket.__dict__['getaddrinfo'] = old_getaddrinfo 

1904 

1905 if socks: 

1906 socks.socksocket = old_socksocket 

1907 socks.__dict__['socksocket'] = old_socksocket 

1908 

1909 if ssl: 

1910 ssl.wrap_socket = old_ssl_wrap_socket 

1911 ssl.SSLSocket = old_sslsocket 

1912 try: 

1913 ssl.SSLContext.wrap_socket = old_sslcontext_wrap_socket 

1914 except AttributeError: 

1915 pass 

1916 ssl.__dict__['wrap_socket'] = old_ssl_wrap_socket 

1917 ssl.__dict__['SSLSocket'] = old_sslsocket 

1918 

1919 if requests_urllib3_connection is not None: 

1920 requests_urllib3_connection.ssl_wrap_socket = \ 

1921 old_requests_ssl_wrap_socket 

1922 requests_urllib3_connection.__dict__['ssl_wrap_socket'] = \ 

1923 old_requests_ssl_wrap_socket 

1924 

1925 

1926 # Put the pyopenssl version back in place 

1927 for inject_from_urllib3 in pyopenssl_overrides_inject: 

1928 inject_into_urllib3() 

1929 

1930 

1931@contextlib.contextmanager 

1932def restored_libs(): 

1933 undo_patch_socket() 

1934 yield 

1935 apply_patch_socket() 

1936 

1937 

1938class httprettized(object): 

1939 """`context-manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_ for enabling HTTPretty. 

1940 

1941 .. tip:: Also available under the alias :py:func:`httpretty.enabled` 

1942 

1943 .. testcode:: 

1944 

1945 import json 

1946 import httpretty 

1947 

1948 httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip', body=json.dumps({'origin': '42.42.42.42'})) 

1949 with httpretty.enabled(): 

1950 response = requests.get('https://httpbin.org/ip') 

1951 

1952 assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip' 

1953 assert response.json() == {'origin': '42.42.42.42'} 

1954 """ 

1955 def __init__(self, allow_net_connect=True, verbose=False): 

1956 self.allow_net_connect = allow_net_connect 

1957 self.verbose = verbose 

1958 

1959 def __enter__(self): 

1960 httpretty.reset() 

1961 httpretty.enable(allow_net_connect=self.allow_net_connect, verbose=self.verbose) 

1962 

1963 def __exit__(self, exc_type, exc_value, db): 

1964 httpretty.disable() 

1965 httpretty.reset() 

1966 

1967 

1968def httprettified(test=None, allow_net_connect=True, verbose=False): 

1969 """decorator for test functions 

1970 

1971 .. tip:: Also available under the alias :py:func:`httpretty.activate` 

1972 

1973 :param test: a callable 

1974 

1975 

1976 example usage with `nosetests <https://nose.readthedocs.io/en/latest/>`_ 

1977 

1978 .. testcode:: 

1979 

1980 import sure 

1981 from httpretty import httprettified 

1982 

1983 @httprettified 

1984 def test_using_nosetests(): 

1985 httpretty.register_uri( 

1986 httpretty.GET, 

1987 'https://httpbin.org/ip' 

1988 ) 

1989 

1990 response = requests.get('https://httpbin.org/ip') 

1991 

1992 response.json().should.equal({ 

1993 "message": "HTTPretty :)" 

1994 }) 

1995 

1996 example usage with `unittest module <https://docs.python.org/3/library/unittest.html>`_ 

1997 

1998 .. testcode:: 

1999 

2000 import unittest 

2001 from sure import expect 

2002 from httpretty import httprettified 

2003 

2004 @httprettified 

2005 class TestWithPyUnit(unittest.TestCase): 

2006 def test_httpbin(self): 

2007 httpretty.register_uri(httpretty.GET, 'https://httpbin.org/ip') 

2008 response = requests.get('https://httpbin.org/ip') 

2009 expect(response.json()).to.equal({ 

2010 "message": "HTTPretty :)" 

2011 }) 

2012 

2013 """ 

2014 def decorate_unittest_TestCase_setUp(klass): 

2015 

2016 # Prefer addCleanup (added in python 2.7), but fall back 

2017 # to using tearDown if it isn't available 

2018 use_addCleanup = hasattr(klass, 'addCleanup') 

2019 

2020 original_setUp = (klass.setUp 

2021 if hasattr(klass, 'setUp') 

2022 else None) 

2023 

2024 def new_setUp(self): 

2025 httpretty.reset() 

2026 httpretty.enable(allow_net_connect, verbose=verbose) 

2027 if use_addCleanup: 

2028 self.addCleanup(httpretty.disable) 

2029 if original_setUp: 

2030 original_setUp(self) 

2031 klass.setUp = new_setUp 

2032 

2033 if not use_addCleanup: 

2034 original_tearDown = (klass.setUp 

2035 if hasattr(klass, 'tearDown') 

2036 else None) 

2037 

2038 def new_tearDown(self): 

2039 httpretty.disable() 

2040 httpretty.reset() 

2041 if original_tearDown: 

2042 original_tearDown(self) 

2043 klass.tearDown = new_tearDown 

2044 

2045 return klass 

2046 

2047 def decorate_test_methods(klass): 

2048 for attr in dir(klass): 

2049 if not attr.startswith('test_'): 

2050 continue 

2051 

2052 attr_value = getattr(klass, attr) 

2053 if not hasattr(attr_value, "__call__"): 

2054 continue 

2055 

2056 setattr(klass, attr, decorate_callable(attr_value)) 

2057 return klass 

2058 

2059 def is_unittest_TestCase(klass): 

2060 try: 

2061 import unittest 

2062 return issubclass(klass, unittest.TestCase) 

2063 except ImportError: 

2064 return False 

2065 

2066 def decorate_class(klass): 

2067 if is_unittest_TestCase(klass): 

2068 return decorate_unittest_TestCase_setUp(klass) 

2069 return decorate_test_methods(klass) 

2070 

2071 def decorate_callable(test): 

2072 @functools.wraps(test) 

2073 def wrapper(*args, **kw): 

2074 with httprettized(allow_net_connect): 

2075 return test(*args, **kw) 

2076 return wrapper 

2077 

2078 if isinstance(test, type): 

2079 return decorate_class(test) 

2080 elif callable(test): 

2081 return decorate_callable(test) 

2082 return decorate_callable