1from tornado.escape import _unicode
2from tornado import gen, version
3from tornado.httpclient import (
4 HTTPResponse,
5 HTTPError,
6 AsyncHTTPClient,
7 main,
8 _RequestProxy,
9 HTTPRequest,
10)
11from tornado import httputil
12from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
13from tornado.ioloop import IOLoop
14from tornado.iostream import StreamClosedError, IOStream
15from tornado.netutil import (
16 Resolver,
17 OverrideResolver,
18 _client_ssl_defaults,
19 is_valid_ip,
20)
21from tornado.log import gen_log
22from tornado.tcpclient import TCPClient
23
24import base64
25import collections
26import copy
27import functools
28import re
29import socket
30import ssl
31import sys
32import time
33from io import BytesIO
34import urllib.parse
35
36from typing import Dict, Any, Callable, Optional, Type, Union
37from types import TracebackType
38import typing
39
40if typing.TYPE_CHECKING:
41 from typing import Deque, Tuple, List # noqa: F401
42
43
class HTTPTimeoutError(HTTPError):
    """Raised by `SimpleAsyncHTTPClient` when a request times out.

    It derives from `.HTTPClientError` for historical reasons, and so it
    carries a synthetic response code of 599.

    .. versionadded:: 5.1
    """

    def __init__(self, message: str) -> None:
        # 599 is the pseudo status code this client uses for client-side errors.
        super().__init__(599, message=message)

    def __str__(self) -> str:
        if self.message:
            return self.message
        return "Timeout"
58
59
class HTTPStreamClosedError(HTTPError):
    """Raised by `SimpleAsyncHTTPClient` when the underlying stream closes.

    If a more specific exception is available (for example
    `ConnectionResetError`), that exception may be raised instead.

    It derives from `.HTTPClientError` for historical reasons, and so it
    carries a synthetic response code of 599.

    .. versionadded:: 5.1
    """

    def __init__(self, message: str) -> None:
        # 599 is the pseudo status code this client uses for client-side errors.
        super().__init__(599, message=message)

    def __str__(self) -> str:
        if self.message:
            return self.message
        return "Stream closed"
