Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/simple_httpclient.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

357 statements  

1from tornado.escape import _unicode 

2from tornado import gen, version 

3from tornado.httpclient import ( 

4 HTTPResponse, 

5 HTTPError, 

6 AsyncHTTPClient, 

7 main, 

8 _RequestProxy, 

9 HTTPRequest, 

10) 

11from tornado import httputil 

12from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters 

13from tornado.ioloop import IOLoop 

14from tornado.iostream import StreamClosedError, IOStream 

15from tornado.netutil import ( 

16 Resolver, 

17 OverrideResolver, 

18 _client_ssl_defaults, 

19 is_valid_ip, 

20) 

21from tornado.log import gen_log 

22from tornado.tcpclient import TCPClient 

23 

24import base64 

25import collections 

26import copy 

27import functools 

28import re 

29import socket 

30import ssl 

31import sys 

32import time 

33from io import BytesIO 

34import urllib.parse 

35 

36from typing import Dict, Any, Callable, Optional, Type, Union 

37from types import TracebackType 

38import typing 

39 

40if typing.TYPE_CHECKING: 

41 from typing import Deque, Tuple, List # noqa: F401 

42 

43 

44class HTTPTimeoutError(HTTPError): 

45 """Error raised by SimpleAsyncHTTPClient on timeout. 

46 

47 For historical reasons, this is a subclass of `.HTTPClientError` 

48 which simulates a response code of 599. 

49 

50 .. versionadded:: 5.1 

51 """ 

52 

53 def __init__(self, message: str) -> None: 

54 super().__init__(599, message=message) 

55 

56 def __str__(self) -> str: 

57 return self.message or "Timeout" 

58 

59 

60class HTTPStreamClosedError(HTTPError): 

61 """Error raised by SimpleAsyncHTTPClient when the underlying stream is closed. 

62 

63 When a more specific exception is available (such as `ConnectionResetError`), 

64 it may be raised instead of this one. 

65 

66 For historical reasons, this is a subclass of `.HTTPClientError` 

67 which simulates a response code of 599. 

68 

69 .. versionadded:: 5.1 

70 """ 

71 

72 def __init__(self, message: str) -> None: 

73 super().__init__(599, message=message) 

74 

75 def __str__(self) -> str: 

76 return self.message or "Stream closed" 

77 

78 

79class SimpleAsyncHTTPClient(AsyncHTTPClient): 

80 """Non-blocking HTTP client with no external dependencies. 

81 

82 This class implements an HTTP 1.1 client on top of Tornado's IOStreams. 

83 Some features found in the curl-based AsyncHTTPClient are not yet 

84 supported. In particular, proxies are not supported, connections 

85 are not reused, and callers cannot select the network interface to be 

86 used. 

87 

88 This implementation supports the following arguments, which can be passed 

89 to ``configure()`` to control the global singleton, or to the constructor 

90 when ``force_instance=True``. 

91 

92 ``max_clients`` is the number of concurrent requests that can be 

93 in progress; when this limit is reached additional requests will be 

94 queued. Note that time spent waiting in this queue still counts 

95 against the ``request_timeout``. 

96 

97 ``defaults`` is a dict of parameters that will be used as defaults on all 

98 `.HTTPRequest` objects submitted to this client. 

99 

100 ``hostname_mapping`` is a dictionary mapping hostnames to IP addresses. 

101 It can be used to make local DNS changes when modifying system-wide 

102 settings like ``/etc/hosts`` is not possible or desirable (e.g. in 

103 unittests). ``resolver`` is similar, but using the `.Resolver` interface 

104 instead of a simple mapping. 

105 

106 ``max_buffer_size`` (default 100MB) is the number of bytes 

107 that can be read into memory at once. ``max_body_size`` 

108 (defaults to ``max_buffer_size``) is the largest response body 

109 that the client will accept. Without a 

110 ``streaming_callback``, the smaller of these two limits 

111 applies; with a ``streaming_callback`` only ``max_body_size`` 

112 does. 

113 

114 .. versionchanged:: 4.2 

115 Added the ``max_body_size`` argument. 

116 """ 

117 

118 def initialize( # type: ignore 

119 self, 

120 max_clients: int = 10, 

121 hostname_mapping: Optional[Dict[str, str]] = None, 

122 max_buffer_size: int = 104857600, 

123 resolver: Optional[Resolver] = None, 

124 defaults: Optional[Dict[str, Any]] = None, 

125 max_header_size: Optional[int] = None, 

126 max_body_size: Optional[int] = None, 

127 ) -> None: 

