Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/simple_httpclient.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

357 statements  

1from tornado.escape import _unicode 

2from tornado import gen, version 

3from tornado.httpclient import ( 

4 HTTPResponse, 

5 HTTPError, 

6 AsyncHTTPClient, 

7 main, 

8 _RequestProxy, 

9 HTTPRequest, 

10) 

11from tornado import httputil 

12from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters 

13from tornado.ioloop import IOLoop 

14from tornado.iostream import StreamClosedError, IOStream 

15from tornado.netutil import ( 

16 Resolver, 

17 OverrideResolver, 

18 _client_ssl_defaults, 

19 is_valid_ip, 

20) 

21from tornado.log import gen_log 

22from tornado.tcpclient import TCPClient 

23 

24import base64 

25import collections 

26import copy 

27import functools 

28import re 

29import socket 

30import ssl 

31import sys 

32import time 

33from io import BytesIO 

34import urllib.parse 

35 

36from typing import Dict, Any, Callable, Optional, Type, Union 

37from types import TracebackType 

38import typing 

39 

40if typing.TYPE_CHECKING: 

41 from typing import Deque, Tuple, List # noqa: F401 

42 

43 

44class HTTPTimeoutError(HTTPError): 

45 """Error raised by SimpleAsyncHTTPClient on timeout. 

46 

47 For historical reasons, this is a subclass of `.HTTPClientError` 

48 which simulates a response code of 599. 

49 

50 .. versionadded:: 5.1 

51 """ 

52 

53 def __init__(self, message: str) -> None: 

54 super().__init__(599, message=message) 

55 

56 def __str__(self) -> str: 

57 return self.message or "Timeout" 

58 

59 

60class HTTPStreamClosedError(HTTPError): 

61 """Error raised by SimpleAsyncHTTPClient when the underlying stream is closed. 

62 

63 When a more specific exception is available (such as `ConnectionResetError`), 

64 it may be raised instead of this one. 

65 

66 For historical reasons, this is a subclass of `.HTTPClientError` 

67 which simulates a response code of 599. 

68 

69 .. versionadded:: 5.1 

70 """ 

71 

72 def __init__(self, message: str) -> None: 

73 super().__init__(599, message=message) 

74 

75 def __str__(self) -> str: 

76 return self.message or "Stream closed" 

77 

78 

79class SimpleAsyncHTTPClient(AsyncHTTPClient): 

80 """Non-blocking HTTP client with no external dependencies. 

81 

82 This class implements an HTTP 1.1 client on top of Tornado's IOStreams. 

83 Some features found in the curl-based AsyncHTTPClient are not yet 

84 supported. In particular, proxies are not supported, connections 

85 are not reused, and callers cannot select the network interface to be 

86 used. 

87 

88 This implementation supports the following arguments, which can be passed 

89 to ``configure()`` to control the global singleton, or to the constructor 

90 when ``force_instance=True``. 

91 

92 ``max_clients`` is the number of concurrent requests that can be 

93 in progress; when this limit is reached additional requests will be 

94 queued. Note that time spent waiting in this queue still counts 

95 against the ``request_timeout``. 

96 

97 ``defaults`` is a dict of parameters that will be used as defaults on all 

98 `.HTTPRequest` objects submitted to this client. 

99 

100 ``hostname_mapping`` is a dictionary mapping hostnames to IP addresses. 

101 It can be used to make local DNS changes when modifying system-wide 

102 settings like ``/etc/hosts`` is not possible or desirable (e.g. in 

103 unittests). ``resolver`` is similar, but using the `.Resolver` interface 

104 instead of a simple mapping. 

105 

106 ``max_buffer_size`` (default 100MB) is the number of bytes 

107 that can be read into memory at once. ``max_body_size`` 

108 (defaults to ``max_buffer_size``) is the largest response body 

109 that the client will accept. Without a 

110 ``streaming_callback``, the smaller of these two limits 

111 applies; with a ``streaming_callback`` only ``max_body_size`` 

112 does. 

113 

114 .. versionchanged:: 4.2 

115 Added the ``max_body_size`` argument. 

116 """ 

117 

118 def initialize( # type: ignore 

119 self, 

120 max_clients: int = 10, 

121 hostname_mapping: Optional[Dict[str, str]] = None, 

122 max_buffer_size: int = 104857600, 

123 resolver: Optional[Resolver] = None, 

124 defaults: Optional[Dict[str, Any]] = None, 

125 max_header_size: Optional[int] = None, 

126 max_body_size: Optional[int] = None, 

127 ) -> None: 

