from tornado.escape import _unicode
from tornado import gen, version
from tornado.httpclient import (
    HTTPResponse,
    HTTPError,
    AsyncHTTPClient,
    main,
    _RequestProxy,
    HTTPRequest,
)
from tornado import httputil
from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError, IOStream
from tornado.netutil import (
    Resolver,
    OverrideResolver,
    _client_ssl_defaults,
    is_valid_ip,
)
from tornado.log import gen_log
from tornado.tcpclient import TCPClient

import base64
import collections
import copy
import functools
import re
import socket
import ssl
import sys
import time
from io import BytesIO
import urllib.parse

from typing import Dict, Any, Callable, Optional, Type, Union
from types import TracebackType
import typing

if typing.TYPE_CHECKING:
    from typing import Deque, Tuple, List  # noqa: F401


class HTTPTimeoutError(HTTPError):
    """Error raised by SimpleAsyncHTTPClient on timeout.

    For historical reasons, this is a subclass of `.HTTPClientError`
    which simulates a response code of 599.

    .. versionadded:: 5.1
    """

    def __init__(self, message: str) -> None:
        super().__init__(599, message=message)

    def __str__(self) -> str:
        return self.message or "Timeout"


class HTTPStreamClosedError(HTTPError):
    """Error raised by SimpleAsyncHTTPClient when the underlying stream is closed.

    When a more specific exception is available (such as `ConnectionResetError`),
    it may be raised instead of this one.

    For historical reasons, this is a subclass of `.HTTPClientError`
    which simulates a response code of 599.

    .. versionadded:: 5.1
    """

    def __init__(self, message: str) -> None:
        super().__init__(599, message=message)

    def __str__(self) -> str:
        return self.message or "Stream closed"


class SimpleAsyncHTTPClient(AsyncHTTPClient):
    """Non-blocking HTTP client with no external dependencies.

    This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
    Some features found in the curl-based AsyncHTTPClient are not yet
    supported. In particular, proxies are not supported, connections
    are not reused, and callers cannot select the network interface to be
    used.

    This implementation supports the following arguments, which can be passed
    to ``configure()`` to control the global singleton, or to the constructor
    when ``force_instance=True``.

    ``max_clients`` is the number of concurrent requests that can be
    in progress; when this limit is reached additional requests will be
    queued. Note that time spent waiting in this queue still counts
    against the ``request_timeout``.

    ``defaults`` is a dict of parameters that will be used as defaults on all
    `.HTTPRequest` objects submitted to this client.

    ``hostname_mapping`` is a dictionary mapping hostnames to IP addresses.
    It can be used to make local DNS changes when modifying system-wide
    settings like ``/etc/hosts`` is not possible or desirable (e.g. in
    unittests). ``resolver`` is similar, but using the `.Resolver` interface
    instead of a simple mapping.

    ``max_buffer_size`` (default 100MB) is the number of bytes
    that can be read into memory at once. ``max_body_size``
    (defaults to ``max_buffer_size``) is the largest response body
    that the client will accept. Without a
    ``streaming_callback``, the smaller of these two limits
    applies; with a ``streaming_callback`` only ``max_body_size``
    does.

    .. versionchanged:: 4.2
        Added the ``max_body_size`` argument.
    """

    def initialize(  # type: ignore
        self,
        max_clients: int = 10,
        hostname_mapping: Optional[Dict[str, str]] = None,
        max_buffer_size: int = 104857600,
        resolver: Optional[Resolver] = None,
        defaults: Optional[Dict[str, Any]] = None,
        max_header_size: Optional[int] = None,
        max_body_size: Optional[int] = None,
    ) -> None:
        super().initialize(defaults=defaults)
        self.max_clients = max_clients
        self.queue = (
            collections.deque()
        )  # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]]
        self.active = (
            {}
        )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]]
        self.waiting = (
            {}
        )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]]
        self.max_buffer_size = max_buffer_size
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        # TCPClient could create a Resolver for us, but we have to do it
        # ourselves to support hostname_mapping.
        if resolver:
            self.resolver = resolver
            self.own_resolver = False
        else:
            self.resolver = Resolver()
            self.own_resolver = True
        if hostname_mapping is not None:
            self.resolver = OverrideResolver(
                resolver=self.resolver, mapping=hostname_mapping
            )
        self.tcp_client = TCPClient(resolver=self.resolver)

    def close(self) -> None:
        super().close()
        if self.own_resolver:
            self.resolver.close()
        self.tcp_client.close()

    def fetch_impl(
        self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]
    ) -> None:
        key = object()
        self.queue.append((key, request, callback))
        assert request.connect_timeout is not None
        assert request.request_timeout is not None
        timeout_handle = None
        if len(self.active) >= self.max_clients:
            timeout = (
                min(request.connect_timeout, request.request_timeout)
                or request.connect_timeout
                or request.request_timeout
            )  # min but skip zero
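            # For example, connect_timeout=0 (disabled) with request_timeout=20
            # yields a 20-second queue timeout, while setting both timeouts to
            # zero disables the queue timeout entirely.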
            if timeout:
                timeout_handle = self.io_loop.add_timeout(
                    self.io_loop.time() + timeout,
                    functools.partial(self._on_timeout, key, "in request queue"),
                )
        self.waiting[key] = (request, callback, timeout_handle)
        self._process_queue()
        if self.queue:
            gen_log.debug(
                "max_clients limit reached, request queued. "
                "%d active, %d queued requests." % (len(self.active), len(self.queue))
            )

    def _process_queue(self) -> None:
        while self.queue and len(self.active) < self.max_clients:
            key, request, callback = self.queue.popleft()
            if key not in self.waiting:
                continue
            self._remove_timeout(key)
            self.active[key] = (request, callback)
            release_callback = functools.partial(self._release_fetch, key)
            self._handle_request(request, release_callback, callback)

    def _connection_class(self) -> type:
        return _HTTPConnection

    def _handle_request(
        self,
        request: HTTPRequest,
        release_callback: Callable[[], None],
        final_callback: Callable[[HTTPResponse], None],
    ) -> None:
        self._connection_class()(
            self,
            request,
            release_callback,
            final_callback,
            self.max_buffer_size,
            self.tcp_client,
            self.max_header_size,
            self.max_body_size,
        )

    def _release_fetch(self, key: object) -> None:
        del self.active[key]
        self._process_queue()

    def _remove_timeout(self, key: object) -> None:
        if key in self.waiting:
            request, callback, timeout_handle = self.waiting[key]
            if timeout_handle is not None:
                self.io_loop.remove_timeout(timeout_handle)
            del self.waiting[key]

    def _on_timeout(self, key: object, info: Optional[str] = None) -> None:
        """Timeout callback of request.

        Construct a timeout HTTPResponse when a timeout occurs.

        :arg object key: A simple object to mark the request.
        :arg string info: More detailed timeout information.
        """
        request, callback, timeout_handle = self.waiting[key]
        self.queue.remove((key, request, callback))

        error_message = "Timeout {0}".format(info) if info else "Timeout"
        timeout_response = HTTPResponse(
            request,
            599,
            error=HTTPTimeoutError(error_message),
            request_time=self.io_loop.time() - request.start_time,
        )
        self.io_loop.add_callback(callback, timeout_response)
        del self.waiting[key]


class _HTTPConnection(httputil.HTTPMessageDelegate):
    _SUPPORTED_METHODS = set(
        ["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
    )

    def __init__(
        self,
        client: Optional[SimpleAsyncHTTPClient],
        request: HTTPRequest,
        release_callback: Callable[[], None],
        final_callback: Callable[[HTTPResponse], None],
        max_buffer_size: int,
        tcp_client: TCPClient,
        max_header_size: int,
        max_body_size: int,
    ) -> None:
        self.io_loop = IOLoop.current()
        self.start_time = self.io_loop.time()
        self.start_wall_time = time.time()
        self.client = client
        self.request = request
        self.release_callback = release_callback
        self.final_callback = final_callback
        self.max_buffer_size = max_buffer_size
        self.tcp_client = tcp_client
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        self.code = None  # type: Optional[int]
        self.headers = None  # type: Optional[httputil.HTTPHeaders]
        self.chunks = []  # type: List[bytes]
        self._decompressor = None
        # Timeout handle returned by IOLoop.add_timeout
        self._timeout = None  # type: object
        self._sockaddr = None
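        # Start the request immediately; if run() exits with an unhandled
        # exception, f.result() re-raises it so the IOLoop's error handling
        # can log it.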
        IOLoop.current().add_future(
            gen.convert_yielded(self.run()), lambda f: f.result()
        )

    async def run(self) -> None:
        try:
            self.parsed = urllib.parse.urlsplit(_unicode(self.request.url))
            if self.parsed.scheme not in ("http", "https"):
                raise ValueError("Unsupported url scheme: %s" % self.request.url)
            # urlsplit results have hostname and port attributes, but they
            # didn't support ipv6 literals until python 2.7.
            netloc = self.parsed.netloc
            if "@" in netloc:
                userpass, _, netloc = netloc.rpartition("@")
            host, port = httputil.split_host_and_port(netloc)
            if port is None:
                port = 443 if self.parsed.scheme == "https" else 80
            if re.match(r"^\[.*\]$", host):
                # raw ipv6 addresses in urls are enclosed in brackets
                host = host[1:-1]
            self.parsed_hostname = host  # save final host for _on_connect

            if self.request.allow_ipv6 is False:
                af = socket.AF_INET
            else:
                af = socket.AF_UNSPEC

            ssl_options = self._get_ssl_options(self.parsed.scheme)

            source_ip = None
            if self.request.network_interface:
                if is_valid_ip(self.request.network_interface):
                    source_ip = self.request.network_interface
                else:
                    raise ValueError(
                        "Unrecognized IPv4 or IPv6 address for network_interface, got %r"
                        % (self.request.network_interface,)
                    )

            if self.request.connect_timeout and self.request.request_timeout:
                timeout = min(
                    self.request.connect_timeout, self.request.request_timeout
                )
            elif self.request.connect_timeout:
                timeout = self.request.connect_timeout
            elif self.request.request_timeout:
                timeout = self.request.request_timeout
            else:
                timeout = 0
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    functools.partial(self._on_timeout, "while connecting"),
                )
            stream = await self.tcp_client.connect(
                host,
                port,
                af=af,
                ssl_options=ssl_options,
                max_buffer_size=self.max_buffer_size,
                source_ip=source_ip,
            )

            if self.final_callback is None:
                # final_callback is cleared if we've hit our timeout.
                stream.close()
                return
            self.stream = stream
            self.stream.set_close_callback(self.on_connection_close)
            self._remove_timeout()
            if self.final_callback is None:
                return
            if self.request.request_timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + self.request.request_timeout,
                    functools.partial(self._on_timeout, "during request"),
                )
            if (
                self.request.method not in self._SUPPORTED_METHODS
                and not self.request.allow_nonstandard_methods
            ):
                raise KeyError("unknown method %s" % self.request.method)
            for key in (
                "proxy_host",
                "proxy_port",
                "proxy_username",
                "proxy_password",
                "proxy_auth_mode",
            ):
                if getattr(self.request, key, None):
                    raise NotImplementedError("%s not supported" % key)
            if "Connection" not in self.request.headers:
                self.request.headers["Connection"] = "close"
            if "Host" not in self.request.headers:
                if "@" in self.parsed.netloc:
                    self.request.headers["Host"] = self.parsed.netloc.rpartition("@")[
                        -1
                    ]
                else:
                    self.request.headers["Host"] = self.parsed.netloc
            username, password = None, None
            if self.parsed.username is not None:
                username, password = self.parsed.username, self.parsed.password
            elif self.request.auth_username is not None:
                username = self.request.auth_username
                password = self.request.auth_password or ""
            if username is not None:
                assert password is not None
                if self.request.auth_mode not in (None, "basic"):
                    raise ValueError(
                        "unsupported auth_mode %s" % self.request.auth_mode
                    )
                self.request.headers["Authorization"] = "Basic " + _unicode(
                    base64.b64encode(
                        httputil.encode_username_password(username, password)
                    )
                )
            if self.request.user_agent:
                self.request.headers["User-Agent"] = self.request.user_agent
            elif self.request.headers.get("User-Agent") is None:
                self.request.headers["User-Agent"] = "Tornado/{}".format(version)
            if not self.request.allow_nonstandard_methods:
                # Some HTTP methods nearly always have bodies while others
                # almost never do. Fail in this case unless the user has
                # opted out of sanity checks with allow_nonstandard_methods.
                body_expected = self.request.method in ("POST", "PATCH", "PUT")
                body_present = (
                    self.request.body is not None
                    or self.request.body_producer is not None
                )
                if (body_expected and not body_present) or (
                    body_present and not body_expected
                ):
                    raise ValueError(
                        "Body must %sbe None for method %s (unless "
                        "allow_nonstandard_methods is true)"
                        % ("not " if body_expected else "", self.request.method)
                    )
            if self.request.expect_100_continue:
                self.request.headers["Expect"] = "100-continue"
            if self.request.body is not None:
                # When body_producer is used the caller is responsible for
                # setting Content-Length (or else chunked encoding will be used).
                self.request.headers["Content-Length"] = str(len(self.request.body))
            if (
                self.request.method == "POST"
                and "Content-Type" not in self.request.headers
            ):
                self.request.headers["Content-Type"] = (
                    "application/x-www-form-urlencoded"
                )
            if self.request.decompress_response:
                self.request.headers["Accept-Encoding"] = "gzip"
            req_path = (self.parsed.path or "/") + (
                ("?" + self.parsed.query) if self.parsed.query else ""
            )
            self.connection = self._create_connection(stream)
            start_line = httputil.RequestStartLine(self.request.method, req_path, "")
            self.connection.write_headers(start_line, self.request.headers)
            if self.request.expect_100_continue:
                await self.connection.read_response(self)
            else:
                await self._write_body(True)
        except Exception:
            if not self._handle_exception(*sys.exc_info()):
                raise

    def _get_ssl_options(
        self, scheme: str
    ) -> Union[None, Dict[str, Any], ssl.SSLContext]:
        if scheme == "https":
            if self.request.ssl_options is not None:
                return self.request.ssl_options
            # If we are using the defaults, don't construct a
            # new SSLContext.
            if (
                self.request.validate_cert
                and self.request.ca_certs is None
                and self.request.client_cert is None
                and self.request.client_key is None
            ):
                return _client_ssl_defaults
            ssl_ctx = ssl.create_default_context(
                ssl.Purpose.SERVER_AUTH, cafile=self.request.ca_certs
            )
            if not self.request.validate_cert:
                ssl_ctx.check_hostname = False
                ssl_ctx.verify_mode = ssl.CERT_NONE
            if self.request.client_cert is not None:
                ssl_ctx.load_cert_chain(
                    self.request.client_cert, self.request.client_key
                )
            if hasattr(ssl, "OP_NO_COMPRESSION"):
                # See netutil.ssl_options_to_context
                ssl_ctx.options |= ssl.OP_NO_COMPRESSION
            return ssl_ctx
        return None
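
    # Callers needing full control can pass a prepared ``ssl.SSLContext`` via
    # ``HTTPRequest(ssl_options=...)``; it is returned unchanged above. A rough
    # sketch (file paths and URL are illustrative):
    #
    #     ctx = ssl.create_default_context(cafile="/path/to/ca.pem")
    #     ctx.load_cert_chain("/path/to/client.crt", "/path/to/client.key")
    #     req = HTTPRequest("https://example.com/", ssl_options=ctx)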

    def _on_timeout(self, info: Optional[str] = None) -> None:
        """Timeout callback of _HTTPConnection instance.

        Raise an `HTTPTimeoutError` when a timeout occurs.

        :arg string info: More detailed timeout information.
        """
        self._timeout = None
        error_message = "Timeout {0}".format(info) if info else "Timeout"
        if self.final_callback is not None:
            self._handle_exception(
                HTTPTimeoutError, HTTPTimeoutError(error_message), None
            )

    def _remove_timeout(self) -> None:
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _create_connection(self, stream: IOStream) -> HTTP1Connection:
        stream.set_nodelay(True)
        connection = HTTP1Connection(
            stream,
            True,
            HTTP1ConnectionParameters(
                no_keep_alive=True,
                max_header_size=self.max_header_size,
                max_body_size=self.max_body_size,
                decompress=bool(self.request.decompress_response),
            ),
            self._sockaddr,
        )
        return connection

    async def _write_body(self, start_read: bool) -> None:
        if self.request.body is not None:
            self.connection.write(self.request.body)
        elif self.request.body_producer is not None:
            fut = self.request.body_producer(self.connection.write)
            if fut is not None:
                await fut
        self.connection.finish()
        if start_read:
            try:
                await self.connection.read_response(self)
            except StreamClosedError:
                if not self._handle_exception(*sys.exc_info()):
                    raise

    def _release(self) -> None:
        if self.release_callback is not None:
            release_callback = self.release_callback
            self.release_callback = None  # type: ignore
            release_callback()

    def _run_callback(self, response: HTTPResponse) -> None:
        self._release()
        if self.final_callback is not None:
            final_callback = self.final_callback
            self.final_callback = None  # type: ignore
            self.io_loop.add_callback(final_callback, response)

    def _handle_exception(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        if self.final_callback is not None:
            self._remove_timeout()
            if isinstance(value, StreamClosedError):
                if value.real_error is None:
                    value = HTTPStreamClosedError("Stream closed")
                else:
                    value = value.real_error
            self._run_callback(
                HTTPResponse(
                    self.request,
                    599,
                    error=value,
                    request_time=self.io_loop.time() - self.start_time,
                    start_time=self.start_wall_time,
                )
            )

            if hasattr(self, "stream"):
                # TODO: this may cause a StreamClosedError to be raised
                # by the connection's Future. Should we cancel the
                # connection more gracefully?
                self.stream.close()
            return True
        else:
            # If our callback has already been called, we are probably
            # catching an exception that is not caused by us but rather
            # some child of our callback. Rather than drop it on the floor,
            # pass it along, unless it's just the stream being closed.
            return isinstance(value, StreamClosedError)

    def on_connection_close(self) -> None:
        if self.final_callback is not None:
            message = "Connection closed"
            if self.stream.error:
                raise self.stream.error
            try:
                raise HTTPStreamClosedError(message)
            except HTTPStreamClosedError:
                self._handle_exception(*sys.exc_info())

    async def headers_received(
        self,
        first_line: Union[httputil.ResponseStartLine, httputil.RequestStartLine],
        headers: httputil.HTTPHeaders,
    ) -> None:
        assert isinstance(first_line, httputil.ResponseStartLine)
        if self.request.expect_100_continue and first_line.code == 100:
            await self._write_body(False)
            return
        self.code = first_line.code
        self.reason = first_line.reason
        self.headers = headers

        if self._should_follow_redirect():
            return

        if self.request.header_callback is not None:
            # Reassemble the start line.
            self.request.header_callback("%s %s %s\r\n" % first_line)
            for k, v in self.headers.get_all():
                self.request.header_callback("%s: %s\r\n" % (k, v))
            self.request.header_callback("\r\n")

    def _should_follow_redirect(self) -> bool:
        if self.request.follow_redirects:
            assert self.request.max_redirects is not None
            return (
                self.code in (301, 302, 303, 307, 308)
                and self.request.max_redirects > 0
                and self.headers is not None
                and self.headers.get("Location") is not None
            )
        return False

    def finish(self) -> None:
        assert self.code is not None
        data = b"".join(self.chunks)
        self._remove_timeout()
        original_request = getattr(self.request, "original_request", self.request)
        if self._should_follow_redirect():
            assert isinstance(self.request, _RequestProxy)
            assert self.headers is not None
            new_request = copy.copy(self.request.request)
            new_request.url = urllib.parse.urljoin(
                self.request.url, self.headers["Location"]
            )
            assert self.request.max_redirects is not None
            new_request.max_redirects = self.request.max_redirects - 1
            del new_request.headers["Host"]
            # https://tools.ietf.org/html/rfc7231#section-6.4
            #
            # The original HTTP spec said that after a 301 or 302
            # redirect, the request method should be preserved.
            # However, browsers implemented this by changing the
            # method to GET, and the behavior stuck. 303 redirects
            # always specified this POST-to-GET behavior, arguably
            # for *all* methods, but libcurl < 7.70 only does this
            # for POST, while libcurl >= 7.70 does it for other methods.
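            # For example, a POST answered with a 302 or 303 is retried below
            # as a GET with its body and entity headers dropped, while a 307
            # or 308 re-sends the original method and body unchanged.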
            if (self.code == 303 and self.request.method != "HEAD") or (
                self.code in (301, 302) and self.request.method == "POST"
            ):
                new_request.method = "GET"
                new_request.body = None  # type: ignore
                for h in [
                    "Content-Length",
                    "Content-Type",
                    "Content-Encoding",
                    "Transfer-Encoding",
                ]:
                    try:
                        del self.request.headers[h]
                    except KeyError:
                        pass
            new_request.original_request = original_request  # type: ignore
            final_callback = self.final_callback
            self.final_callback = None  # type: ignore
            self._release()
            assert self.client is not None
            fut = self.client.fetch(new_request, raise_error=False)
            fut.add_done_callback(lambda f: final_callback(f.result()))
            self._on_end_request()
            return
        if self.request.streaming_callback:
            buffer = BytesIO()
        else:
            buffer = BytesIO(data)  # TODO: don't require one big string?
        response = HTTPResponse(
            original_request,
            self.code,
            reason=getattr(self, "reason", None),
            headers=self.headers,
            request_time=self.io_loop.time() - self.start_time,
            start_time=self.start_wall_time,
            buffer=buffer,
            effective_url=self.request.url,
        )
        self._run_callback(response)
        self._on_end_request()

    def _on_end_request(self) -> None:
        self.stream.close()

    def data_received(self, chunk: bytes) -> None:
        if self._should_follow_redirect():
            # We're going to follow a redirect so just discard the body.
            return
        if self.request.streaming_callback is not None:
            self.request.streaming_callback(chunk)
        else:
            self.chunks.append(chunk)


if __name__ == "__main__":
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
    main()
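
# A minimal command-line sketch (the URL is illustrative; assumes the module is
# importable under this path): running it directly fetches the given URLs with
# SimpleAsyncHTTPClient and prints the responses, e.g.
#
#     python -m tornado.simple_httpclient http://example.com/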