128 super().initialize(defaults=defaults) 

129 self.max_clients = max_clients 

130 self.queue = ( 

131 collections.deque() 

132 ) # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]] 

133 self.active = ( 

134 {} 

135 ) # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]] 

136 self.waiting = ( 

137 {} 

138 ) # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]] 

139 self.max_buffer_size = max_buffer_size 

140 self.max_header_size = max_header_size 

141 self.max_body_size = max_body_size 

142 # TCPClient could create a Resolver for us, but we have to do it 

143 # ourselves to support hostname_mapping. 

144 if resolver: 

145 self.resolver = resolver 

146 self.own_resolver = False 

147 else: 

148 self.resolver = Resolver() 

149 self.own_resolver = True 

150 if hostname_mapping is not None: 

151 self.resolver = OverrideResolver( 

152 resolver=self.resolver, mapping=hostname_mapping 

153 ) 

154 self.tcp_client = TCPClient(resolver=self.resolver) 

155 

156 def close(self) -> None: 

157 super().close() 

158 if self.own_resolver: 

159 self.resolver.close() 

160 self.tcp_client.close() 

161 

162 def fetch_impl( 

163 self, request: HTTPRequest, callback: Callable[[HTTPResponse], None] 

164 ) -> None: 

165 key = object() 

166 self.queue.append((key, request, callback)) 

167 assert request.connect_timeout is not None 

168 assert request.request_timeout is not None 

169 timeout_handle = None 

170 if len(self.active) >= self.max_clients: 

171 timeout = ( 

172 min(request.connect_timeout, request.request_timeout) 

173 or request.connect_timeout 

174 or request.request_timeout 

175 ) # min but skip zero 

176 if timeout: 

177 timeout_handle = self.io_loop.add_timeout( 

178 self.io_loop.time() + timeout, 

179 functools.partial(self._on_timeout, key, "in request queue"), 

180 ) 

181 self.waiting[key] = (request, callback, timeout_handle) 

182 self._process_queue() 

183 if self.queue: 

184 gen_log.debug( 

185 "max_clients limit reached, request queued. " 

186 "%d active, %d queued requests." % (len(self.active), len(self.queue)) 

187 ) 

188 

189 def _process_queue(self) -> None: 

190 while self.queue and len(self.active) < self.max_clients: 

191 key, request, callback = self.queue.popleft() 

192 if key not in self.waiting: 

193 continue 

194 self._remove_timeout(key) 

195 self.active[key] = (request, callback) 

196 release_callback = functools.partial(self._release_fetch, key) 

197 self._handle_request(request, release_callback, callback) 

198 

199 def _connection_class(self) -> type: 

200 return _HTTPConnection 

201 

202 def _handle_request( 

203 self, 

204 request: HTTPRequest, 

205 release_callback: Callable[[], None], 

206 final_callback: Callable[[HTTPResponse], None], 

207 ) -> None: 

208 self._connection_class()( 

209 self, 

210 request, 

211 release_callback, 

212 final_callback, 

213 self.max_buffer_size, 

214 self.tcp_client, 

215 self.max_header_size, 

216 self.max_body_size, 

217 ) 

218 

219 def _release_fetch(self, key: object) -> None: 

220 del self.active[key] 

221 self._process_queue() 

222 

223 def _remove_timeout(self, key: object) -> None: 

224 if key in self.waiting: 

225 request, callback, timeout_handle = self.waiting[key] 

226 if timeout_handle is not None: 

227 self.io_loop.remove_timeout(timeout_handle) 

228 del self.waiting[key] 

229 

230 def _on_timeout(self, key: object, info: Optional[str] = None) -> None: 

231 """Timeout callback of request. 

232 

233 Construct a timeout HTTPResponse when a timeout occurs. 

234 

235 :arg object key: A simple object to mark the request. 

236 :info string key: More detailed timeout information. 

237 """ 

238 request, callback, timeout_handle = self.waiting[key] 

239 self.queue.remove((key, request, callback)) 

240 

241 error_message = f"Timeout {info}" if info else "Timeout" 

242 timeout_response = HTTPResponse( 

243 request, 

244 599, 

245 error=HTTPTimeoutError(error_message), 

246 request_time=self.io_loop.time() - request.start_time, 

247 ) 