128 super().initialize(defaults=defaults) 

129 self.max_clients = max_clients 

130 self.queue = ( 

131 collections.deque() 

132 ) # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]] 

133 self.active = ( 

134 {} 

135 ) # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]] 

136 self.waiting = ( 

137 {} 

138 ) # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]] 

139 self.max_buffer_size = max_buffer_size 

140 self.max_header_size = max_header_size 

141 self.max_body_size = max_body_size 

142 # TCPClient could create a Resolver for us, but we have to do it 

143 # ourselves to support hostname_mapping. 

144 if resolver: 

145 self.resolver = resolver 

146 self.own_resolver = False 

147 else: 

148 self.resolver = Resolver() 

149 self.own_resolver = True 

150 if hostname_mapping is not None: 

151 self.resolver = OverrideResolver( 

152 resolver=self.resolver, mapping=hostname_mapping 

153 ) 

154 self.tcp_client = TCPClient(resolver=self.resolver) 

155 

156 def close(self) -> None: 

157 super().close() 

158 if self.own_resolver: 

159 self.resolver.close() 

160 self.tcp_client.close() 

161 

162 def fetch_impl( 

163 self, request: HTTPRequest, callback: Callable[[HTTPResponse], None] 

164 ) -> None: 

165 key = object() 

166 self.queue.append((key, request, callback)) 

167 assert request.connect_timeout is not None 

168 assert request.request_timeout is not None 

169 timeout_handle = None 

170 if len(self.active) >= self.max_clients: 

171 timeout = ( 

172 min(request.connect_timeout, request.request_timeout) 

173 or request.connect_timeout 

174 or request.request_timeout 

175 ) # min but skip zero 

176 if timeout: 

177 timeout_handle = self.io_loop.add_timeout( 

178 self.io_loop.time() + timeout, 

179 functools.partial(self._on_timeout, key, "in request queue"), 

180 ) 

181 self.waiting[key] = (request, callback, timeout_handle) 

182 self._process_queue() 

183 if self.queue: 

184 gen_log.debug( 

185 "max_clients limit reached, request queued. " 

186 "%d active, %d queued requests." % (len(self.active), len(self.queue)) 

187 ) 

188 

189 def _process_queue(self) -> None: 

190 while self.queue and len(self.active) < self.max_clients: 

191 key, request, callback = self.queue.popleft() 

192 if key not in self.waiting: 

193 continue 

194 self._remove_timeout(key) 

195 self.active[key] = (request, callback) 

196 release_callback = functools.partial(self._release_fetch, key) 

197 self._handle_request(request, release_callback, callback) 

198 

199 def _connection_class(self) -> type: 

200 return _HTTPConnection 

201 

202 def _handle_request( 

203 self, 

204 request: HTTPRequest, 

205 release_callback: Callable[[], None], 

206 final_callback: Callable[[HTTPResponse], None], 

207 ) -> None: 

208 self._connection_class()( 

209 self, 

210 request, 

211 release_callback, 

212 final_callback, 

213 self.max_buffer_size, 

214 self.tcp_client, 

215 self.max_header_size, 

216 self.max_body_size, 

217 ) 

218 

219 def _release_fetch(self, key: object) -> None: 

220 del self.active[key] 

221 self._process_queue() 

222 

223 def _remove_timeout(self, key: object) -> None: 

224 if key in self.waiting: 

225 request, callback, timeout_handle = self.waiting[key] 

226 if timeout_handle is not None: 

227 self.io_loop.remove_timeout(timeout_handle) 

228 del self.waiting[key] 

229 

230 def _on_timeout(self, key: object, info: Optional[str] = None) -> None: 

231 """Timeout callback of request. 

232 

233 Construct a timeout HTTPResponse when a timeout occurs. 

234 

235 :arg object key: A simple object to mark the request. 

236 :info string key: More detailed timeout information. 

237 """ 

238 request, callback, timeout_handle = self.waiting[key] 

239 self.queue.remove((key, request, callback)) 

240 

241 error_message = "Timeout {0}".format(info) if info else "Timeout" 

242 timeout_response = HTTPResponse( 

243 request, 

244 599, 

245 error=HTTPTimeoutError(error_message), 

246 request_time=self.io_loop.time() - request.start_time, 

247 ) 

