1import asyncio
2import io
3import logging
4import re
5import weakref
6from copy import copy
7from urllib.parse import urlparse
8
9import aiohttp
10import yarl
11
12from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper
13from fsspec.callbacks import DEFAULT_CALLBACK
14from fsspec.exceptions import FSTimeoutError
15from fsspec.spec import AbstractBufferedFile
16from fsspec.utils import (
17 DEFAULT_BLOCK_SIZE,
18 glob_translate,
19 isfilelike,
20 nullcontext,
21 tokenize,
22)
23
24from ..caching import AllBytes
25
26# https://stackoverflow.com/a/15926317/3821154
27ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
28ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
29logger = logging.getLogger("fsspec.http")
30
31
32async def get_client(**kwargs):
33 return aiohttp.ClientSession(**kwargs)
34
35
36class HTTPFileSystem(AsyncFileSystem):
37 """
38 Simple File-System for fetching data via HTTP(S)
39
40 ``ls()`` is implemented by loading the parent page and doing a regex
41 match on the result. If simple_link=True, anything of the form
42 "http(s)://server.com/stuff?thing=other"; otherwise only links within
43 HTML href tags will be used.
44 """
45
46 protocol = ("http", "https")
47 sep = "/"
48
49 def __init__(
50 self,
51 simple_links=True,
52 block_size=None,
53 same_scheme=True,
54 size_policy=None,
55 cache_type="bytes",
56 cache_options=None,
57 asynchronous=False,
58 loop=None,
59 client_kwargs=None,
60 get_client=get_client,
61 encoded=False,
62 **storage_options,
63 ):
64 """
65 NB: if this is called async, you must await set_client
66
67 Parameters
68 ----------
69 block_size: int
70 Blocks to read bytes; if 0, will default to raw requests file-like
71 objects instead of HTTPFile instances
72 simple_links: bool
73 If True, will consider both HTML <a> tags and anything that looks
74 like a URL; if False, will consider only the former.
75 same_scheme: True
76 When doing ls/glob, if this is True, only consider paths that have
77 http/https matching the input URLs.
78 size_policy: this argument is deprecated
79 client_kwargs: dict
80 Passed to aiohttp.ClientSession, see
81 https://docs.aiohttp.org/en/stable/client_reference.html
82 For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
83 get_client: Callable[..., aiohttp.ClientSession]
84 A callable, which takes keyword arguments and constructs
85 an aiohttp.ClientSession. Its state will be managed by
86 the HTTPFileSystem class.
87 storage_options: key-value
88 Any other parameters passed on to requests
89 cache_type, cache_options: defaults used in open()
90 """
91 super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
92 self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
93 self.simple_links = simple_links
94 self.same_schema = same_scheme
95 self.cache_type = cache_type
96 self.cache_options = cache_options
97 self.client_kwargs = client_kwargs or {}
98 self.get_client = get_client
99 self.encoded = encoded
100 self.kwargs = storage_options
101 self._session = None
102
103 # Clean caching-related parameters from `storage_options`
104 # before propagating them as `request_options` through `self.kwargs`.
105 # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
106 # it clearer.
107 request_options = copy(storage_options)
108 self.use_listings_cache = request_options.pop("use_listings_cache", False)
109 request_options.pop("listings_expiry_time", None)
110 request_options.pop("max_paths", None)
111 request_options.pop("skip_instance_cache", None)
112 self.kwargs = request_options
113
114 @property
115 def fsid(self):
116 return "http"
117
118 def encode_url(self, url):
119 return yarl.URL(url, encoded=self.encoded)
120
121 @staticmethod
122 def close_session(loop, session):
123 if loop is not None and loop.is_running():
124 try:
125 sync(loop, session.close, timeout=0.1)
126 return
127 except (TimeoutError, FSTimeoutError, NotImplementedError):
128 pass
129 connector = getattr(session, "_connector", None)
130 if connector is not None:
131 # close after loop is dead
132 connector._close()
133
134 async def set_session(self):
135 if self._session is None:
136 self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
137 if not self.asynchronous:
138 weakref.finalize(self, self.close_session, self.loop, self._session)
139 return self._session
140
141 @classmethod
142 def _strip_protocol(cls, path):
143 """For HTTP, we always want to keep the full URL"""
144 return path
145
146 @classmethod
147 def _parent(cls, path):
148 # override, since _strip_protocol is different for URLs
149 par = super()._parent(path)
150 if len(par) > 7: # "http://..."
151 return par
152 return ""
153
154 async def _ls_real(self, url, detail=True, **kwargs):
155 # ignoring URL-encoded arguments
156 kw = self.kwargs.copy()
157 kw.update(kwargs)
158 logger.debug(url)
159 session = await self.set_session()
160 async with session.get(self.encode_url(url), **self.kwargs) as r:
161 self._raise_not_found_for_status(r, url)
162
163 if "Content-Type" in r.headers:
164 mimetype = r.headers["Content-Type"].partition(";")[0]
165 else:
166 mimetype = None
167
168 if mimetype in ("text/html", None):
169 try:
170 text = await r.text(errors="ignore")
171 if self.simple_links:
172 links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
173 else:
174 links = [u[2] for u in ex.findall(text)]
175 except UnicodeDecodeError:
176 links = [] # binary, not HTML
177 else:
178 links = []
179
180 out = set()
181 parts = urlparse(url)
182 for l in links:
183 if isinstance(l, tuple):
184 l = l[1]
185 if l.startswith("/") and len(l) > 1:
186 # absolute URL on this server
187 l = f"{parts.scheme}://{parts.netloc}{l}"
188 if l.startswith("http"):
189 if self.same_schema and l.startswith(url.rstrip("/") + "/"):
190 out.add(l)
191 elif l.replace("https", "http").startswith(
192 url.replace("https", "http").rstrip("/") + "/"
193 ):
194 # allowed to cross http <-> https
195 out.add(l)
196 else:
197 if l not in ["..", "../"]:
198 # Ignore FTP-like "parent"
199 out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
200 if not out and url.endswith("/"):
201 out = await self._ls_real(url.rstrip("/"), detail=False)
202 if detail:
203 return [
204 {
205 "name": u,
206 "size": None,
207 "type": "directory" if u.endswith("/") else "file",
208 }
209 for u in out
210 ]
211 else:
212 return sorted(out)
213
214 async def _ls(self, url, detail=True, **kwargs):
215 if self.use_listings_cache and url in self.dircache:
216 out = self.dircache[url]
217 else:
218 out = await self._ls_real(url, detail=detail, **kwargs)
219 self.dircache[url] = out
220 return out
221
222 ls = sync_wrapper(_ls)
223
224 def _raise_not_found_for_status(self, response, url):
225 """
226 Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
227 """
228 if response.status == 404:
229 raise FileNotFoundError(url)
230 response.raise_for_status()
231
232 async def _cat_file(self, url, start=None, end=None, **kwargs):
233 kw = self.kwargs.copy()
234 kw.update(kwargs)
235 logger.debug(url)
236
237 if start is not None or end is not None:
238 if start == end:
239 return b""
240 headers = kw.pop("headers", {}).copy()
241
242 headers["Range"] = await self._process_limits(url, start, end)
243 kw["headers"] = headers
244 session = await self.set_session()
245 async with session.get(self.encode_url(url), **kw) as r:
246 out = await r.read()
247 self._raise_not_found_for_status(r, url)
248 return out
249
250 async def _get_file(
251 self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs
252 ):
253 kw = self.kwargs.copy()
254 kw.update(kwargs)
255 logger.debug(rpath)
256 session = await self.set_session()
257 async with session.get(self.encode_url(rpath), **kw) as r:
258 try:
259 size = int(r.headers["content-length"])
260 except (ValueError, KeyError):
261 size = None
262
263 callback.set_size(size)
264 self._raise_not_found_for_status(r, rpath)
265 if isfilelike(lpath):
266 outfile = lpath
267 else:
268 outfile = open(lpath, "wb") # noqa: ASYNC230
269
270 try:
271 chunk = True
272 while chunk:
273 chunk = await r.content.read(chunk_size)
274 outfile.write(chunk)
275 callback.relative_update(len(chunk))
276 finally:
277 if not isfilelike(lpath):
278 outfile.close()
279
280 async def _put_file(
281 self,
282 lpath,
283 rpath,
284 chunk_size=5 * 2**20,
285 callback=DEFAULT_CALLBACK,
286 method="post",
287 mode="overwrite",
288 **kwargs,
289 ):
290 if mode != "overwrite":
291 raise NotImplementedError("Exclusive write")
292
293 async def gen_chunks():
294 # Support passing arbitrary file-like objects
295 # and use them instead of streams.
296 if isinstance(lpath, io.IOBase):
297 context = nullcontext(lpath)
298 use_seek = False # might not support seeking
299 else:
300 context = open(lpath, "rb") # noqa: ASYNC230
301 use_seek = True
302
303 with context as f:
304 if use_seek:
305 callback.set_size(f.seek(0, 2))
306 f.seek(0)
307 else:
308 callback.set_size(getattr(f, "size", None))
309
310 chunk = f.read(chunk_size)
311 while chunk:
312 yield chunk
313 callback.relative_update(len(chunk))
314 chunk = f.read(chunk_size)
315
316 kw = self.kwargs.copy()
317 kw.update(kwargs)
318 session = await self.set_session()
319
320 method = method.lower()
321 if method not in ("post", "put"):
322 raise ValueError(
323 f"method has to be either 'post' or 'put', not: {method!r}"
324 )
325
326 meth = getattr(session, method)
327 async with meth(self.encode_url(rpath), data=gen_chunks(), **kw) as resp:
328 self._raise_not_found_for_status(resp, rpath)
329
330 async def _exists(self, path, strict=False, **kwargs):
331 kw = self.kwargs.copy()
332 kw.update(kwargs)
333 try:
334 logger.debug(path)
335 session = await self.set_session()
336 r = await session.get(self.encode_url(path), **kw)
337 async with r:
338 if strict:
339 self._raise_not_found_for_status(r, path)
340 return r.status < 400
341 except FileNotFoundError:
342 return False
343 except aiohttp.ClientError:
344 if strict:
345 raise
346 return False
347
348 async def _isfile(self, path, **kwargs):
349 return await self._exists(path, **kwargs)
350
351 def _open(
352 self,
353 path,
354 mode="rb",
355 block_size=None,
356 autocommit=None, # XXX: This differs from the base class.
357 cache_type=None,
358 cache_options=None,
359 size=None,
360 **kwargs,
361 ):
362 """Make a file-like object
363
364 Parameters
365 ----------
366 path: str
367 Full URL with protocol
368 mode: string
369 must be "rb"
370 block_size: int or None
371 Bytes to download in one request; use instance value if None. If
372 zero, will return a streaming Requests file-like instance.
373 kwargs: key-value
374 Any other parameters, passed to requests calls
375 """
376 if mode != "rb":
377 raise NotImplementedError
378 block_size = block_size if block_size is not None else self.block_size
379 kw = self.kwargs.copy()
380 kw["asynchronous"] = self.asynchronous
381 kw.update(kwargs)
382 info = {}
383 size = size or info.update(self.info(path, **kwargs)) or info["size"]
384 session = sync(self.loop, self.set_session)
385 if block_size and size and info.get("partial", True):
386 return HTTPFile(
387 self,
388 path,
389 session=session,
390 block_size=block_size,
391 mode=mode,
392 size=size,
393 cache_type=cache_type or self.cache_type,
394 cache_options=cache_options or self.cache_options,
395 loop=self.loop,
396 **kw,
397 )
398 else:
399 return HTTPStreamFile(
400 self,
401 path,
402 mode=mode,
403 loop=self.loop,
404 session=session,
405 **kw,
406 )
407
408 async def open_async(self, path, mode="rb", size=None, **kwargs):
409 session = await self.set_session()
410 if size is None:
411 try:
412 size = (await self._info(path, **kwargs))["size"]
413 except FileNotFoundError:
414 pass
415 return AsyncStreamFile(
416 self,
417 path,
418 loop=self.loop,
419 session=session,
420 size=size,
421 **kwargs,
422 )
423
424 def ukey(self, url):
425 """Unique identifier; assume HTTP files are static, unchanging"""
426 return tokenize(url, self.kwargs, self.protocol)
427
428 async def _info(self, url, **kwargs):
429 """Get info of URL
430
431 Tries to access location via HEAD, and then GET methods, but does
432 not fetch the data.
433
434 It is possible that the server does not supply any size information, in
435 which case size will be given as None (and certain operations on the
436 corresponding file will not work).
437 """
438 info = {}
439 session = await self.set_session()
440
441 for policy in ["head", "get"]:
442 try:
443 info.update(
444 await _file_info(
445 self.encode_url(url),
446 size_policy=policy,
447 session=session,
448 **self.kwargs,
449 **kwargs,
450 )
451 )
452 if info.get("size") is not None:
453 break
454 except Exception as exc:
455 if policy == "get":
456 # If get failed, then raise a FileNotFoundError
457 raise FileNotFoundError(url) from exc
458 logger.debug("", exc_info=exc)
459
460 return {"name": url, "size": None, **info, "type": "file"}
461
462 async def _glob(self, path, maxdepth=None, **kwargs):
463 """
464 Find files by glob-matching.
465
466 This implementation is idntical to the one in AbstractFileSystem,
467 but "?" is not considered as a character for globbing, because it is
468 so common in URLs, often identifying the "query" part.
469 """
470 if maxdepth is not None and maxdepth < 1:
471 raise ValueError("maxdepth must be at least 1")
472 import re
473
474 ends_with_slash = path.endswith("/") # _strip_protocol strips trailing slash
475 path = self._strip_protocol(path)
476 append_slash_to_dirname = ends_with_slash or path.endswith(("/**", "/*"))
477 idx_star = path.find("*") if path.find("*") >= 0 else len(path)
478 idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
479
480 min_idx = min(idx_star, idx_brace)
481
482 detail = kwargs.pop("detail", False)
483
484 if not has_magic(path):
485 if await self._exists(path, **kwargs):
486 if not detail:
487 return [path]
488 else:
489 return {path: await self._info(path, **kwargs)}
490 else:
491 if not detail:
492 return [] # glob of non-existent returns empty
493 else:
494 return {}
495 elif "/" in path[:min_idx]:
496 min_idx = path[:min_idx].rindex("/")
497 root = path[: min_idx + 1]
498 depth = path[min_idx + 1 :].count("/") + 1
499 else:
500 root = ""
501 depth = path[min_idx + 1 :].count("/") + 1
502
503 if "**" in path:
504 if maxdepth is not None:
505 idx_double_stars = path.find("**")
506 depth_double_stars = path[idx_double_stars:].count("/") + 1
507 depth = depth - depth_double_stars + maxdepth
508 else:
509 depth = None
510
511 allpaths = await self._find(
512 root, maxdepth=depth, withdirs=True, detail=True, **kwargs
513 )
514
515 pattern = glob_translate(path + ("/" if ends_with_slash else ""))
516 pattern = re.compile(pattern)
517
518 out = {
519 (
520 p.rstrip("/")
521 if not append_slash_to_dirname
522 and info["type"] == "directory"
523 and p.endswith("/")
524 else p
525 ): info
526 for p, info in sorted(allpaths.items())
527 if pattern.match(p.rstrip("/"))
528 }
529
530 if detail:
531 return out
532 else:
533 return list(out)
534
535 async def _isdir(self, path):
536 # override, since all URLs are (also) files
537 try:
538 return bool(await self._ls(path))
539 except (FileNotFoundError, ValueError):
540 return False
541
542 async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
543 """
544 Write bytes to a remote file over HTTP.
545
546 Parameters
547 ----------
548 path : str
549 Target URL where the data should be written
550 value : bytes
551 Data to be written
552 mode : str
553 How to write to the file - 'overwrite' or 'append'
554 **kwargs : dict
555 Additional parameters to pass to the HTTP request
556 """
557 url = self._strip_protocol(path)
558 headers = kwargs.pop("headers", {})
559 headers["Content-Length"] = str(len(value))
560
561 session = await self.set_session()
562
563 async with session.put(url, data=value, headers=headers, **kwargs) as r:
564 r.raise_for_status()
565
566
567class HTTPFile(AbstractBufferedFile):
568 """
569 A file-like object pointing to a remote HTTP(S) resource
570
571 Supports only reading, with read-ahead of a predetermined block-size.
572
573 In the case that the server does not supply the filesize, only reading of
574 the complete file in one go is supported.
575
576 Parameters
577 ----------
578 url: str
579 Full URL of the remote resource, including the protocol
580 session: aiohttp.ClientSession or None
581 All calls will be made within this session, to avoid restarting
582 connections where the server allows this
583 block_size: int or None
584 The amount of read-ahead to do, in bytes. Default is 5MB, or the value
585 configured for the FileSystem creating this file
586 size: None or int
587 If given, this is the size of the file in bytes, and we don't attempt
588 to call the server to find the value.
589 kwargs: all other key-values are passed to requests calls.
590 """
591
592 def __init__(
593 self,
594 fs,
595 url,
596 session=None,
597 block_size=None,
598 mode="rb",
599 cache_type="bytes",
600 cache_options=None,
601 size=None,
602 loop=None,
603 asynchronous=False,
604 **kwargs,
605 ):
606 if mode != "rb":
607 raise NotImplementedError("File mode not supported")
608 self.asynchronous = asynchronous
609 self.loop = loop
610 self.url = url
611 self.session = session
612 self.details = {"name": url, "size": size, "type": "file"}
613 super().__init__(
614 fs=fs,
615 path=url,
616 mode=mode,
617 block_size=block_size,
618 cache_type=cache_type,
619 cache_options=cache_options,
620 **kwargs,
621 )
622
623 def read(self, length=-1):
624 """Read bytes from file
625
626 Parameters
627 ----------
628 length: int
629 Read up to this many bytes. If negative, read all content to end of
630 file. If the server has not supplied the filesize, attempting to
631 read only part of the data will raise a ValueError.
632 """
633 if (
634 (length < 0 and self.loc == 0) # explicit read all
635 # but not when the size is known and fits into a block anyways
636 and not (self.size is not None and self.size <= self.blocksize)
637 ):
638 self._fetch_all()
639 if self.size is None:
640 if length < 0:
641 self._fetch_all()
642 else:
643 length = min(self.size - self.loc, length)
644 return super().read(length)
645
646 async def async_fetch_all(self):
647 """Read whole file in one shot, without caching
648
649 This is only called when position is still at zero,
650 and read() is called without a byte-count.
651 """
652 logger.debug(f"Fetch all for {self}")
653 if not isinstance(self.cache, AllBytes):
654 r = await self.session.get(self.fs.encode_url(self.url), **self.kwargs)
655 async with r:
656 r.raise_for_status()
657 out = await r.read()
658 self.cache = AllBytes(
659 size=len(out), fetcher=None, blocksize=None, data=out
660 )
661 self.size = len(out)
662
663 _fetch_all = sync_wrapper(async_fetch_all)
664
665 def _parse_content_range(self, headers):
666 """Parse the Content-Range header"""
667 s = headers.get("Content-Range", "")
668 m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
669 if not m:
670 return None, None, None
671
672 if m[1] == "*":
673 start = end = None
674 else:
675 start, end = [int(x) for x in m[1].split("-")]
676 total = None if m[2] == "*" else int(m[2])
677 return start, end, total
678
679 async def async_fetch_range(self, start, end):
680 """Download a block of data
681
682 The expectation is that the server returns only the requested bytes,
683 with HTTP code 206. If this is not the case, we first check the headers,
684 and then stream the output - if the data size is bigger than we
685 requested, an exception is raised.
686 """
687 logger.debug(f"Fetch range for {self}: {start}-{end}")
688 kwargs = self.kwargs.copy()
689 headers = kwargs.pop("headers", {}).copy()
690 headers["Range"] = f"bytes={start}-{end - 1}"
691 logger.debug(f"{self.url} : {headers['Range']}")
692 r = await self.session.get(
693 self.fs.encode_url(self.url), headers=headers, **kwargs
694 )
695 async with r:
696 if r.status == 416:
697 # range request outside file
698 return b""
699 r.raise_for_status()
700
701 # If the server has handled the range request, it should reply
702 # with status 206 (partial content). But we'll guess that a suitable
703 # Content-Range header or a Content-Length no more than the
704 # requested range also mean we have got the desired range.
705 response_is_range = (
706 r.status == 206
707 or self._parse_content_range(r.headers)[0] == start
708 or int(r.headers.get("Content-Length", end + 1)) <= end - start
709 )
710
711 if response_is_range:
712 # partial content, as expected
713 out = await r.read()
714 elif start > 0:
715 raise ValueError(
716 "The HTTP server doesn't appear to support range requests. "
717 "Only reading this file from the beginning is supported. "
718 "Open with block_size=0 for a streaming file interface."
719 )
720 else:
721 # Response is not a range, but we want the start of the file,
722 # so we can read the required amount anyway.
723 cl = 0
724 out = []
725 while True:
726 chunk = await r.content.read(2**20)
727 # data size unknown, let's read until we have enough
728 if chunk:
729 out.append(chunk)
730 cl += len(chunk)
731 if cl > end - start:
732 break
733 else:
734 break
735 out = b"".join(out)[: end - start]
736 return out
737
738 _fetch_range = sync_wrapper(async_fetch_range)
739
740
741magic_check = re.compile("([*[])")
742
743
744def has_magic(s):
745 match = magic_check.search(s)
746 return match is not None
747
748
749class HTTPStreamFile(AbstractBufferedFile):
750 def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
751 self.asynchronous = kwargs.pop("asynchronous", False)
752 self.url = url
753 self.loop = loop
754 self.session = session
755 if mode != "rb":
756 raise ValueError
757 self.details = {"name": url, "size": None}
758 super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)
759
760 async def cor():
761 r = await self.session.get(self.fs.encode_url(url), **kwargs).__aenter__()
762 self.fs._raise_not_found_for_status(r, url)
763 return r
764
765 self.r = sync(self.loop, cor)
766 self.loop = fs.loop
767
768 def seek(self, loc, whence=0):
769 if loc == 0 and whence == 1:
770 return
771 if loc == self.loc and whence == 0:
772 return
773 raise ValueError("Cannot seek streaming HTTP file")
774
775 async def _read(self, num=-1):
776 out = await self.r.content.read(num)
777 self.loc += len(out)
778 return out
779
780 read = sync_wrapper(_read)
781
782 async def _close(self):
783 self.r.close()
784
785 def close(self):
786 asyncio.run_coroutine_threadsafe(self._close(), self.loop)
787 super().close()
788
789
790class AsyncStreamFile(AbstractAsyncStreamedFile):
791 def __init__(
792 self, fs, url, mode="rb", loop=None, session=None, size=None, **kwargs
793 ):
794 self.url = url
795 self.session = session
796 self.r = None
797 if mode != "rb":
798 raise ValueError
799 self.details = {"name": url, "size": None}
800 self.kwargs = kwargs
801 super().__init__(fs=fs, path=url, mode=mode, cache_type="none")
802 self.size = size
803
804 async def read(self, num=-1):
805 if self.r is None:
806 r = await self.session.get(
807 self.fs.encode_url(self.url), **self.kwargs
808 ).__aenter__()
809 self.fs._raise_not_found_for_status(r, self.url)
810 self.r = r
811 out = await self.r.content.read(num)
812 self.loc += len(out)
813 return out
814
815 async def close(self):
816 if self.r is not None:
817 self.r.close()
818 self.r = None
819 await super().close()
820
821
822async def get_range(session, url, start, end, file=None, **kwargs):
823 # explicit get a range when we know it must be safe
824 kwargs = kwargs.copy()
825 headers = kwargs.pop("headers", {}).copy()
826 headers["Range"] = f"bytes={start}-{end - 1}"
827 r = await session.get(url, headers=headers, **kwargs)
828 r.raise_for_status()
829 async with r:
830 out = await r.read()
831 if file:
832 with open(file, "r+b") as f: # noqa: ASYNC230
833 f.seek(start)
834 f.write(out)
835 else:
836 return out
837
838
839async def _file_info(url, session, size_policy="head", **kwargs):
840 """Call HEAD on the server to get details about the file (size/checksum etc.)
841
842 Default operation is to explicitly allow redirects and use encoding
843 'identity' (no compression) to get the true size of the target.
844 """
845 logger.debug("Retrieve file size for %s", url)
846 kwargs = kwargs.copy()
847 ar = kwargs.pop("allow_redirects", True)
848 head = kwargs.get("headers", {}).copy()
849 head["Accept-Encoding"] = "identity"
850 kwargs["headers"] = head
851
852 info = {}
853 if size_policy == "head":
854 r = await session.head(url, allow_redirects=ar, **kwargs)
855 elif size_policy == "get":
856 r = await session.get(url, allow_redirects=ar, **kwargs)
857 else:
858 raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
859 async with r:
860 r.raise_for_status()
861
862 if "Content-Length" in r.headers:
863 # Some servers may choose to ignore Accept-Encoding and return
864 # compressed content, in which case the returned size is unreliable.
865 if "Content-Encoding" not in r.headers or r.headers["Content-Encoding"] in [
866 "identity",
867 "",
868 ]:
869 info["size"] = int(r.headers["Content-Length"])
870 elif "Content-Range" in r.headers:
871 info["size"] = int(r.headers["Content-Range"].split("/")[1])
872
873 if "Content-Type" in r.headers:
874 info["mimetype"] = r.headers["Content-Type"].partition(";")[0]
875
876 if r.headers.get("Accept-Ranges") == "none":
877 # Some servers may explicitly discourage partial content requests, but
878 # the lack of "Accept-Ranges" does not always indicate they would fail
879 info["partial"] = False
880
881 info["url"] = str(r.url)
882
883 for checksum_field in ["ETag", "Content-MD5", "Digest", "Last-Modified"]:
884 if r.headers.get(checksum_field):
885 info[checksum_field] = r.headers[checksum_field]
886
887 return info
888
889
890async def _file_size(url, session=None, *args, **kwargs):
891 if session is None:
892 session = await get_client()
893 info = await _file_info(url, session=session, *args, **kwargs)
894 return info.get("size")
895
896
897file_size = sync_wrapper(_file_size)