248 self.io_loop.add_callback(callback, timeout_response) 

249 del self.waiting[key] 

250 

251 

252class _HTTPConnection(httputil.HTTPMessageDelegate): 

253 _SUPPORTED_METHODS = {"GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"} 

254 

255 def __init__( 

256 self, 

257 client: Optional[SimpleAsyncHTTPClient], 

258 request: HTTPRequest, 

259 release_callback: Callable[[], None], 

260 final_callback: Callable[[HTTPResponse], None], 

261 max_buffer_size: int, 

262 tcp_client: TCPClient, 

263 max_header_size: int, 

264 max_body_size: int, 

265 ) -> None: 

266 self.io_loop = IOLoop.current() 

267 self.start_time = self.io_loop.time() 

268 self.start_wall_time = time.time() 

269 self.client = client 

270 self.request = request 

271 self.release_callback = release_callback 

272 self.final_callback = final_callback 

273 self.max_buffer_size = max_buffer_size 

274 self.tcp_client = tcp_client 

275 self.max_header_size = max_header_size 

276 self.max_body_size = max_body_size 

277 self.code = None # type: Optional[int] 

278 self.headers = None # type: Optional[httputil.HTTPHeaders] 

279 self.chunks = [] # type: List[bytes] 

280 self._decompressor = None 

281 # Timeout handle returned by IOLoop.add_timeout 

282 self._timeout = None # type: object 

283 self._sockaddr = None 

284 IOLoop.current().add_future( 

285 gen.convert_yielded(self.run()), lambda f: f.result() 

286 ) 

287 

288 async def run(self) -> None: 

289 try: 

290 self.parsed = urllib.parse.urlsplit(_unicode(self.request.url)) 

291 if self.parsed.scheme not in ("http", "https"): 

292 raise ValueError("Unsupported url scheme: %s" % self.request.url) 

293 # urlsplit results have hostname and port results, but they 

294 # didn't support ipv6 literals until python 2.7. 

295 netloc = self.parsed.netloc 

296 if "@" in netloc: 

297 userpass, _, netloc = netloc.rpartition("@") 

298 host, port = httputil.split_host_and_port(netloc) 

299 if port is None: 

300 port = 443 if self.parsed.scheme == "https" else 80 

301 if re.match(r"^\[.*\]$", host): 

302 # raw ipv6 addresses in urls are enclosed in brackets 

303 host = host[1:-1] 

304 self.parsed_hostname = host # save final host for _on_connect 

305 

306 if self.request.allow_ipv6 is False: 

307 af = socket.AF_INET 

308 else: 

309 af = socket.AF_UNSPEC 

310 

311 ssl_options = self._get_ssl_options(self.parsed.scheme) 

312 

313 source_ip = None 

314 if self.request.network_interface: 

315 if is_valid_ip(self.request.network_interface): 

316 source_ip = self.request.network_interface 

317 else: 

318 raise ValueError( 

319 "Unrecognized IPv4 or IPv6 address for network_interface, got %r" 

320 % (self.request.network_interface,) 

321 ) 

322 

323 if self.request.connect_timeout and self.request.request_timeout: 

324 timeout = min( 

325 self.request.connect_timeout, self.request.request_timeout 

326 ) 

327 elif self.request.connect_timeout: 

328 timeout = self.request.connect_timeout 

329 elif self.request.request_timeout: 

330 timeout = self.request.request_timeout 

331 else: 

332 timeout = 0 

333 if timeout: 

334 self._timeout = self.io_loop.add_timeout( 

335 self.start_time + timeout, 

336 functools.partial(self._on_timeout, "while connecting"), 

337 ) 

338 stream = await self.tcp_client.connect( 

339 host, 

340 port, 

341 af=af, 

342 ssl_options=ssl_options, 

343 max_buffer_size=self.max_buffer_size, 

344 source_ip=source_ip, 

345 ) 

346 

347 if self.final_callback is None: 

348 # final_callback is cleared if we've hit our timeout. 

349 stream.close() 

350 return 

351 self.stream = stream 

352 self.stream.set_close_callback(self.on_connection_close) 

353 self._remove_timeout() 

354 if self.final_callback is None: 

355 return 

356 if self.request.request_timeout: 

357 self._timeout = self.io_loop.add_timeout( 

358 self.start_time + self.request.request_timeout, 

359 functools.partial(self._on_timeout, "during request"), 

360 ) 