248 self.io_loop.add_callback(callback, timeout_response) 

249 del self.waiting[key] 

250 

251 

252class _HTTPConnection(httputil.HTTPMessageDelegate): 

253 _SUPPORTED_METHODS = set( 

254 ["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"] 

255 ) 

256 

257 def __init__( 

258 self, 

259 client: Optional[SimpleAsyncHTTPClient], 

260 request: HTTPRequest, 

261 release_callback: Callable[[], None], 

262 final_callback: Callable[[HTTPResponse], None], 

263 max_buffer_size: int, 

264 tcp_client: TCPClient, 

265 max_header_size: int, 

266 max_body_size: int, 

267 ) -> None: 

268 self.io_loop = IOLoop.current() 

269 self.start_time = self.io_loop.time() 

270 self.start_wall_time = time.time() 

271 self.client = client 

272 self.request = request 

273 self.release_callback = release_callback 

274 self.final_callback = final_callback 

275 self.max_buffer_size = max_buffer_size 

276 self.tcp_client = tcp_client 

277 self.max_header_size = max_header_size 

278 self.max_body_size = max_body_size 

279 self.code = None # type: Optional[int] 

280 self.headers = None # type: Optional[httputil.HTTPHeaders] 

281 self.chunks = [] # type: List[bytes] 

282 self._decompressor = None 

283 # Timeout handle returned by IOLoop.add_timeout 

284 self._timeout = None # type: object 

285 self._sockaddr = None 

286 IOLoop.current().add_future( 

287 gen.convert_yielded(self.run()), lambda f: f.result() 

288 ) 

289 

290 async def run(self) -> None: 

291 try: 

292 self.parsed = urllib.parse.urlsplit(_unicode(self.request.url)) 

293 if self.parsed.scheme not in ("http", "https"): 

294 raise ValueError("Unsupported url scheme: %s" % self.request.url) 

295 # urlsplit results have hostname and port results, but they 

296 # didn't support ipv6 literals until python 2.7. 

297 netloc = self.parsed.netloc 

298 if "@" in netloc: 

299 userpass, _, netloc = netloc.rpartition("@") 

300 host, port = httputil.split_host_and_port(netloc) 

301 if port is None: 

302 port = 443 if self.parsed.scheme == "https" else 80 

303 if re.match(r"^\[.*\]$", host): 

304 # raw ipv6 addresses in urls are enclosed in brackets 

305 host = host[1:-1] 

306 self.parsed_hostname = host # save final host for _on_connect 

307 

308 if self.request.allow_ipv6 is False: 

309 af = socket.AF_INET 

310 else: 

311 af = socket.AF_UNSPEC 

312 

313 ssl_options = self._get_ssl_options(self.parsed.scheme) 

314 

315 source_ip = None 

316 if self.request.network_interface: 

317 if is_valid_ip(self.request.network_interface): 

318 source_ip = self.request.network_interface 

319 else: 

320 raise ValueError( 

321 "Unrecognized IPv4 or IPv6 address for network_interface, got %r" 

322 % (self.request.network_interface,) 

323 ) 

324 

325 if self.request.connect_timeout and self.request.request_timeout: 

326 timeout = min( 

327 self.request.connect_timeout, self.request.request_timeout 

328 ) 

329 elif self.request.connect_timeout: 

330 timeout = self.request.connect_timeout 

331 elif self.request.request_timeout: 

332 timeout = self.request.request_timeout 

333 else: 

334 timeout = 0 

335 if timeout: 

336 self._timeout = self.io_loop.add_timeout( 

337 self.start_time + timeout, 

338 functools.partial(self._on_timeout, "while connecting"), 

339 ) 

340 stream = await self.tcp_client.connect( 

341 host, 

342 port, 

343 af=af, 

344 ssl_options=ssl_options, 

345 max_buffer_size=self.max_buffer_size, 

346 source_ip=source_ip, 

347 ) 

348 

349 if self.final_callback is None: 

350 # final_callback is cleared if we've hit our timeout. 

351 stream.close() 

352 return 

353 self.stream = stream 

354 self.stream.set_close_callback(self.on_connection_close) 

355 self._remove_timeout() 

356 if self.final_callback is None: 

357 return 

358 if self.request.request_timeout: 

359 self._timeout = self.io_loop.add_timeout( 

360 self.start_time + self.request.request_timeout, 

361 functools.partial(self._on_timeout, "during request"), 

362 ) 