77
78
class SimpleAsyncHTTPClient(AsyncHTTPClient):
    """Non-blocking HTTP client with no external dependencies.

    This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
    Some features found in the curl-based AsyncHTTPClient are not yet
    supported. In particular, proxies are not supported and connections
    are not reused. ``network_interface`` is supported only when it is an
    IPv4 or IPv6 address to bind to; interface names are rejected.

    This implementation supports the following arguments, which can be passed
    to ``configure()`` to control the global singleton, or to the constructor
    when ``force_instance=True``.

    ``max_clients`` is the number of concurrent requests that can be
    in progress. When this limit is reached, additional requests are
    queued. A queued request fails with a timeout error if it waits longer
    than the smaller non-zero value of its ``connect_timeout`` and
    ``request_timeout``. Once a request leaves the queue, its connect and
    request timeouts are measured from the moment its connection is started.

    ``defaults`` is a dict of parameters that will be used as defaults on all
    `.HTTPRequest` objects submitted to this client.

    ``hostname_mapping`` is a dictionary mapping hostnames to IP addresses.
    It can be used to make local DNS changes when modifying system-wide
    settings like ``/etc/hosts`` is not possible or desirable (e.g. in
    unittests). ``resolver`` is similar, but uses the `.Resolver` interface
    instead of a simple mapping.

    ``max_buffer_size`` (default 100MB) is the number of bytes
    that can be read into memory at once. ``max_body_size``
    (defaults to ``max_buffer_size``) is the largest response body
    that the client will accept. Without a
    ``streaming_callback``, the smaller of these two limits
    applies; with a ``streaming_callback`` only ``max_body_size``
    does.

    ``max_header_size`` is passed through to each connection's
    `.HTTP1ConnectionParameters` as the limit on response header size.
    ``None`` leaves that class's default limit in place.

    .. versionchanged:: 4.2
       Added the ``max_body_size`` argument.
    """

    def initialize(  # type: ignore
        self,
        max_clients: int = 10,
        hostname_mapping: Optional[Dict[str, str]] = None,
        max_buffer_size: int = 104857600,
        resolver: Optional[Resolver] = None,
        defaults: Optional[Dict[str, Any]] = None,
        max_header_size: Optional[int] = None,
        max_body_size: Optional[int] = None,
    ) -> None:
        """Set up request queueing, the resolver and the `.TCPClient`.

        See the class docstring for the meaning of each argument.
        """
        super().initialize(defaults=defaults)
        self.max_clients = max_clients
        # Requests not yet started, in FIFO order.
        self.queue = (
            collections.deque()
        )  # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]]
        # Requests currently holding one of the max_clients slots.
        self.active = (
            {}
        )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]]
        # Requests submitted but not yet started, with their queue-timeout handle.
        self.waiting = (
            {}
        )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]]
        self.max_buffer_size = max_buffer_size
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        # TCPClient could create a Resolver for us, but we have to do it
        # ourselves to support hostname_mapping.
        if resolver:
            self.resolver = resolver
            self.own_resolver = False
        else:
            self.resolver = Resolver()
            self.own_resolver = True
        if hostname_mapping is not None:
            self.resolver = OverrideResolver(
                resolver=self.resolver, mapping=hostname_mapping
            )
        self.tcp_client = TCPClient(resolver=self.resolver)

    def close(self) -> None:
        """Close the client, and the resolver too if this client created it."""
        super().close()
        if self.own_resolver:
            self.resolver.close()
        self.tcp_client.close()

    def fetch_impl(
        self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]
    ) -> None:
        """Queue ``request`` and start it immediately if a slot is free.

        If all ``max_clients`` slots are busy, a timeout is armed so that
        the request fails if it stays queued too long.
        """
        key = object()
        self.queue.append((key, request, callback))
        assert request.connect_timeout is not None
        assert request.request_timeout is not None
        timeout_handle = None
        if len(self.active) >= self.max_clients:
            timeout = (
                min(request.connect_timeout, request.request_timeout)
                or request.connect_timeout
                or request.request_timeout
            )  # min but skip zero
            if timeout:
                timeout_handle = self.io_loop.add_timeout(
                    self.io_loop.time() + timeout,
                    functools.partial(self._on_timeout, key, "in request queue"),
                )
        self.waiting[key] = (request, callback, timeout_handle)
        self._process_queue()
        if self.queue:
            # Let the logger format the message only if debug logging is enabled.
            gen_log.debug(
                "max_clients limit reached, request queued. "
                "%d active, %d queued requests.",
                len(self.active),
                len(self.queue),
            )

    def _process_queue(self) -> None:
        """Start queued requests while fewer than ``max_clients`` are active.

        Queue entries whose key is no longer tracked in ``waiting`` are skipped.
        """
        while self.queue and len(self.active) < self.max_clients:
            key, request, callback = self.queue.popleft()
            if key not in self.waiting:
                continue
            self._remove_timeout(key)
            self.active[key] = (request, callback)
            release_callback = functools.partial(self._release_fetch, key)
            self._handle_request(request, release_callback, callback)

    def _connection_class(self) -> type:
        """Return the class instantiated for each request (overridable)."""
        return _HTTPConnection

    def _handle_request(
        self,
        request: HTTPRequest,
        release_callback: Callable[[], None],
        final_callback: Callable[[HTTPResponse], None],
    ) -> None:
        """Create the connection object that performs ``request``."""
        self._connection_class()(
            self,
            request,
            release_callback,
            final_callback,
            self.max_buffer_size,
            self.tcp_client,
            self.max_header_size,
            self.max_body_size,
        )

    def _release_fetch(self, key: object) -> None:
        """Free the slot held by ``key`` and start any queued requests."""
        del self.active[key]
        self._process_queue()

    def _remove_timeout(self, key: object) -> None:
        """Stop tracking ``key`` as waiting and cancel its queue timeout, if any."""
        entry = self.waiting.pop(key, None)
        if entry is not None:
            timeout_handle = entry[2]
            if timeout_handle is not None:
                self.io_loop.remove_timeout(timeout_handle)

    def _on_timeout(self, key: object, info: Optional[str] = None) -> None:
        """Timeout callback for a request that is still queued.

        Removes the request from the queue and schedules its callback with a
        599 `HTTPResponse` whose error is an `HTTPTimeoutError`.

        :arg object key: The opaque key identifying the queued request.
        :arg str info: Optional detail appended to the timeout message.
        """
        request, callback, timeout_handle = self.waiting[key]
        self.queue.remove((key, request, callback))

        error_message = f"Timeout {info}" if info else "Timeout"
        timeout_response = HTTPResponse(
            request,
            599,
            error=HTTPTimeoutError(error_message),
            request_time=self.io_loop.time() - request.start_time,
        )
        self.io_loop.add_callback(callback, timeout_response)
        del self.waiting[key]