361 if ( 

362 self.request.method not in self._SUPPORTED_METHODS 

363 and not self.request.allow_nonstandard_methods 

364 ): 

365 raise KeyError("unknown method %s" % self.request.method) 

366 for key in ( 

367 "proxy_host", 

368 "proxy_port", 

369 "proxy_username", 

370 "proxy_password", 

371 "proxy_auth_mode", 

372 ): 

373 if getattr(self.request, key, None): 

374 raise NotImplementedError("%s not supported" % key) 

375 if "Connection" not in self.request.headers: 

376 self.request.headers["Connection"] = "close" 

377 if "Host" not in self.request.headers: 

378 if "@" in self.parsed.netloc: 

379 self.request.headers["Host"] = self.parsed.netloc.rpartition("@")[ 

380 -1 

381 ] 

382 else: 

383 self.request.headers["Host"] = self.parsed.netloc 

384 username, password = None, None 

385 if self.parsed.username is not None: 

386 username, password = self.parsed.username, self.parsed.password 

387 elif self.request.auth_username is not None: 

388 username = self.request.auth_username 

389 password = self.request.auth_password or "" 

390 if username is not None: 

391 assert password is not None 

392 if self.request.auth_mode not in (None, "basic"): 

393 raise ValueError("unsupported auth_mode %s", self.request.auth_mode) 

394 self.request.headers["Authorization"] = "Basic " + _unicode( 

395 base64.b64encode( 

396 httputil.encode_username_password(username, password) 

397 ) 

398 ) 

399 if self.request.user_agent: 

400 self.request.headers["User-Agent"] = self.request.user_agent 

401 elif self.request.headers.get("User-Agent") is None: 

402 self.request.headers["User-Agent"] = f"Tornado/{version}" 

403 if not self.request.allow_nonstandard_methods: 

404 # Some HTTP methods nearly always have bodies while others 

405 # almost never do. Fail in this case unless the user has 

406 # opted out of sanity checks with allow_nonstandard_methods. 

407 body_expected = self.request.method in ("POST", "PATCH", "PUT") 

408 body_present = ( 

409 self.request.body is not None 

410 or self.request.body_producer is not None 

411 ) 

412 if (body_expected and not body_present) or ( 

413 body_present and not body_expected 

414 ): 

415 raise ValueError( 

416 "Body must %sbe None for method %s (unless " 

417 "allow_nonstandard_methods is true)" 

418 % ("not " if body_expected else "", self.request.method) 

419 ) 

420 if self.request.expect_100_continue: 

421 self.request.headers["Expect"] = "100-continue" 

422 if self.request.body is not None: 

423 # When body_producer is used the caller is responsible for 

424 # setting Content-Length (or else chunked encoding will be used). 

425 self.request.headers["Content-Length"] = str(len(self.request.body)) 

426 if ( 

427 self.request.method == "POST" 

428 and "Content-Type" not in self.request.headers 

429 ): 

430 self.request.headers["Content-Type"] = ( 

431 "application/x-www-form-urlencoded" 

432 ) 

433 if self.request.decompress_response: 

434 self.request.headers["Accept-Encoding"] = "gzip" 

435 req_path = (self.parsed.path or "/") + ( 

436 ("?" + self.parsed.query) if self.parsed.query else "" 

437 ) 

438 self.connection = self._create_connection(stream) 

439 start_line = httputil.RequestStartLine(self.request.method, req_path, "") 

440 self.connection.write_headers(start_line, self.request.headers) 

441 if self.request.expect_100_continue: 

442 await self.connection.read_response(self) 

443 else: 

444 await self._write_body(True) 

445 except Exception: 

446 if not self._handle_exception(*sys.exc_info()): 

447 raise 

448 

449 def _get_ssl_options( 

450 self, scheme: str 

451 ) -> Union[None, Dict[str, Any], ssl.SSLContext]: 

452 if scheme == "https": 

453 if self.request.ssl_options is not None: 

454 return self.request.ssl_options 

455 # If we are using the defaults, don't construct a 

456 # new SSLContext. 

457 if ( 

458 self.request.validate_cert 

459 and self.request.ca_certs is None 

460 and self.request.client_cert is None 

461 and self.request.client_key is None 

462 ): 

463 return _client_ssl_defaults 

464 ssl_ctx = ssl.create_default_context( 

465 ssl.Purpose.SERVER_AUTH, cafile=self.request.ca_certs 

466 ) 