363 if ( 

364 self.request.method not in self._SUPPORTED_METHODS 

365 and not self.request.allow_nonstandard_methods 

366 ): 

367 raise KeyError("unknown method %s" % self.request.method) 

368 for key in ( 

369 "proxy_host", 

370 "proxy_port", 

371 "proxy_username", 

372 "proxy_password", 

373 "proxy_auth_mode", 

374 ): 

375 if getattr(self.request, key, None): 

376 raise NotImplementedError("%s not supported" % key) 

377 if "Connection" not in self.request.headers: 

378 self.request.headers["Connection"] = "close" 

379 if "Host" not in self.request.headers: 

380 if "@" in self.parsed.netloc: 

381 self.request.headers["Host"] = self.parsed.netloc.rpartition("@")[ 

382 -1 

383 ] 

384 else: 

385 self.request.headers["Host"] = self.parsed.netloc 

386 username, password = None, None 

387 if self.parsed.username is not None: 

388 username, password = self.parsed.username, self.parsed.password 

389 elif self.request.auth_username is not None: 

390 username = self.request.auth_username 

391 password = self.request.auth_password or "" 

392 if username is not None: 

393 assert password is not None 

394 if self.request.auth_mode not in (None, "basic"): 

395 raise ValueError("unsupported auth_mode %s", self.request.auth_mode) 

396 self.request.headers["Authorization"] = "Basic " + _unicode( 

397 base64.b64encode( 

398 httputil.encode_username_password(username, password) 

399 ) 

400 ) 

401 if self.request.user_agent: 

402 self.request.headers["User-Agent"] = self.request.user_agent 

403 elif self.request.headers.get("User-Agent") is None: 

404 self.request.headers["User-Agent"] = "Tornado/{}".format(version) 

405 if not self.request.allow_nonstandard_methods: 

406 # Some HTTP methods nearly always have bodies while others 

407 # almost never do. Fail in this case unless the user has 

408 # opted out of sanity checks with allow_nonstandard_methods. 

409 body_expected = self.request.method in ("POST", "PATCH", "PUT") 

410 body_present = ( 

411 self.request.body is not None 

412 or self.request.body_producer is not None 

413 ) 

414 if (body_expected and not body_present) or ( 

415 body_present and not body_expected 

416 ): 

417 raise ValueError( 

418 "Body must %sbe None for method %s (unless " 

419 "allow_nonstandard_methods is true)" 

420 % ("not " if body_expected else "", self.request.method) 

421 ) 

422 if self.request.expect_100_continue: 

423 self.request.headers["Expect"] = "100-continue" 

424 if self.request.body is not None: 

425 # When body_producer is used the caller is responsible for 

426 # setting Content-Length (or else chunked encoding will be used). 

427 self.request.headers["Content-Length"] = str(len(self.request.body)) 

428 if ( 

429 self.request.method == "POST" 

430 and "Content-Type" not in self.request.headers 

431 ): 

432 self.request.headers["Content-Type"] = ( 

433 "application/x-www-form-urlencoded" 

434 ) 

435 if self.request.decompress_response: 

436 self.request.headers["Accept-Encoding"] = "gzip" 

437 req_path = (self.parsed.path or "/") + ( 

438 ("?" + self.parsed.query) if self.parsed.query else "" 

439 ) 

440 self.connection = self._create_connection(stream) 

441 start_line = httputil.RequestStartLine(self.request.method, req_path, "") 

442 self.connection.write_headers(start_line, self.request.headers) 

443 if self.request.expect_100_continue: 

444 await self.connection.read_response(self) 

445 else: 

446 await self._write_body(True) 

447 except Exception: 

448 if not self._handle_exception(*sys.exc_info()): 

449 raise 

450 

451 def _get_ssl_options( 

452 self, scheme: str 

453 ) -> Union[None, Dict[str, Any], ssl.SSLContext]: 

454 if scheme == "https": 

455 if self.request.ssl_options is not None: 

456 return self.request.ssl_options 

457 # If we are using the defaults, don't construct a 

458 # new SSLContext. 

459 if ( 

460 self.request.validate_cert 

461 and self.request.ca_certs is None 

462 and self.request.client_cert is None 

463 and self.request.client_key is None 

464 ): 

465 return _client_ssl_defaults 

466 ssl_ctx = ssl.create_default_context( 

467 ssl.Purpose.SERVER_AUTH, cafile=self.request.ca_certs 

468 ) 