250
251
class _HTTPConnection(httputil.HTTPMessageDelegate):
    """Perform one HTTP/1.x request on a fresh connection and report the result.

    Constructing an instance schedules `run` on the current `.IOLoop`. `run`
    connects through ``tcp_client``, writes the request, and reads the
    response. This object is passed to `.HTTP1Connection.read_response` as
    the message delegate, so `headers_received`, `data_received` and `finish`
    are called as the response arrives.

    ``final_callback`` is invoked at most once, with either the final
    `.HTTPResponse` or a 599 error response. It is set to ``None`` afterwards,
    so a ``None`` ``final_callback`` means the request has been completed.
    """

    _SUPPORTED_METHODS = {"GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"}

    def __init__(
        self,
        client: Optional[SimpleAsyncHTTPClient],
        request: HTTPRequest,
        release_callback: Callable[[], None],
        final_callback: Callable[[HTTPResponse], None],
        max_buffer_size: int,
        tcp_client: TCPClient,
        max_header_size: Optional[int],
        max_body_size: Optional[int],
    ) -> None:
        """Record the request parameters and schedule `run`.

        :arg client: Owning client, used to issue the follow-up request when
            following a redirect.
        :arg request: The request to perform (a `_RequestProxy` when
            redirects are followed).
        :arg release_callback: Called once, when this request stops occupying
            a slot in the client.
        :arg final_callback: Receives the resulting `.HTTPResponse`.
        :arg max_buffer_size: Passed to `.TCPClient.connect`.
        :arg tcp_client: Used to open the connection.
        :arg max_header_size: Passed to `.HTTP1ConnectionParameters`.
        :arg max_body_size: Passed to `.HTTP1ConnectionParameters`.
        """
        self.io_loop = IOLoop.current()
        # IOLoop time drives timeouts and request_time. Wall-clock time is
        # reported as the response's start_time.
        self.start_time = self.io_loop.time()
        self.start_wall_time = time.time()
        self.client = client
        self.request = request
        self.release_callback = release_callback
        self.final_callback = final_callback
        self.max_buffer_size = max_buffer_size
        self.tcp_client = tcp_client
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        self.code = None  # type: Optional[int]
        self.headers = None  # type: Optional[httputil.HTTPHeaders]
        self.chunks = []  # type: List[bytes]
        self._decompressor = None
        # Timeout handle returned by IOLoop.add_timeout
        self._timeout = None  # type: object
        self._sockaddr = None
        # f.result() re-raises any exception that escaped run() instead of
        # silently dropping it.
        self.io_loop.add_future(
            gen.convert_yielded(self.run()), lambda f: f.result()
        )

    async def run(self) -> None:
        """Connect, send the request, and start reading the response.

        Any exception is passed to `_handle_exception`. It is re-raised only
        if `_handle_exception` reports that it was not consumed.
        """
        try:
            self.parsed = urllib.parse.urlsplit(_unicode(self.request.url))
            if self.parsed.scheme not in ("http", "https"):
                raise ValueError("Unsupported url scheme: %s" % self.request.url)
            # Split host and port out of netloc ourselves (dropping any
            # userinfo) instead of using urlsplit's hostname/port attributes,
            # which historically did not support IPv6 literals.
            netloc = self.parsed.netloc
            if "@" in netloc:
                netloc = netloc.rpartition("@")[2]
            host, port = httputil.split_host_and_port(netloc)
            if port is None:
                port = 443 if self.parsed.scheme == "https" else 80
            if re.match(r"^\[.*\]$", host):
                # raw ipv6 addresses in urls are enclosed in brackets
                host = host[1:-1]
            # Final host (brackets stripped). It is not read elsewhere in this
            # class; it is kept for compatibility.
            self.parsed_hostname = host

            if self.request.allow_ipv6 is False:
                af = socket.AF_INET
            else:
                af = socket.AF_UNSPEC

            ssl_options = self._get_ssl_options(self.parsed.scheme)

            source_ip = None
            if self.request.network_interface:
                if is_valid_ip(self.request.network_interface):
                    source_ip = self.request.network_interface
                else:
                    raise ValueError(
                        "Unrecognized IPv4 or IPv6 address for network_interface, got %r"
                        % (self.request.network_interface,)
                    )

            # The connect phase is bounded by the smaller non-zero timeout.
            if self.request.connect_timeout and self.request.request_timeout:
                timeout = min(
                    self.request.connect_timeout, self.request.request_timeout
                )
            elif self.request.connect_timeout:
                timeout = self.request.connect_timeout
            elif self.request.request_timeout:
                timeout = self.request.request_timeout
            else:
                timeout = 0
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    functools.partial(self._on_timeout, "while connecting"),
                )
            stream = await self.tcp_client.connect(
                host,
                port,
                af=af,
                ssl_options=ssl_options,
                max_buffer_size=self.max_buffer_size,
                source_ip=source_ip,
            )

            if self.final_callback is None:
                # final_callback is cleared if we've hit our timeout.
                stream.close()
                return
            self.stream = stream
            self.stream.set_close_callback(self.on_connection_close)
            self._remove_timeout()
            if self.final_callback is None:
                return
            if self.request.request_timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + self.request.request_timeout,
                    functools.partial(self._on_timeout, "during request"),
                )
            if (
                self.request.method not in self._SUPPORTED_METHODS
                and not self.request.allow_nonstandard_methods
            ):
                raise KeyError("unknown method %s" % self.request.method)
            for key in (
                "proxy_host",
                "proxy_port",
                "proxy_username",
                "proxy_password",
                "proxy_auth_mode",
            ):
                if getattr(self.request, key, None):
                    raise NotImplementedError("%s not supported" % key)
            if "Connection" not in self.request.headers:
                self.request.headers["Connection"] = "close"
            if "Host" not in self.request.headers:
                # rpartition()[-1] is the whole netloc when there is no "@",
                # and the part after the userinfo otherwise.
                self.request.headers["Host"] = self.parsed.netloc.rpartition("@")[-1]
            username, password = None, None
            if self.parsed.username is not None:
                username, password = self.parsed.username, self.parsed.password
            elif self.request.auth_username is not None:
                username = self.request.auth_username
                password = self.request.auth_password or ""
            if username is not None:
                assert password is not None
                if self.request.auth_mode not in (None, "basic"):
                    # Format the message; passing the mode as a second
                    # argument would produce a tuple-valued ValueError.
                    raise ValueError(
                        "unsupported auth_mode %s" % self.request.auth_mode
                    )
                self.request.headers["Authorization"] = "Basic " + _unicode(
                    base64.b64encode(
                        httputil.encode_username_password(username, password)
                    )
                )
            if self.request.user_agent:
                self.request.headers["User-Agent"] = self.request.user_agent
            elif self.request.headers.get("User-Agent") is None:
                self.request.headers["User-Agent"] = f"Tornado/{version}"
            if not self.request.allow_nonstandard_methods:
                # Some HTTP methods nearly always have bodies while others
                # almost never do. Fail in this case unless the user has
                # opted out of sanity checks with allow_nonstandard_methods.
                body_expected = self.request.method in ("POST", "PATCH", "PUT")
                body_present = (
                    self.request.body is not None
                    or self.request.body_producer is not None
                )
                if (body_expected and not body_present) or (
                    body_present and not body_expected
                ):
                    raise ValueError(
                        "Body must %sbe None for method %s (unless "
                        "allow_nonstandard_methods is true)"
                        % ("not " if body_expected else "", self.request.method)
                    )
            if self.request.expect_100_continue:
                self.request.headers["Expect"] = "100-continue"
            if self.request.body is not None:
                # When body_producer is used the caller is responsible for
                # setting Content-Length (or else chunked encoding will be used).
                self.request.headers["Content-Length"] = str(len(self.request.body))
            if (
                self.request.method == "POST"
                and "Content-Type" not in self.request.headers
            ):
                self.request.headers["Content-Type"] = (
                    "application/x-www-form-urlencoded"
                )
            if self.request.decompress_response:
                self.request.headers["Accept-Encoding"] = "gzip"
            req_path = (self.parsed.path or "/") + (
                ("?" + self.parsed.query) if self.parsed.query else ""
            )
            self.connection = self._create_connection(stream)
            start_line = httputil.RequestStartLine(self.request.method, req_path, "")
            self.connection.write_headers(start_line, self.request.headers)
            if self.request.expect_100_continue:
                # The body is sent from headers_received once "100 Continue" arrives.
                await self.connection.read_response(self)
            else:
                await self._write_body(True)
        except Exception:
            if not self._handle_exception(*sys.exc_info()):
                raise

    def _get_ssl_options(
        self, scheme: str
    ) -> Union[None, Dict[str, Any], ssl.SSLContext]:
        """Return the TLS options for ``scheme``, or ``None`` for plain HTTP.

        Uses ``request.ssl_options`` if it is set. Otherwise returns the shared
        defaults when nothing is customized, or builds a new `ssl.SSLContext`
        from the request's certificate settings.
        """
        if scheme == "https":
            if self.request.ssl_options is not None:
                return self.request.ssl_options
            # If we are using the defaults, don't construct a
            # new SSLContext.
            if (
                self.request.validate_cert
                and self.request.ca_certs is None
                and self.request.client_cert is None
                and self.request.client_key is None
            ):
                return _client_ssl_defaults
            ssl_ctx = ssl.create_default_context(
                ssl.Purpose.SERVER_AUTH, cafile=self.request.ca_certs
            )
            if not self.request.validate_cert:
                ssl_ctx.check_hostname = False
                ssl_ctx.verify_mode = ssl.CERT_NONE
            if self.request.client_cert is not None:
                ssl_ctx.load_cert_chain(
                    self.request.client_cert, self.request.client_key
                )
            if hasattr(ssl, "OP_NO_COMPRESSION"):
                # See netutil.ssl_options_to_context
                ssl_ctx.options |= ssl.OP_NO_COMPRESSION
            return ssl_ctx
        return None

    def _on_timeout(self, info: Optional[str] = None) -> None:
        """Timeout callback of the _HTTPConnection instance.

        Reports an `HTTPTimeoutError` via `_handle_exception` if the request
        has not completed yet.

        :arg str info: Optional detail appended to the timeout message.
        """
        self._timeout = None
        error_message = f"Timeout {info}" if info else "Timeout"
        if self.final_callback is not None:
            self._handle_exception(
                HTTPTimeoutError, HTTPTimeoutError(error_message), None
            )

    def _remove_timeout(self) -> None:
        """Cancel the pending connect/request timeout, if any."""
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _create_connection(self, stream: IOStream) -> HTTP1Connection:
        """Wrap ``stream`` in a client-side, non-keep-alive `.HTTP1Connection`."""
        stream.set_nodelay(True)
        connection = HTTP1Connection(
            stream,
            True,
            HTTP1ConnectionParameters(
                no_keep_alive=True,
                max_header_size=self.max_header_size,
                max_body_size=self.max_body_size,
                decompress=bool(self.request.decompress_response),
            ),
            self._sockaddr,
        )
        return connection

    async def _write_body(self, start_read: bool) -> None:
        """Send the request body (if any) and finish the request.

        :arg bool start_read: If true, also read the response. This is false
            when called from `headers_received`, where a read is already in
            progress.
        """
        if self.request.body is not None:
            self.connection.write(self.request.body)
        elif self.request.body_producer is not None:
            fut = self.request.body_producer(self.connection.write)
            if fut is not None:
                await fut
        self.connection.finish()
        if start_read:
            try:
                await self.connection.read_response(self)
            except StreamClosedError:
                if not self._handle_exception(*sys.exc_info()):
                    raise

    def _release(self) -> None:
        """Invoke ``release_callback`` at most once."""
        if self.release_callback is not None:
            release_callback = self.release_callback
            self.release_callback = None  # type: ignore
            release_callback()

    def _run_callback(self, response: HTTPResponse) -> None:
        """Release the slot, then schedule ``final_callback`` (at most once)."""
        self._release()
        if self.final_callback is not None:
            final_callback = self.final_callback
            self.final_callback = None  # type: ignore
            self.io_loop.add_callback(final_callback, response)

    def _handle_exception(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        """Turn an exception into a 599 response if the request is still pending.

        A `.StreamClosedError` is replaced by its ``real_error`` when present,
        or by `HTTPStreamClosedError` otherwise.

        :returns: True if the exception was consumed. If the request had
            already completed, returns True only for `.StreamClosedError`, so
            the caller re-raises anything else.
        """
        if self.final_callback is not None:
            self._remove_timeout()
            if isinstance(value, StreamClosedError):
                if value.real_error is None:
                    value = HTTPStreamClosedError("Stream closed")
                else:
                    value = value.real_error
            self._run_callback(
                HTTPResponse(
                    self.request,
                    599,
                    error=value,
                    request_time=self.io_loop.time() - self.start_time,
                    start_time=self.start_wall_time,
                )
            )

            if hasattr(self, "stream"):
                # TODO: this may cause a StreamClosedError to be raised
                # by the connection's Future. Should we cancel the
                # connection more gracefully?
                self.stream.close()
            return True
        else:
            # If our callback has already been called, we are probably
            # catching an exception that is not caused by us but rather
            # some child of our callback. Rather than drop it on the floor,
            # pass it along, unless it's just the stream being closed.
            return isinstance(value, StreamClosedError)

    def on_connection_close(self) -> None:
        """Close callback for the stream.

        If the request is still pending, re-raises the stream's error when it
        has one. Otherwise it reports an `HTTPStreamClosedError`.
        """
        if self.final_callback is not None:
            message = "Connection closed"
            if self.stream.error:
                raise self.stream.error
            try:
                raise HTTPStreamClosedError(message)
            except HTTPStreamClosedError:
                self._handle_exception(*sys.exc_info())

    async def headers_received(
        self,
        first_line: Union[httputil.ResponseStartLine, httputil.RequestStartLine],
        headers: httputil.HTTPHeaders,
    ) -> None:
        """Record the status line and headers of the response.

        On "100 Continue" (when the request asked for it), sends the body
        instead. ``header_callback`` is not invoked for responses that will be
        followed as redirects.
        """
        assert isinstance(first_line, httputil.ResponseStartLine)
        if self.request.expect_100_continue and first_line.code == 100:
            await self._write_body(False)
            return
        self.code = first_line.code
        self.reason = first_line.reason
        self.headers = headers

        if self._should_follow_redirect():
            return

        if self.request.header_callback is not None:
            # Reassemble the start line.
            self.request.header_callback("%s %s %s\r\n" % first_line)
            for k, v in self.headers.get_all():
                self.request.header_callback(f"{k}: {v}\r\n")
            self.request.header_callback("\r\n")

    def _should_follow_redirect(self) -> bool:
        """Return True if the current response is a redirect we should follow.

        That requires ``follow_redirects`` to be enabled, a 301/302/303/307/308
        status, redirects remaining, and a ``Location`` header.
        """
        if self.request.follow_redirects:
            assert self.request.max_redirects is not None
            return (
                self.code in (301, 302, 303, 307, 308)
                and self.request.max_redirects > 0
                and self.headers is not None
                and self.headers.get("Location") is not None
            )
        return False

    def finish(self) -> None:
        """Complete the request: follow a redirect or deliver the response."""
        assert self.code is not None
        self._remove_timeout()
        original_request = getattr(self.request, "original_request", self.request)
        if self._should_follow_redirect():
            assert isinstance(self.request, _RequestProxy)
            assert self.headers is not None
            # copy.copy is shallow, so new_request shares its headers object
            # with self.request.request. All header edits below go through
            # new_request to make that target explicit.
            new_request = copy.copy(self.request.request)
            new_request.url = urllib.parse.urljoin(
                self.request.url, self.headers["Location"]
            )
            assert self.request.max_redirects is not None
            new_request.max_redirects = self.request.max_redirects - 1
            del new_request.headers["Host"]
            # https://tools.ietf.org/html/rfc7231#section-6.4
            #
            # The original HTTP spec said that after a 301 or 302
            # redirect, the request method should be preserved.
            # However, browsers implemented this by changing the
            # method to GET, and the behavior stuck. 303 redirects
            # always specified this POST-to-GET behavior, arguably
            # for *all* methods, but libcurl < 7.70 only does this
            # for POST, while libcurl >= 7.70 does it for other methods.
            if (self.code == 303 and self.request.method != "HEAD") or (
                self.code in (301, 302) and self.request.method == "POST"
            ):
                new_request.method = "GET"
                new_request.body = None  # type: ignore
                for h in [
                    "Content-Length",
                    "Content-Type",
                    "Content-Encoding",
                    "Transfer-Encoding",
                ]:
                    try:
                        del new_request.headers[h]
                    except KeyError:
                        pass
            new_request.original_request = original_request  # type: ignore
            final_callback = self.final_callback
            self.final_callback = None  # type: ignore
            self._release()
            assert self.client is not None
            fut = self.client.fetch(new_request, raise_error=False)
            fut.add_done_callback(lambda f: final_callback(f.result()))
            self._on_end_request()
            return
        if self.request.streaming_callback:
            # The body was already delivered chunk by chunk.
            buffer = BytesIO()
        else:
            # TODO: don't require one big string?
            buffer = BytesIO(b"".join(self.chunks))
        response = HTTPResponse(
            original_request,
            self.code,
            reason=getattr(self, "reason", None),
            headers=self.headers,
            request_time=self.io_loop.time() - self.start_time,
            start_time=self.start_wall_time,
            buffer=buffer,
            effective_url=self.request.url,
        )
        self._run_callback(response)
        self._on_end_request()

    def _on_end_request(self) -> None:
        """Close the stream; connections are never reused."""
        self.stream.close()

    def data_received(self, chunk: bytes) -> None:
        """Deliver or buffer a body chunk. Bodies of followed redirects are dropped."""
        if self._should_follow_redirect():
            # We're going to follow a redirect so just discard the body.
            return
        if self.request.streaming_callback is not None:
            self.request.streaming_callback(chunk)
        else:
            self.chunks.append(chunk)
698
699
if __name__ == "__main__":
    # When this module is run directly, configure SimpleAsyncHTTPClient as the
    # AsyncHTTPClient implementation. Then hand off to tornado.httpclient's
    # main() (presumably its command-line fetcher; see that module).
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
    main()