467 if not self.request.validate_cert: 

468 ssl_ctx.check_hostname = False 

469 ssl_ctx.verify_mode = ssl.CERT_NONE 

470 if self.request.client_cert is not None: 

471 ssl_ctx.load_cert_chain( 

472 self.request.client_cert, self.request.client_key 

473 ) 

474 if hasattr(ssl, "OP_NO_COMPRESSION"): 

475 # See netutil.ssl_options_to_context 

476 ssl_ctx.options |= ssl.OP_NO_COMPRESSION 

477 return ssl_ctx 

478 return None 

479 

480 def _on_timeout(self, info: Optional[str] = None) -> None: 

481 """Timeout callback of _HTTPConnection instance. 

482 

483 Raise a `HTTPTimeoutError` when a timeout occurs. 

484 

485 :info string key: More detailed timeout information. 

486 """ 

487 self._timeout = None 

488 error_message = f"Timeout {info}" if info else "Timeout" 

489 if self.final_callback is not None: 

490 self._handle_exception( 

491 HTTPTimeoutError, HTTPTimeoutError(error_message), None 

492 ) 

493 

494 def _remove_timeout(self) -> None: 

495 if self._timeout is not None: 

496 self.io_loop.remove_timeout(self._timeout) 

497 self._timeout = None 

498 

499 def _create_connection(self, stream: IOStream) -> HTTP1Connection: 

500 stream.set_nodelay(True) 

501 connection = HTTP1Connection( 

502 stream, 

503 True, 

504 HTTP1ConnectionParameters( 

505 no_keep_alive=True, 

506 max_header_size=self.max_header_size, 

507 max_body_size=self.max_body_size, 

508 decompress=bool(self.request.decompress_response), 

509 ), 

510 self._sockaddr, 

511 ) 

512 return connection 

513 

514 async def _write_body(self, start_read: bool) -> None: 

515 if self.request.body is not None: 

516 self.connection.write(self.request.body) 

517 elif self.request.body_producer is not None: 

518 fut = self.request.body_producer(self.connection.write) 

519 if fut is not None: 

520 await fut 

521 self.connection.finish() 

522 if start_read: 

523 try: 

524 await self.connection.read_response(self) 

525 except StreamClosedError: 

526 if not self._handle_exception(*sys.exc_info()): 

527 raise 

528 

529 def _release(self) -> None: 

530 if self.release_callback is not None: 

531 release_callback = self.release_callback 

532 self.release_callback = None # type: ignore 

533 release_callback() 

534 

535 def _run_callback(self, response: HTTPResponse) -> None: 

536 self._release() 

537 if self.final_callback is not None: 

538 final_callback = self.final_callback 

539 self.final_callback = None # type: ignore 

540 self.io_loop.add_callback(final_callback, response) 

541 

542 def _handle_exception( 

543 self, 

544 typ: "Optional[Type[BaseException]]", 

545 value: Optional[BaseException], 

546 tb: Optional[TracebackType], 

547 ) -> bool: 

548 if self.final_callback is not None: 

549 self._remove_timeout() 

550 if isinstance(value, StreamClosedError): 

551 if value.real_error is None: 

552 value = HTTPStreamClosedError("Stream closed") 

553 else: 

554 value = value.real_error 

555 self._run_callback( 

556 HTTPResponse( 

557 self.request, 

558 599, 

559 error=value, 

560 request_time=self.io_loop.time() - self.start_time, 

561 start_time=self.start_wall_time, 

562 ) 

563 ) 

564 

565 if hasattr(self, "stream"): 

566 # TODO: this may cause a StreamClosedError to be raised 

567 # by the connection's Future. Should we cancel the 

568 # connection more gracefully? 

569 self.stream.close() 

570 return True 

571 else: 

572 # If our callback has already been called, we are probably 

573 # catching an exception that is not caused by us but rather 

574 # some child of our callback. Rather than drop it on the floor, 

575 # pass it along, unless it's just the stream being closed. 

576 return isinstance(value, StreamClosedError) 

577 

578 def on_connection_close(self) -> None: 

579 if self.final_callback is not None: 

580 message = "Connection closed" 

581 if self.stream.error: 

582 raise self.stream.error 

583 try: 

584 raise HTTPStreamClosedError(message) 

585 except HTTPStreamClosedError: 

586 self._handle_exception(*sys.exc_info()) 