469 if not self.request.validate_cert: 

470 ssl_ctx.check_hostname = False 

471 ssl_ctx.verify_mode = ssl.CERT_NONE 

472 if self.request.client_cert is not None: 

473 ssl_ctx.load_cert_chain( 

474 self.request.client_cert, self.request.client_key 

475 ) 

476 if hasattr(ssl, "OP_NO_COMPRESSION"): 

477 # See netutil.ssl_options_to_context 

478 ssl_ctx.options |= ssl.OP_NO_COMPRESSION 

479 return ssl_ctx 

480 return None 

481 

482 def _on_timeout(self, info: Optional[str] = None) -> None: 

483 """Timeout callback of _HTTPConnection instance. 

484 

485 Raise a `HTTPTimeoutError` when a timeout occurs. 

486 

487 :info string key: More detailed timeout information. 

488 """ 

489 self._timeout = None 

490 error_message = "Timeout {0}".format(info) if info else "Timeout" 

491 if self.final_callback is not None: 

492 self._handle_exception( 

493 HTTPTimeoutError, HTTPTimeoutError(error_message), None 

494 ) 

495 

496 def _remove_timeout(self) -> None: 

497 if self._timeout is not None: 

498 self.io_loop.remove_timeout(self._timeout) 

499 self._timeout = None 

500 

501 def _create_connection(self, stream: IOStream) -> HTTP1Connection: 

502 stream.set_nodelay(True) 

503 connection = HTTP1Connection( 

504 stream, 

505 True, 

506 HTTP1ConnectionParameters( 

507 no_keep_alive=True, 

508 max_header_size=self.max_header_size, 

509 max_body_size=self.max_body_size, 

510 decompress=bool(self.request.decompress_response), 

511 ), 

512 self._sockaddr, 

513 ) 

514 return connection 

515 

516 async def _write_body(self, start_read: bool) -> None: 

517 if self.request.body is not None: 

518 self.connection.write(self.request.body) 

519 elif self.request.body_producer is not None: 

520 fut = self.request.body_producer(self.connection.write) 

521 if fut is not None: 

522 await fut 

523 self.connection.finish() 

524 if start_read: 

525 try: 

526 await self.connection.read_response(self) 

527 except StreamClosedError: 

528 if not self._handle_exception(*sys.exc_info()): 

529 raise 

530 

531 def _release(self) -> None: 

532 if self.release_callback is not None: 

533 release_callback = self.release_callback 

534 self.release_callback = None # type: ignore 

535 release_callback() 

536 

537 def _run_callback(self, response: HTTPResponse) -> None: 

538 self._release() 

539 if self.final_callback is not None: 

540 final_callback = self.final_callback 

541 self.final_callback = None # type: ignore 

542 self.io_loop.add_callback(final_callback, response) 

543 

544 def _handle_exception( 

545 self, 

546 typ: "Optional[Type[BaseException]]", 

547 value: Optional[BaseException], 

548 tb: Optional[TracebackType], 

549 ) -> bool: 

550 if self.final_callback is not None: 

551 self._remove_timeout() 

552 if isinstance(value, StreamClosedError): 

553 if value.real_error is None: 

554 value = HTTPStreamClosedError("Stream closed") 

555 else: 

556 value = value.real_error 

557 self._run_callback( 

558 HTTPResponse( 

559 self.request, 

560 599, 

561 error=value, 

562 request_time=self.io_loop.time() - self.start_time, 

563 start_time=self.start_wall_time, 

564 ) 

565 ) 

566 

567 if hasattr(self, "stream"): 

568 # TODO: this may cause a StreamClosedError to be raised 

569 # by the connection's Future. Should we cancel the 

570 # connection more gracefully? 

571 self.stream.close() 

572 return True 

573 else: 

574 # If our callback has already been called, we are probably 

575 # catching an exception that is not caused by us but rather 

576 # some child of our callback. Rather than drop it on the floor, 

577 # pass it along, unless it's just the stream being closed. 

578 return isinstance(value, StreamClosedError) 

579 

580 def on_connection_close(self) -> None: 

581 if self.final_callback is not None: 

582 message = "Connection closed" 

583 if self.stream.error: 

584 raise self.stream.error 

585 try: 

586 raise HTTPStreamClosedError(message) 

587 except HTTPStreamClosedError: 

588 self._handle_exception(*sys.exc_info()) 

