# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/implementations/http.py: 25%


import asyncio
import io
import logging
import re
import weakref
from copy import copy
from urllib.parse import urlparse

import aiohttp
import yarl

from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import (
    DEFAULT_BLOCK_SIZE,
    glob_translate,
    isfilelike,
    nullcontext,
    tokenize,
)

from ..caching import AllBytes

# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
logger = logging.getLogger("fsspec.http")


async def get_client(**kwargs):
    return aiohttp.ClientSession(**kwargs)
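# For example (illustrative; the credentials are placeholders): a custom
# session factory with the same contract as ``get_client`` above can attach
# settings such as authentication to every request:
#
#     async def get_client_with_auth(**kwargs):
#         return aiohttp.ClientSession(
#             auth=aiohttp.BasicAuth("user", "pass"), **kwargs
#         )
#
#     fs = HTTPFileSystem(get_client=get_client_with_auth)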

class HTTPFileSystem(AsyncFileSystem):
    """
    Simple File-System for fetching data via HTTP(S)

    ``ls()`` is implemented by loading the parent page and doing a regex
    match on the result. If simple_links=True, anything of the form
    "http(s)://server.com/stuff?thing=other" is considered a link; otherwise
    only links within HTML href tags will be used.
    """

    sep = "/"

    def __init__(
        self,
        simple_links=True,
        block_size=None,
        same_scheme=True,
        size_policy=None,
        cache_type="bytes",
        cache_options=None,
        asynchronous=False,
        loop=None,
        client_kwargs=None,
        get_client=get_client,
        encoded=False,
        **storage_options,
    ):
        """
        NB: if this is called async, you must await set_session

        Parameters
        ----------
        block_size: int
            Blocks to read bytes; if 0, will default to raw requests file-like
            objects instead of HTTPFile instances
        simple_links: bool
            If True, will consider both HTML <a> tags and anything that looks
            like a URL; if False, will consider only the former.
        same_scheme: bool
            When doing ls/glob, if this is True, only consider paths that have
            http/https matching the input URLs.
        size_policy: this argument is deprecated
        client_kwargs: dict
            Passed to aiohttp.ClientSession, see
            https://docs.aiohttp.org/en/stable/client_reference.html
            For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
        get_client: Callable[..., aiohttp.ClientSession]
            A callable which takes keyword arguments and constructs
            an aiohttp.ClientSession. Its state will be managed by
            the HTTPFileSystem class.
        storage_options: key-value
            Any other parameters passed on to aiohttp calls
        cache_type, cache_options: defaults used in open()
        """
        super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
        self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
        self.simple_links = simple_links
        self.same_schema = same_scheme
        self.cache_type = cache_type
        self.cache_options = cache_options
        self.client_kwargs = client_kwargs or {}
        self.get_client = get_client
        self.encoded = encoded
        self.kwargs = storage_options
        self._session = None

        # Clean caching-related parameters from `storage_options`
        # before propagating them as `request_options` through `self.kwargs`.
        # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
        # it clearer.
        request_options = copy(storage_options)
        self.use_listings_cache = request_options.pop("use_listings_cache", False)
        request_options.pop("listings_expiry_time", None)
        request_options.pop("max_paths", None)
        request_options.pop("skip_instance_cache", None)
        self.kwargs = request_options
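    # For example (illustrative): listing results can be cached by passing
    # ``use_listings_cache=True``, optionally with ``listings_expiry_time``
    # and ``max_paths``; these are popped above so that they are not
    # forwarded to aiohttp as request arguments:
    #
    #     fs = HTTPFileSystem(use_listings_cache=True, listings_expiry_time=60)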

    @property
    def fsid(self):
        return "http"

    def encode_url(self, url):
        return yarl.URL(url, encoded=self.encoded)

    @staticmethod
    def close_session(loop, session):
        if loop is not None and loop.is_running():
            try:
                sync(loop, session.close, timeout=0.1)
                return
            except (TimeoutError, FSTimeoutError, NotImplementedError):
                pass
        connector = getattr(session, "_connector", None)
        if connector is not None:
            # close after loop is dead
            connector._close()

    async def set_session(self):
        if self._session is None:
            self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
            if not self.asynchronous:
                weakref.finalize(self, self.close_session, self.loop, self._session)
        return self._session

    @classmethod
    def _strip_protocol(cls, path):
        """For HTTP, we always want to keep the full URL"""
        return path

    @classmethod
    def _parent(cls, path):
        # override, since _strip_protocol is different for URLs
        par = super()._parent(path)
        if len(par) > 7:  # "http://..."
            return par
        return ""

    async def _ls_real(self, url, detail=True, **kwargs):
        # ignoring URL-encoded arguments
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            self._raise_not_found_for_status(r, url)
            try:
                text = await r.text()
                if self.simple_links:
                    links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
                else:
                    links = [u[2] for u in ex.findall(text)]
            except UnicodeDecodeError:
                links = []  # binary, not HTML
            out = set()
            parts = urlparse(url)
            for l in links:
                if isinstance(l, tuple):
                    l = l[1]
                if l.startswith("/") and len(l) > 1:
                    # absolute URL on this server
                    l = f"{parts.scheme}://{parts.netloc}{l}"
                if l.startswith("http"):
                    if self.same_schema and l.startswith(url.rstrip("/") + "/"):
                        out.add(l)
                    elif l.replace("https", "http").startswith(
                        url.replace("https", "http").rstrip("/") + "/"
                    ):
                        # allowed to cross http <-> https
                        out.add(l)
                else:
                    if l not in ["..", "../"]:
                        # Ignore FTP-like "parent"
                        out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
            if not out and url.endswith("/"):
                out = await self._ls_real(url.rstrip("/"), detail=False)
            if detail:
                return [
                    {
                        "name": u,
                        "size": None,
                        "type": "directory" if u.endswith("/") else "file",
                    }
                    for u in out
                ]
            else:
                return sorted(out)

    async def _ls(self, url, detail=True, **kwargs):
        if self.use_listings_cache and url in self.dircache:
            out = self.dircache[url]
        else:
            out = await self._ls_real(url, detail=detail, **kwargs)
            self.dircache[url] = out
        return out

    ls = sync_wrapper(_ls)

    def _raise_not_found_for_status(self, response, url):
        """
        Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
        """
        if response.status == 404:
            raise FileNotFoundError(url)
        response.raise_for_status()

    async def _cat_file(self, url, start=None, end=None, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)

        if start is not None or end is not None:
            if start == end:
                return b""
            headers = kw.pop("headers", {}).copy()

            headers["Range"] = await self._process_limits(url, start, end)
            kw["headers"] = headers
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            out = await r.read()
            self._raise_not_found_for_status(r, url)
            return out
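    # For example (illustrative): ``_process_limits`` (inherited from
    # fsspec.asyn) converts the half-open [start, end) byte range into an
    # inclusive HTTP Range header value, so
    # ``await fs._cat_file(url, start=0, end=100)`` should send
    # ``Range: bytes=0-99``.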

    async def _get_file(
        self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs
    ):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(rpath)
        session = await self.set_session()
        async with session.get(self.encode_url(rpath), **kw) as r:
            try:
                size = int(r.headers["content-length"])
            except (ValueError, KeyError):
                size = None

            callback.set_size(size)
            self._raise_not_found_for_status(r, rpath)
            if isfilelike(lpath):
                outfile = lpath
            else:
                outfile = open(lpath, "wb")  # noqa: ASYNC101

            try:
                chunk = True
                while chunk:
                    chunk = await r.content.read(chunk_size)
                    outfile.write(chunk)
                    callback.relative_update(len(chunk))
            finally:
                if not isfilelike(lpath):
                    outfile.close()

    async def _put_file(
        self,
        lpath,
        rpath,
        chunk_size=5 * 2**20,
        callback=DEFAULT_CALLBACK,
        method="post",
        **kwargs,
    ):
        async def gen_chunks():
            # Support passing arbitrary file-like objects
            # and use them instead of streams.
            if isinstance(lpath, io.IOBase):
                context = nullcontext(lpath)
                use_seek = False  # might not support seeking
            else:
                context = open(lpath, "rb")  # noqa: ASYNC101
                use_seek = True

            with context as f:
                if use_seek:
                    callback.set_size(f.seek(0, 2))
                    f.seek(0)
                else:
                    callback.set_size(getattr(f, "size", None))

                chunk = f.read(chunk_size)
                while chunk:
                    yield chunk
                    callback.relative_update(len(chunk))
                    chunk = f.read(chunk_size)

        kw = self.kwargs.copy()
        kw.update(kwargs)
        session = await self.set_session()

        method = method.lower()
        if method not in ("post", "put"):
            raise ValueError(
                f"method has to be either 'post' or 'put', not: {method!r}"
            )

        meth = getattr(session, method)
        async with meth(self.encode_url(rpath), data=gen_chunks(), **kw) as resp:
            self._raise_not_found_for_status(resp, rpath)

    async def _exists(self, path, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        try:
            logger.debug(path)
            session = await self.set_session()
            r = await session.get(self.encode_url(path), **kw)
            async with r:
                return r.status < 400
        except aiohttp.ClientError:
            return False

    async def _isfile(self, path, **kwargs):
        return await self._exists(path, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=None,  # XXX: This differs from the base class.
        cache_type=None,
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """Make a file-like object

        Parameters
        ----------
        path: str
            Full URL with protocol
        mode: string
            must be "rb"
        block_size: int or None
            Bytes to download in one request; use instance value if None. If
            zero, will return a streaming file-like instance (HTTPStreamFile).
        kwargs: key-value
            Any other parameters, passed to aiohttp calls
        """
        if mode != "rb":
            raise NotImplementedError
        block_size = block_size if block_size is not None else self.block_size
        kw = self.kwargs.copy()
        kw["asynchronous"] = self.asynchronous
        kw.update(kwargs)
        size = size or self.info(path, **kwargs)["size"]
        session = sync(self.loop, self.set_session)
        if block_size and size:
            return HTTPFile(
                self,
                path,
                session=session,
                block_size=block_size,
                mode=mode,
                size=size,
                cache_type=cache_type or self.cache_type,
                cache_options=cache_options or self.cache_options,
                loop=self.loop,
                **kw,
            )
        else:
            return HTTPStreamFile(
                self,
                path,
                mode=mode,
                loop=self.loop,
                session=session,
                **kw,
            )

    async def open_async(self, path, mode="rb", size=None, **kwargs):
        session = await self.set_session()
        if size is None:
            try:
                size = (await self._info(path, **kwargs))["size"]
            except FileNotFoundError:
                pass
        return AsyncStreamFile(
            self,
            path,
            loop=self.loop,
            session=session,
            size=size,
            **kwargs,
        )

    def ukey(self, url):
        """Unique identifier; assume HTTP files are static, unchanging"""
        return tokenize(url, self.kwargs, self.protocol)

    async def _info(self, url, **kwargs):
        """Get info of URL

        Tries to access location via HEAD, and then GET methods, but does
        not fetch the data.

        It is possible that the server does not supply any size information, in
        which case size will be given as None (and certain operations on the
        corresponding file will not work).
        """
        info = {}
        session = await self.set_session()

        for policy in ["head", "get"]:
            try:
                info.update(
                    await _file_info(
                        self.encode_url(url),
                        size_policy=policy,
                        session=session,
                        **self.kwargs,
                        **kwargs,
                    )
                )
                if info.get("size") is not None:
                    break
            except Exception as exc:
                if policy == "get":
                    # If get failed, then raise a FileNotFoundError
                    raise FileNotFoundError(url) from exc
                logger.debug("", exc_info=exc)

        return {"name": url, "size": None, **info, "type": "file"}

    async def _glob(self, path, maxdepth=None, **kwargs):
        """
        Find files by glob-matching.

        This implementation is identical to the one in AbstractFileSystem,
        but "?" is not considered as a character for globbing, because it is
        so common in URLs, often identifying the "query" part.
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        ends_with_slash = path.endswith("/")  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_slash or path.endswith(("/**", "/*"))
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_slash else ""))
        pattern = re.compile(pattern)

        out = {
            (
                p.rstrip("/")
                if not append_slash_to_dirname
                and info["type"] == "directory"
                and p.endswith("/")
                else p
            ): info
            for p, info in sorted(allpaths.items())
            if pattern.match(p.rstrip("/"))
        }

        if detail:
            return out
        else:
            return list(out)
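    # For example (illustrative): ``fs.glob("http://server.com/files/*.csv")``
    # expands "*" as usual, while a URL such as
    # ``http://server.com/data?format=csv`` is treated as a literal path,
    # because "?" is deliberately not a glob character here (see magic_check
    # below, which matches only "*" and "[").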

    async def _isdir(self, path):
        # override, since all URLs are (also) files
        try:
            return bool(await self._ls(path))
        except (FileNotFoundError, ValueError):
            return False


class HTTPFile(AbstractBufferedFile):
    """
    A file-like object pointing to a remote HTTP(S) resource

    Supports only reading, with read-ahead of a predetermined block-size.

    In the case that the server does not supply the filesize, only reading of
    the complete file in one go is supported.

    Parameters
    ----------
    url: str
        Full URL of the remote resource, including the protocol
    session: aiohttp.ClientSession or None
        All calls will be made within this session, to avoid restarting
        connections where the server allows this
    block_size: int or None
        The amount of read-ahead to do, in bytes. Default is 5MB, or the value
        configured for the FileSystem creating this file
    size: None or int
        If given, this is the size of the file in bytes, and we don't attempt
        to call the server to find the value.
    kwargs: all other key-values are passed to aiohttp calls.
    """

    def __init__(
        self,
        fs,
        url,
        session=None,
        block_size=None,
        mode="rb",
        cache_type="bytes",
        cache_options=None,
        size=None,
        loop=None,
        asynchronous=False,
        **kwargs,
    ):
        if mode != "rb":
            raise NotImplementedError("File mode not supported")
        self.asynchronous = asynchronous
        self.loop = loop
        self.url = url
        self.session = session
        self.details = {"name": url, "size": size, "type": "file"}
        super().__init__(
            fs=fs,
            path=url,
            mode=mode,
            block_size=block_size,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )

    def read(self, length=-1):
        """Read bytes from file

        Parameters
        ----------
        length: int
            Read up to this many bytes. If negative, read all content to end of
            file. If the server has not supplied the filesize, attempting to
            read only part of the data will raise a ValueError.
        """
        if (
            (length < 0 and self.loc == 0)  # explicit read all
            # but not when the size is known and fits into a block anyway
            and not (self.size is not None and self.size <= self.blocksize)
        ):
            self._fetch_all()
        if self.size is None:
            if length < 0:
                self._fetch_all()
        else:
            length = min(self.size - self.loc, length)
        return super().read(length)

    async def async_fetch_all(self):
        """Read whole file in one shot, without caching

        This is only called when position is still at zero,
        and read() is called without a byte-count.
        """
        logger.debug(f"Fetch all for {self}")
        if not isinstance(self.cache, AllBytes):
            r = await self.session.get(self.fs.encode_url(self.url), **self.kwargs)
            async with r:
                r.raise_for_status()
                out = await r.read()
                self.cache = AllBytes(
                    size=len(out), fetcher=None, blocksize=None, data=out
                )
                self.size = len(out)

    _fetch_all = sync_wrapper(async_fetch_all)

    def _parse_content_range(self, headers):
        """Parse the Content-Range header"""
        s = headers.get("Content-Range", "")
        m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
        if not m:
            return None, None, None

        if m[1] == "*":
            start = end = None
        else:
            start, end = [int(x) for x in m[1].split("-")]
        total = None if m[2] == "*" else int(m[2])
        return start, end, total
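    # For example (illustrative): a response carrying
    # "Content-Range: bytes 0-499/1234" parses to (0, 499, 1234), while
    # "Content-Range: bytes */1234" parses to (None, None, 1234).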

    async def async_fetch_range(self, start, end):
        """Download a block of data

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        logger.debug(f"Fetch range for {self}: {start}-{end}")
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {}).copy()
        headers["Range"] = f"bytes={start}-{end - 1}"
        logger.debug(f"{self.url} : {headers['Range']}")
        r = await self.session.get(
            self.fs.encode_url(self.url), headers=headers, **kwargs
        )
        async with r:
            if r.status == 416:
                # range request outside file
                return b""
            r.raise_for_status()

            # If the server has handled the range request, it should reply
            # with status 206 (partial content). But we'll guess that a suitable
            # Content-Range header or a Content-Length no more than the
            # requested range also means we have got the desired range.
            response_is_range = (
                r.status == 206
                or self._parse_content_range(r.headers)[0] == start
                or int(r.headers.get("Content-Length", end + 1)) <= end - start
            )

            if response_is_range:
                # partial content, as expected
                out = await r.read()
            elif start > 0:
                raise ValueError(
                    "The HTTP server doesn't appear to support range requests. "
                    "Only reading this file from the beginning is supported. "
                    "Open with block_size=0 for a streaming file interface."
                )
            else:
                # Response is not a range, but we want the start of the file,
                # so we can read the required amount anyway.
                cl = 0
                out = []
                while True:
                    chunk = await r.content.read(2**20)
                    # data size unknown, let's read until we have enough
                    if chunk:
                        out.append(chunk)
                        cl += len(chunk)
                        if cl > end - start:
                            break
                    else:
                        break
                out = b"".join(out)[: end - start]
            return out

    _fetch_range = sync_wrapper(async_fetch_range)

    def __reduce__(self):
        return (
            reopen,
            (
                self.fs,
                self.url,
                self.mode,
                self.blocksize,
                self.cache.name if self.cache else "none",
                self.size,
            ),
        )


def reopen(fs, url, mode, blocksize, cache_type, size=None):
    return fs.open(
        url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size
    )


magic_check = re.compile("([*[])")


def has_magic(s):
    match = magic_check.search(s)
    return match is not None
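# For example (illustrative): has_magic("http://server.com/*.csv") is True,
# but has_magic("http://server.com/data?x=1") is False, since only "*" and
# "[" count as glob characters for HTTP paths.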

class HTTPStreamFile(AbstractBufferedFile):
    def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
        self.asynchronous = kwargs.pop("asynchronous", False)
        self.url = url
        self.loop = loop
        self.session = session
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)

        async def cor():
            r = await self.session.get(self.fs.encode_url(url), **kwargs).__aenter__()
            self.fs._raise_not_found_for_status(r, url)
            return r

        self.r = sync(self.loop, cor)
        self.loop = fs.loop

    def seek(self, loc, whence=0):
        if loc == 0 and whence == 1:
            return
        if loc == self.loc and whence == 0:
            return
        raise ValueError("Cannot seek streaming HTTP file")

    async def _read(self, num=-1):
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    read = sync_wrapper(_read)

    async def _close(self):
        self.r.close()

    def close(self):
        asyncio.run_coroutine_threadsafe(self._close(), self.loop)
        super().close()

    def __reduce__(self):
        return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)


class AsyncStreamFile(AbstractAsyncStreamedFile):
    def __init__(
        self, fs, url, mode="rb", loop=None, session=None, size=None, **kwargs
    ):
        self.url = url
        self.session = session
        self.r = None
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        self.kwargs = kwargs
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none")
        self.size = size

    async def read(self, num=-1):
        if self.r is None:
            r = await self.session.get(
                self.fs.encode_url(self.url), **self.kwargs
            ).__aenter__()
            self.fs._raise_not_found_for_status(r, self.url)
            self.r = r
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    async def close(self):
        if self.r is not None:
            self.r.close()
            self.r = None
        await super().close()


async def get_range(session, url, start, end, file=None, **kwargs):
    # explicitly get a range when we know it must be safe
    kwargs = kwargs.copy()
    headers = kwargs.pop("headers", {}).copy()
    headers["Range"] = f"bytes={start}-{end - 1}"
    r = await session.get(url, headers=headers, **kwargs)
    r.raise_for_status()
    async with r:
        out = await r.read()
    if file:
        with open(file, "r+b") as f:  # noqa: ASYNC101
            f.seek(start)
            f.write(out)
    else:
        return out


async def _file_info(url, session, size_policy="head", **kwargs):
    """Call HEAD on the server to get details about the file (size/checksum etc.)

    Default operation is to explicitly allow redirects and use encoding
    'identity' (no compression) to get the true size of the target.
    """
    logger.debug("Retrieve file size for %s", url)
    kwargs = kwargs.copy()
    ar = kwargs.pop("allow_redirects", True)
    head = kwargs.get("headers", {}).copy()
    head["Accept-Encoding"] = "identity"
    kwargs["headers"] = head

    info = {}
    if size_policy == "head":
        r = await session.head(url, allow_redirects=ar, **kwargs)
    elif size_policy == "get":
        r = await session.get(url, allow_redirects=ar, **kwargs)
    else:
        raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
    async with r:
        r.raise_for_status()

        # TODO:
        # recognise lack of 'Accept-Ranges',
        # or 'Accept-Ranges': 'none' (not 'bytes')
        # to mean streaming only, no random access => return None
        if "Content-Length" in r.headers:
            # Some servers may choose to ignore Accept-Encoding and return
            # compressed content, in which case the returned size is unreliable.
            if "Content-Encoding" not in r.headers or r.headers["Content-Encoding"] in [
                "identity",
                "",
            ]:
                info["size"] = int(r.headers["Content-Length"])
        elif "Content-Range" in r.headers:
            info["size"] = int(r.headers["Content-Range"].split("/")[1])

        if "Content-Type" in r.headers:
            info["mimetype"] = r.headers["Content-Type"].partition(";")[0]

        info["url"] = str(r.url)

        for checksum_field in ["ETag", "Content-MD5", "Digest"]:
            if r.headers.get(checksum_field):
                info[checksum_field] = r.headers[checksum_field]

    return info


async def _file_size(url, session=None, *args, **kwargs):
    if session is None:
        session = await get_client()
    info = await _file_info(url, session=session, *args, **kwargs)
    return info.get("size")


file_size = sync_wrapper(_file_size)
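

# A minimal usage sketch (not part of the original module); it assumes
# network access and uses a placeholder URL, so treat it as illustrative only:
if __name__ == "__main__":
    import fsspec

    # "http" resolves to HTTPFileSystem via fsspec's protocol registry
    fs = fsspec.filesystem("http")
    example_url = "https://example.com/"  # placeholder URL
    print(fs.info(example_url))  # HEAD/GET-based metadata; size may be None
    with fs.open(example_url, mode="rb", block_size=0) as f:  # streaming mode
        print(f.read(64))  # first 64 bytes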