import asyncio
import io
import logging
import re
import weakref
from copy import copy
from urllib.parse import urlparse

import aiohttp
import yarl

from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import (
    DEFAULT_BLOCK_SIZE,
    glob_translate,
    isfilelike,
    nullcontext,
    tokenize,
)

from ..caching import AllBytes

# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
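# For example, ``ex`` extracts "https://host/x" from '<a href="https://host/x">',
# while ``ex2`` matches bare URLs such as "https://host/path?query=1" anywhere
# in the page text.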
logger = logging.getLogger("fsspec.http")


async def get_client(**kwargs):
    return aiohttp.ClientSession(**kwargs)


class HTTPFileSystem(AsyncFileSystem):
37 """
38 Simple File-System for fetching data via HTTP(S)
39
40 ``ls()`` is implemented by loading the parent page and doing a regex
41 match on the result. If simple_link=True, anything of the form
42 "http(s)://server.com/stuff?thing=other"; otherwise only links within
43 HTML href tags will be used.
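
    Examples
    --------
    A minimal sketch of typical usage (assumes a reachable URL):

    >>> fs = HTTPFileSystem()  # doctest: +SKIP
    >>> with fs.open("https://example.com/data.bin", "rb") as f:  # doctest: +SKIP
    ...     header = f.read(16)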
44 """
45
46 protocol = ("http", "https")
47 sep = "/"
48
49 def __init__(
50 self,
51 simple_links=True,
52 block_size=None,
53 same_scheme=True,
54 size_policy=None,
55 cache_type="bytes",
56 cache_options=None,
57 asynchronous=False,
58 loop=None,
59 client_kwargs=None,
60 get_client=get_client,
61 encoded=False,
62 **storage_options,
63 ):
64 """
65 NB: if this is called async, you must await set_client
66
67 Parameters
68 ----------
69 block_size: int
70 Blocks to read bytes; if 0, will default to raw requests file-like
71 objects instead of HTTPFile instances
72 simple_links: bool
73 If True, will consider both HTML <a> tags and anything that looks
74 like a URL; if False, will consider only the former.
75 same_scheme: True
76 When doing ls/glob, if this is True, only consider paths that have
77 http/https matching the input URLs.
78 size_policy: this argument is deprecated
79 client_kwargs: dict
80 Passed to aiohttp.ClientSession, see
81 https://docs.aiohttp.org/en/stable/client_reference.html
82 For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
83 get_client: Callable[..., aiohttp.ClientSession]
84 A callable, which takes keyword arguments and constructs
85 an aiohttp.ClientSession. Its state will be managed by
86 the HTTPFileSystem class.
87 storage_options: key-value
88 Any other parameters passed on to requests
89 cache_type, cache_options: defaults used in open()
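
        Example
        -------
        A sketch of supplying a custom session factory (the timeout value
        here is purely illustrative):

        >>> async def my_client(**kwargs):  # doctest: +SKIP
        ...     return aiohttp.ClientSession(
        ...         timeout=aiohttp.ClientTimeout(total=60), **kwargs
        ...     )
        >>> fs = HTTPFileSystem(get_client=my_client)  # doctest: +SKIP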
90 """
91 super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
92 self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
93 self.simple_links = simple_links
94 self.same_schema = same_scheme
95 self.cache_type = cache_type
96 self.cache_options = cache_options
97 self.client_kwargs = client_kwargs or {}
98 self.get_client = get_client
99 self.encoded = encoded
100 self.kwargs = storage_options
101 self._session = None
102
103 # Clean caching-related parameters from `storage_options`
104 # before propagating them as `request_options` through `self.kwargs`.
105 # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
106 # it clearer.
107 request_options = copy(storage_options)
108 self.use_listings_cache = request_options.pop("use_listings_cache", False)
109 request_options.pop("listings_expiry_time", None)
110 request_options.pop("max_paths", None)
111 request_options.pop("skip_instance_cache", None)
112 self.kwargs = request_options
113
114 @property
115 def fsid(self):
116 return "http"
117
118 def encode_url(self, url):
119 return yarl.URL(url, encoded=self.encoded)
120
121 @staticmethod
122 def close_session(loop, session):
123 if loop is not None and loop.is_running():
124 try:
125 sync(loop, session.close, timeout=0.1)
126 return
127 except (TimeoutError, FSTimeoutError, NotImplementedError):
128 pass
129 connector = getattr(session, "_connector", None)
130 if connector is not None:
131 # close after loop is dead
132 connector._close()
133
134 async def set_session(self):
135 if self._session is None:
136 self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
137 if not self.asynchronous:
138 weakref.finalize(self, self.close_session, self.loop, self._session)
139 return self._session
140
141 @classmethod
142 def _strip_protocol(cls, path):
143 """For HTTP, we always want to keep the full URL"""
144 return path
145
146 @classmethod
147 def _parent(cls, path):
148 # override, since _strip_protocol is different for URLs
149 par = super()._parent(path)
150 if len(par) > 7: # "http://..."
151 return par
152 return ""
153
    async def _ls_real(self, url, detail=True, **kwargs):
        # ignoring URL-encoded arguments
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            self._raise_not_found_for_status(r, url)

            if "Content-Type" in r.headers:
                mimetype = r.headers["Content-Type"].partition(";")[0]
            else:
                mimetype = None

            if mimetype in ("text/html", None):
                try:
                    text = await r.text(errors="ignore")
                    if self.simple_links:
                        links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
                    else:
                        links = [u[2] for u in ex.findall(text)]
                except UnicodeDecodeError:
                    links = []  # binary, not HTML
            else:
                links = []

        out = set()
        parts = urlparse(url)
        for l in links:
            if isinstance(l, tuple):
                l = l[1]
            if l.startswith("/") and len(l) > 1:
                # absolute URL on this server
                l = f"{parts.scheme}://{parts.netloc}{l}"
            if l.startswith("http"):
                if self.same_schema and l.startswith(url.rstrip("/") + "/"):
                    out.add(l)
                elif l.replace("https", "http").startswith(
                    url.replace("https", "http").rstrip("/") + "/"
                ):
                    # allowed to cross http <-> https
                    out.add(l)
            else:
                if l not in ["..", "../"]:
                    # Ignore FTP-like "parent"
                    out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
        if not out and url.endswith("/"):
            out = await self._ls_real(url.rstrip("/"), detail=False)
        if detail:
            return [
                {
                    "name": u,
                    "size": None,
                    "type": "directory" if u.endswith("/") else "file",
                }
                for u in out
            ]
        else:
            return sorted(out)

    async def _ls(self, url, detail=True, **kwargs):
        if self.use_listings_cache and url in self.dircache:
            out = self.dircache[url]
        else:
            out = await self._ls_real(url, detail=detail, **kwargs)
            self.dircache[url] = out
        return out

    ls = sync_wrapper(_ls)

    def _raise_not_found_for_status(self, response, url):
        """
        Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
        """
        if response.status == 404:
            raise FileNotFoundError(url)
        response.raise_for_status()

    async def _cat_file(self, url, start=None, end=None, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)

        if start is not None or end is not None:
            if start == end:
                return b""
            headers = kw.pop("headers", {}).copy()

            headers["Range"] = await self._process_limits(url, start, end)
            kw["headers"] = headers
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            out = await r.read()
            self._raise_not_found_for_status(r, url)
        return out
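
    # Usage sketch via the sync mirror of ``_cat_file`` (assumes the URL
    # exists and, for ranged reads, that the server honours Range headers):
    #   fs = HTTPFileSystem()
    #   first_100 = fs.cat_file("https://example.com/a.bin", start=0, end=100)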

    async def _get_file(
        self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs
    ):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(rpath)
        session = await self.set_session()
        async with session.get(self.encode_url(rpath), **kw) as r:
            try:
                size = int(r.headers["content-length"])
            except (ValueError, KeyError):
                size = None

            callback.set_size(size)
            self._raise_not_found_for_status(r, rpath)
            if isfilelike(lpath):
                outfile = lpath
            else:
                outfile = open(lpath, "wb")  # noqa: ASYNC230

            try:
                chunk = True
                while chunk:
                    chunk = await r.content.read(chunk_size)
                    outfile.write(chunk)
                    callback.relative_update(len(chunk))
            finally:
                if not isfilelike(lpath):
                    outfile.close()

    async def _put_file(
        self,
        lpath,
        rpath,
        chunk_size=5 * 2**20,
        callback=DEFAULT_CALLBACK,
        method="post",
        mode="overwrite",
        **kwargs,
    ):
        if mode != "overwrite":
            raise NotImplementedError("Exclusive write")

        async def gen_chunks():
            # Support passing arbitrary file-like objects
            # and use them instead of streams.
            if isinstance(lpath, io.IOBase):
                context = nullcontext(lpath)
                use_seek = False  # might not support seeking
            else:
                context = open(lpath, "rb")  # noqa: ASYNC230
                use_seek = True

            with context as f:
                if use_seek:
                    callback.set_size(f.seek(0, 2))
                    f.seek(0)
                else:
                    callback.set_size(getattr(f, "size", None))

                chunk = f.read(chunk_size)
                while chunk:
                    yield chunk
                    callback.relative_update(len(chunk))
                    chunk = f.read(chunk_size)

        kw = self.kwargs.copy()
        kw.update(kwargs)
        session = await self.set_session()

        method = method.lower()
        if method not in ("post", "put"):
            raise ValueError(
                f"method has to be either 'post' or 'put', not: {method!r}"
            )

        meth = getattr(session, method)
        async with meth(self.encode_url(rpath), data=gen_chunks(), **kw) as resp:
            self._raise_not_found_for_status(resp, rpath)
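
    # Usage sketch via the sync mirror of ``_put_file`` (assumes the endpoint
    # accepts a chunked POST body at this URL):
    #   fs = HTTPFileSystem()
    #   fs.put_file("local.bin", "https://example.com/upload", method="post")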

    async def _exists(self, path, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        try:
            logger.debug(path)
            session = await self.set_session()
            r = await session.get(self.encode_url(path), **kw)
            async with r:
                return r.status < 400
        except aiohttp.ClientError:
            return False

    async def _isfile(self, path, **kwargs):
        return await self._exists(path, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=None,  # XXX: This differs from the base class.
        cache_type=None,
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """Make a file-like object

        Parameters
        ----------
        path: str
            Full URL with protocol
        mode: string
            must be "rb"
        block_size: int or None
            Bytes to download in one request; use instance value if None. If
            zero, will return a streaming file-like instance.
        kwargs: key-value
            Any other parameters, passed to aiohttp calls
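
        Example
        -------
        A sketch (assumes the URL exists); ``block_size=0`` yields a
        streaming, non-seekable file object:

        >>> fs = HTTPFileSystem()  # doctest: +SKIP
        >>> f = fs.open("https://example.com/large.bin", block_size=0)  # doctest: +SKIP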
369 """
370 if mode != "rb":
371 raise NotImplementedError
372 block_size = block_size if block_size is not None else self.block_size
373 kw = self.kwargs.copy()
374 kw["asynchronous"] = self.asynchronous
375 kw.update(kwargs)
        info = {}
        # info.update() returns None, so when `size` is falsy this chain
        # populates `info` via self.info() and then falls through to the
        # freshly-set info["size"].
        size = size or info.update(self.info(path, **kwargs)) or info["size"]
        session = sync(self.loop, self.set_session)
        if block_size and size and info.get("partial", True):
            return HTTPFile(
                self,
                path,
                session=session,
                block_size=block_size,
                mode=mode,
                size=size,
                cache_type=cache_type or self.cache_type,
                cache_options=cache_options or self.cache_options,
                loop=self.loop,
                **kw,
            )
        else:
            return HTTPStreamFile(
                self,
                path,
                mode=mode,
                loop=self.loop,
                session=session,
                **kw,
            )

    async def open_async(self, path, mode="rb", size=None, **kwargs):
        session = await self.set_session()
        if size is None:
            try:
                size = (await self._info(path, **kwargs))["size"]
            except FileNotFoundError:
                pass
        return AsyncStreamFile(
            self,
            path,
            loop=self.loop,
            session=session,
            size=size,
            **kwargs,
        )

    def ukey(self, url):
        """Unique identifier; assume HTTP files are static, unchanging"""
        return tokenize(url, self.kwargs, self.protocol)

    async def _info(self, url, **kwargs):
        """Get info of URL

        Tries to access location via HEAD, and then GET methods, but does
        not fetch the data.

        It is possible that the server does not supply any size information, in
        which case size will be given as None (and certain operations on the
        corresponding file will not work).
        """
        info = {}
        session = await self.set_session()

        for policy in ["head", "get"]:
            try:
                info.update(
                    await _file_info(
                        self.encode_url(url),
                        size_policy=policy,
                        session=session,
                        **self.kwargs,
                        **kwargs,
                    )
                )
                if info.get("size") is not None:
                    break
            except Exception as exc:
                if policy == "get":
                    # If get failed, then raise a FileNotFoundError
                    raise FileNotFoundError(url) from exc
                logger.debug("", exc_info=exc)

        return {"name": url, "size": None, **info, "type": "file"}
    async def _glob(self, path, maxdepth=None, **kwargs):
        """
        Find files by glob-matching.

        This implementation is identical to the one in AbstractFileSystem,
        but "?" is not considered a globbing character, because it is so
        common in URLs, often identifying the "query" part.
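
        For example, ``fs.glob("https://server.com/files/*.csv")`` treats
        "*" as a wildcard, while in ``fs.glob("https://server.com/a?b=1")``
        the "?" is matched literally.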
463 """
464 if maxdepth is not None and maxdepth < 1:
465 raise ValueError("maxdepth must be at least 1")
466 import re
467
468 ends_with_slash = path.endswith("/") # _strip_protocol strips trailing slash
469 path = self._strip_protocol(path)
470 append_slash_to_dirname = ends_with_slash or path.endswith(("/**", "/*"))
471 idx_star = path.find("*") if path.find("*") >= 0 else len(path)
472 idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
473
474 min_idx = min(idx_star, idx_brace)
475
476 detail = kwargs.pop("detail", False)
477
478 if not has_magic(path):
479 if await self._exists(path, **kwargs):
480 if not detail:
481 return [path]
482 else:
483 return {path: await self._info(path, **kwargs)}
484 else:
485 if not detail:
486 return [] # glob of non-existent returns empty
487 else:
488 return {}
489 elif "/" in path[:min_idx]:
490 min_idx = path[:min_idx].rindex("/")
491 root = path[: min_idx + 1]
492 depth = path[min_idx + 1 :].count("/") + 1
493 else:
494 root = ""
495 depth = path[min_idx + 1 :].count("/") + 1
496
497 if "**" in path:
498 if maxdepth is not None:
499 idx_double_stars = path.find("**")
500 depth_double_stars = path[idx_double_stars:].count("/") + 1
501 depth = depth - depth_double_stars + maxdepth
502 else:
503 depth = None
504
505 allpaths = await self._find(
506 root, maxdepth=depth, withdirs=True, detail=True, **kwargs
507 )
508
509 pattern = glob_translate(path + ("/" if ends_with_slash else ""))
510 pattern = re.compile(pattern)
511
512 out = {
513 (
514 p.rstrip("/")
515 if not append_slash_to_dirname
516 and info["type"] == "directory"
517 and p.endswith("/")
518 else p
519 ): info
520 for p, info in sorted(allpaths.items())
521 if pattern.match(p.rstrip("/"))
522 }
523
524 if detail:
525 return out
526 else:
527 return list(out)
528
    async def _isdir(self, path):
        # override, since all URLs are (also) files
        try:
            return bool(await self._ls(path))
        except (FileNotFoundError, ValueError):
            return False

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        """
        Write bytes to a remote file over HTTP, using a PUT request.

        Parameters
        ----------
        path : str
            Target URL where the data should be written
        value : bytes
            Data to be written
        mode : str
            How to write to the file; only "overwrite" is currently supported
        **kwargs : dict
            Additional parameters to pass to the HTTP request
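
        Example
        -------
        A sketch via the sync mirror (assumes the server accepts PUT at
        this URL):

        >>> fs.pipe_file("https://example.com/upload/x.bin", b"data")  # doctest: +SKIP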
550 """
551 url = self._strip_protocol(path)
552 headers = kwargs.pop("headers", {})
553 headers["Content-Length"] = str(len(value))
554
555 session = await self.set_session()
556
557 async with session.put(url, data=value, headers=headers, **kwargs) as r:
558 r.raise_for_status()
559
560
561class HTTPFile(AbstractBufferedFile):
562 """
563 A file-like object pointing to a remote HTTP(S) resource
564
565 Supports only reading, with read-ahead of a predetermined block-size.
566
567 In the case that the server does not supply the filesize, only reading of
568 the complete file in one go is supported.
569
570 Parameters
571 ----------
572 url: str
573 Full URL of the remote resource, including the protocol
574 session: aiohttp.ClientSession or None
575 All calls will be made within this session, to avoid restarting
576 connections where the server allows this
577 block_size: int or None
578 The amount of read-ahead to do, in bytes. Default is 5MB, or the value
579 configured for the FileSystem creating this file
580 size: None or int
581 If given, this is the size of the file in bytes, and we don't attempt
582 to call the server to find the value.
583 kwargs: all other key-values are passed to requests calls.
584 """
585
586 def __init__(
587 self,
588 fs,
589 url,
590 session=None,
591 block_size=None,
592 mode="rb",
593 cache_type="bytes",
594 cache_options=None,
595 size=None,
596 loop=None,
597 asynchronous=False,
598 **kwargs,
599 ):
600 if mode != "rb":
601 raise NotImplementedError("File mode not supported")
602 self.asynchronous = asynchronous
603 self.loop = loop
604 self.url = url
605 self.session = session
606 self.details = {"name": url, "size": size, "type": "file"}
607 super().__init__(
608 fs=fs,
609 path=url,
610 mode=mode,
611 block_size=block_size,
612 cache_type=cache_type,
613 cache_options=cache_options,
614 **kwargs,
615 )
616
617 def read(self, length=-1):
618 """Read bytes from file
619
620 Parameters
621 ----------
622 length: int
623 Read up to this many bytes. If negative, read all content to end of
624 file. If the server has not supplied the filesize, attempting to
625 read only part of the data will raise a ValueError.
626 """
627 if (
628 (length < 0 and self.loc == 0) # explicit read all
629 # but not when the size is known and fits into a block anyways
630 and not (self.size is not None and self.size <= self.blocksize)
631 ):
632 self._fetch_all()
633 if self.size is None:
634 if length < 0:
635 self._fetch_all()
636 else:
637 length = min(self.size - self.loc, length)
638 return super().read(length)
639
640 async def async_fetch_all(self):
641 """Read whole file in one shot, without caching
642
643 This is only called when position is still at zero,
644 and read() is called without a byte-count.
645 """
646 logger.debug(f"Fetch all for {self}")
647 if not isinstance(self.cache, AllBytes):
648 r = await self.session.get(self.fs.encode_url(self.url), **self.kwargs)
649 async with r:
650 r.raise_for_status()
651 out = await r.read()
652 self.cache = AllBytes(
653 size=len(out), fetcher=None, blocksize=None, data=out
654 )
655 self.size = len(out)
656
657 _fetch_all = sync_wrapper(async_fetch_all)
658
659 def _parse_content_range(self, headers):
660 """Parse the Content-Range header"""
        s = headers.get("Content-Range", "")
        m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
        if not m:
            return None, None, None

        if m[1] == "*":
            start = end = None
        else:
            start, end = [int(x) for x in m[1].split("-")]
        total = None if m[2] == "*" else int(m[2])
        return start, end, total

    async def async_fetch_range(self, start, end):
        """Download a block of data

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        logger.debug(f"Fetch range for {self}: {start}-{end}")
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {}).copy()
        headers["Range"] = f"bytes={start}-{end - 1}"
        logger.debug(f"{self.url} : {headers['Range']}")
        r = await self.session.get(
            self.fs.encode_url(self.url), headers=headers, **kwargs
        )
        async with r:
            if r.status == 416:
                # range request outside file
                return b""
            r.raise_for_status()

            # If the server has handled the range request, it should reply
            # with status 206 (partial content). But we'll guess that a suitable
            # Content-Range header or a Content-Length no more than the
            # requested range also mean we have got the desired range.
            response_is_range = (
                r.status == 206
                or self._parse_content_range(r.headers)[0] == start
                or int(r.headers.get("Content-Length", end + 1)) <= end - start
            )

            if response_is_range:
                # partial content, as expected
                out = await r.read()
            elif start > 0:
                raise ValueError(
                    "The HTTP server doesn't appear to support range requests. "
                    "Only reading this file from the beginning is supported. "
                    "Open with block_size=0 for a streaming file interface."
                )
            else:
                # Response is not a range, but we want the start of the file,
                # so we can read the required amount anyway.
                cl = 0
                out = []
                while True:
                    chunk = await r.content.read(2**20)
                    # data size unknown, let's read until we have enough
                    if chunk:
                        out.append(chunk)
                        cl += len(chunk)
                        if cl > end - start:
                            break
                    else:
                        break
                out = b"".join(out)[: end - start]
            return out

    _fetch_range = sync_wrapper(async_fetch_range)


magic_check = re.compile("([*[])")


def has_magic(s):
    match = magic_check.search(s)
    return match is not None


class HTTPStreamFile(AbstractBufferedFile):
    def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
        self.asynchronous = kwargs.pop("asynchronous", False)
        self.url = url
        self.loop = loop
        self.session = session
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)

        async def cor():
            r = await self.session.get(self.fs.encode_url(url), **kwargs).__aenter__()
            self.fs._raise_not_found_for_status(r, url)
            return r

        self.r = sync(self.loop, cor)
        self.loop = fs.loop

    def seek(self, loc, whence=0):
        if loc == 0 and whence == 1:
            return
        if loc == self.loc and whence == 0:
            return
        raise ValueError("Cannot seek streaming HTTP file")

    async def _read(self, num=-1):
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    read = sync_wrapper(_read)

    async def _close(self):
        self.r.close()

    def close(self):
        asyncio.run_coroutine_threadsafe(self._close(), self.loop)
        super().close()


class AsyncStreamFile(AbstractAsyncStreamedFile):
    def __init__(
        self, fs, url, mode="rb", loop=None, session=None, size=None, **kwargs
    ):
        self.url = url
        self.session = session
        self.r = None
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        self.kwargs = kwargs
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none")
        self.size = size

    async def read(self, num=-1):
        if self.r is None:
            r = await self.session.get(
                self.fs.encode_url(self.url), **self.kwargs
            ).__aenter__()
            self.fs._raise_not_found_for_status(r, self.url)
            self.r = r
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    async def close(self):
        if self.r is not None:
            self.r.close()
            self.r = None
        await super().close()


async def get_range(session, url, start, end, file=None, **kwargs):
    # explicitly GET a range when we know it must be safe
    kwargs = kwargs.copy()
    headers = kwargs.pop("headers", {}).copy()
    headers["Range"] = f"bytes={start}-{end - 1}"
    r = await session.get(url, headers=headers, **kwargs)
    r.raise_for_status()
    async with r:
        out = await r.read()
        if file:
            with open(file, "r+b") as f:  # noqa: ASYNC230
                f.seek(start)
                f.write(out)
        else:
            return out
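
# A minimal usage sketch for ``get_range`` (hypothetical URL; assumes the
# server honours Range requests):
#
#   async def demo():
#       async with aiohttp.ClientSession() as session:
#           first_kb = await get_range(
#               session, "https://example.com/data.bin", 0, 1024
#           )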


async def _file_info(url, session, size_policy="head", **kwargs):
    """Call HEAD on the server to get details about the file (size/checksum etc.)

    Default operation is to explicitly allow redirects and use encoding
    'identity' (no compression) to get the true size of the target.
    """
    logger.debug("Retrieve file size for %s", url)
    kwargs = kwargs.copy()
    ar = kwargs.pop("allow_redirects", True)
    head = kwargs.get("headers", {}).copy()
    head["Accept-Encoding"] = "identity"
    kwargs["headers"] = head

    info = {}
    if size_policy == "head":
        r = await session.head(url, allow_redirects=ar, **kwargs)
    elif size_policy == "get":
        r = await session.get(url, allow_redirects=ar, **kwargs)
    else:
        raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
    async with r:
        r.raise_for_status()

        if "Content-Length" in r.headers:
            # Some servers may choose to ignore Accept-Encoding and return
            # compressed content, in which case the returned size is unreliable.
            if "Content-Encoding" not in r.headers or r.headers["Content-Encoding"] in [
                "identity",
                "",
            ]:
                info["size"] = int(r.headers["Content-Length"])
        elif "Content-Range" in r.headers:
            info["size"] = int(r.headers["Content-Range"].split("/")[1])

        if "Content-Type" in r.headers:
            info["mimetype"] = r.headers["Content-Type"].partition(";")[0]

        if r.headers.get("Accept-Ranges") == "none":
            # Some servers may explicitly discourage partial content requests, but
            # the lack of "Accept-Ranges" does not always indicate they would fail
            info["partial"] = False

        info["url"] = str(r.url)

        for checksum_field in ["ETag", "Content-MD5", "Digest", "Last-Modified"]:
            if r.headers.get(checksum_field):
                info[checksum_field] = r.headers[checksum_field]

    return info


async def _file_size(url, session=None, *args, **kwargs):
    if session is None:
        session = await get_client()
    info = await _file_info(url, session=session, *args, **kwargs)
    return info.get("size")


file_size = sync_wrapper(_file_size)