587 

588 async def headers_received( 

589 self, 

590 first_line: Union[httputil.ResponseStartLine, httputil.RequestStartLine], 

591 headers: httputil.HTTPHeaders, 

592 ) -> None: 

593 assert isinstance(first_line, httputil.ResponseStartLine) 

594 if self.request.expect_100_continue and first_line.code == 100: 

595 await self._write_body(False) 

596 return 

597 self.code = first_line.code 

598 self.reason = first_line.reason 

599 self.headers = headers 

600 

601 if self._should_follow_redirect(): 

602 return 

603 

604 if self.request.header_callback is not None: 

605 # Reassemble the start line. 

606 self.request.header_callback("%s %s %s\r\n" % first_line) 

607 for k, v in self.headers.get_all(): 

608 self.request.header_callback(f"{k}: {v}\r\n") 

609 self.request.header_callback("\r\n") 

610 

611 def _should_follow_redirect(self) -> bool: 

612 if self.request.follow_redirects: 

613 assert self.request.max_redirects is not None 

614 return ( 

615 self.code in (301, 302, 303, 307, 308) 

616 and self.request.max_redirects > 0 

617 and self.headers is not None 

618 and self.headers.get("Location") is not None 

619 ) 

620 return False 

621 

622 def finish(self) -> None: 

623 assert self.code is not None 

624 data = b"".join(self.chunks) 

625 self._remove_timeout() 

626 original_request = getattr(self.request, "original_request", self.request) 

627 if self._should_follow_redirect(): 

628 assert isinstance(self.request, _RequestProxy) 

629 assert self.headers is not None 

630 new_request = copy.copy(self.request.request) 

631 new_request.url = urllib.parse.urljoin( 

632 self.request.url, self.headers["Location"] 

633 ) 

634 assert self.request.max_redirects is not None 

635 new_request.max_redirects = self.request.max_redirects - 1 

636 del new_request.headers["Host"] 

637 # https://tools.ietf.org/html/rfc7231#section-6.4 

638 # 

639 # The original HTTP spec said that after a 301 or 302 

640 # redirect, the request method should be preserved. 

641 # However, browsers implemented this by changing the 

642 # method to GET, and the behavior stuck. 303 redirects 

643 # always specified this POST-to-GET behavior, arguably 

644 # for *all* methods, but libcurl < 7.70 only does this 

645 # for POST, while libcurl >= 7.70 does it for other methods. 

646 if (self.code == 303 and self.request.method != "HEAD") or ( 

647 self.code in (301, 302) and self.request.method == "POST" 

648 ): 

649 new_request.method = "GET" 

650 new_request.body = None # type: ignore 

651 for h in [ 

652 "Content-Length", 

653 "Content-Type", 

654 "Content-Encoding", 

655 "Transfer-Encoding", 

656 ]: 

657 try: 

658 del self.request.headers[h] 

659 except KeyError: 

660 pass 

661 new_request.original_request = original_request # type: ignore 

662 final_callback = self.final_callback 

663 self.final_callback = None # type: ignore 

664 self._release() 

665 assert self.client is not None 

666 fut = self.client.fetch(new_request, raise_error=False) 

667 fut.add_done_callback(lambda f: final_callback(f.result())) 

668 self._on_end_request() 

669 return 

670 if self.request.streaming_callback: 

671 buffer = BytesIO() 

672 else: 

673 buffer = BytesIO(data) # TODO: don't require one big string? 

674 response = HTTPResponse( 

675 original_request, 

676 self.code, 

677 reason=getattr(self, "reason", None), 

678 headers=self.headers, 

679 request_time=self.io_loop.time() - self.start_time, 

680 start_time=self.start_wall_time, 

681 buffer=buffer, 

682 effective_url=self.request.url, 

683 ) 

684 self._run_callback(response) 

685 self._on_end_request() 

686 

687 def _on_end_request(self) -> None: 

688 self.stream.close() 

689 

690 def data_received(self, chunk: bytes) -> None: 

691 if self._should_follow_redirect(): 

692 # We're going to follow a redirect so just discard the body. 

693 return 

694 if self.request.streaming_callback is not None: 

695 self.request.streaming_callback(chunk) 

696 else: 

697 self.chunks.append(chunk) 

698 

699 

700if __name__ == "__main__": 

701 AsyncHTTPClient.configure(SimpleAsyncHTTPClient) 

702 main()