589 

590 async def headers_received( 

591 self, 

592 first_line: Union[httputil.ResponseStartLine, httputil.RequestStartLine], 

593 headers: httputil.HTTPHeaders, 

594 ) -> None: 

595 assert isinstance(first_line, httputil.ResponseStartLine) 

596 if self.request.expect_100_continue and first_line.code == 100: 

597 await self._write_body(False) 

598 return 

599 self.code = first_line.code 

600 self.reason = first_line.reason 

601 self.headers = headers 

602 

603 if self._should_follow_redirect(): 

604 return 

605 

606 if self.request.header_callback is not None: 

607 # Reassemble the start line. 

608 self.request.header_callback("%s %s %s\r\n" % first_line) 

609 for k, v in self.headers.get_all(): 

610 self.request.header_callback("%s: %s\r\n" % (k, v)) 

611 self.request.header_callback("\r\n") 

612 

613 def _should_follow_redirect(self) -> bool: 

614 if self.request.follow_redirects: 

615 assert self.request.max_redirects is not None 

616 return ( 

617 self.code in (301, 302, 303, 307, 308) 

618 and self.request.max_redirects > 0 

619 and self.headers is not None 

620 and self.headers.get("Location") is not None 

621 ) 

622 return False 

623 

624 def finish(self) -> None: 

625 assert self.code is not None 

626 data = b"".join(self.chunks) 

627 self._remove_timeout() 

628 original_request = getattr(self.request, "original_request", self.request) 

629 if self._should_follow_redirect(): 

630 assert isinstance(self.request, _RequestProxy) 

631 assert self.headers is not None 

632 new_request = copy.copy(self.request.request) 

633 new_request.url = urllib.parse.urljoin( 

634 self.request.url, self.headers["Location"] 

635 ) 

636 assert self.request.max_redirects is not None 

637 new_request.max_redirects = self.request.max_redirects - 1 

638 del new_request.headers["Host"] 

639 # https://tools.ietf.org/html/rfc7231#section-6.4 

640 # 

641 # The original HTTP spec said that after a 301 or 302 

642 # redirect, the request method should be preserved. 

643 # However, browsers implemented this by changing the 

644 # method to GET, and the behavior stuck. 303 redirects 

645 # always specified this POST-to-GET behavior, arguably 

646 # for *all* methods, but libcurl < 7.70 only does this 

647 # for POST, while libcurl >= 7.70 does it for other methods. 

648 if (self.code == 303 and self.request.method != "HEAD") or ( 

649 self.code in (301, 302) and self.request.method == "POST" 

650 ): 

651 new_request.method = "GET" 

652 new_request.body = None # type: ignore 

653 for h in [ 

654 "Content-Length", 

655 "Content-Type", 

656 "Content-Encoding", 

657 "Transfer-Encoding", 

658 ]: 

659 try: 

660 del self.request.headers[h] 

661 except KeyError: 

662 pass 

663 new_request.original_request = original_request # type: ignore 

664 final_callback = self.final_callback 

665 self.final_callback = None # type: ignore 

666 self._release() 

667 assert self.client is not None 

668 fut = self.client.fetch(new_request, raise_error=False) 

669 fut.add_done_callback(lambda f: final_callback(f.result())) 

670 self._on_end_request() 

671 return 

672 if self.request.streaming_callback: 

673 buffer = BytesIO() 

674 else: 

675 buffer = BytesIO(data) # TODO: don't require one big string? 

676 response = HTTPResponse( 

677 original_request, 

678 self.code, 

679 reason=getattr(self, "reason", None), 

680 headers=self.headers, 

681 request_time=self.io_loop.time() - self.start_time, 

682 start_time=self.start_wall_time, 

683 buffer=buffer, 

684 effective_url=self.request.url, 

685 ) 

686 self._run_callback(response) 

687 self._on_end_request() 

688 

689 def _on_end_request(self) -> None: 

690 self.stream.close() 

691 

692 def data_received(self, chunk: bytes) -> None: 

693 if self._should_follow_redirect(): 

694 # We're going to follow a redirect so just discard the body. 

695 return 

696 if self.request.streaming_callback is not None: 

697 self.request.streaming_callback(chunk) 

698 else: 

699 self.chunks.append(chunk) 

700 

701 

702if __name__ == "__main__": 

703 AsyncHTTPClient.configure(SimpleAsyncHTTPClient) 

704 main()