Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/implementations/http.py: 25% (442 statements, coverage.py v7.3.2, created at 2023-12-08 06:40 +0000)

import asyncio
import io
import logging
import re
import weakref
from copy import copy
from urllib.parse import urlparse

import aiohttp
import requests
import yarl

from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import _DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import (
    DEFAULT_BLOCK_SIZE,
    glob_translate,
    isfilelike,
    nullcontext,
    tokenize,
)

from ..caching import AllBytes

# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
logger = logging.getLogger("fsspec.http")
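

def _demo_link_regexes():
    # Hypothetical helper, not part of fsspec: a minimal sketch of how the two
    # module-level regexes above pull candidate links out of an HTML listing
    # page, which is the basis of ``HTTPFileSystem.ls()`` below.
    sample = '<a href="data/file.csv">file</a> see http://example.com/x.bin'
    hrefs = [m[2] for m in ex.findall(sample)]  # -> ['data/file.csv']
    bare = ex2.findall(sample)  # -> ['http://example.com/x.bin']
    return hrefs + bare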


async def get_client(**kwargs):
    return aiohttp.ClientSession(**kwargs)


class HTTPFileSystem(AsyncFileSystem):
    """
    Simple File-System for fetching data via HTTP(S)

    ``ls()`` is implemented by loading the parent page and doing a regex
    match on the result. If simple_links=True, anything of the form
    "http(s)://server.com/stuff?thing=other" is considered a link; otherwise
    only links within HTML href tags are used.
    """

    sep = "/"

    def __init__(
        self,
        simple_links=True,
        block_size=None,
        same_scheme=True,
        size_policy=None,
        cache_type="bytes",
        cache_options=None,
        asynchronous=False,
        loop=None,
        client_kwargs=None,
        get_client=get_client,
        encoded=False,
        **storage_options,
    ):
        """
        NB: if this is called async, you must await set_session

        Parameters
        ----------
        block_size: int
            Block size, in bytes, for read-ahead; if 0, will return raw
            streaming file-like objects instead of HTTPFile instances
        simple_links: bool
            If True, will consider both HTML <a> tags and anything that looks
            like a URL; if False, will consider only the former.
        same_scheme: bool
            When doing ls/glob, if this is True, only consider paths that have
            http/https matching the input URLs.
        size_policy: this argument is deprecated
        client_kwargs: dict
            Passed to aiohttp.ClientSession, see
            https://docs.aiohttp.org/en/stable/client_reference.html
            For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
        get_client: Callable[..., aiohttp.ClientSession]
            A callable which takes keyword arguments and constructs
            an aiohttp.ClientSession. Its state will be managed by
            the HTTPFileSystem class.
        storage_options: key-value
            Any other parameters passed on to requests
        cache_type, cache_options: defaults used in open()
        """
        super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
        self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
        self.simple_links = simple_links
        self.same_schema = same_scheme
        self.cache_type = cache_type
        self.cache_options = cache_options
        self.client_kwargs = client_kwargs or {}
        self.get_client = get_client
        self.encoded = encoded
        self.kwargs = storage_options
        self._session = None

        # Clean caching-related parameters from `storage_options`
        # before propagating them as `request_options` through `self.kwargs`.
        # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
        # it clearer.
        request_options = copy(storage_options)
        self.use_listings_cache = request_options.pop("use_listings_cache", False)
        request_options.pop("listings_expiry_time", None)
        request_options.pop("max_paths", None)
        request_options.pop("skip_instance_cache", None)
        self.kwargs = request_options
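
    @staticmethod
    def _example_construct():
        # Hypothetical illustration, not fsspec API: aiohttp session options
        # (here, basic auth) go through client_kwargs, so every request made
        # by this filesystem instance carries the credentials.
        return HTTPFileSystem(
            client_kwargs={"auth": aiohttp.BasicAuth("user", "pass")}
        )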

    @property
    def fsid(self):
        return "http"

    def encode_url(self, url):
        return yarl.URL(url, encoded=self.encoded)

    @staticmethod
    def close_session(loop, session):
        if loop is not None and loop.is_running():
            try:
                sync(loop, session.close, timeout=0.1)
                return
            except (TimeoutError, FSTimeoutError):
                pass
        connector = getattr(session, "_connector", None)
        if connector is not None:
            # close after loop is dead
            connector._close()

    async def set_session(self):
        if self._session is None:
            self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
            if not self.asynchronous:
                weakref.finalize(self, self.close_session, self.loop, self._session)
        return self._session

    @classmethod
    def _strip_protocol(cls, path):
        """For HTTP, we always want to keep the full URL"""
        return path

    @classmethod
    def _parent(cls, path):
        # override, since _strip_protocol is different for URLs
        par = super()._parent(path)
        if len(par) > 7:  # "http://..."
            return par
        return ""

    async def _ls_real(self, url, detail=True, **kwargs):
        # ignoring URL-encoded arguments
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            self._raise_not_found_for_status(r, url)
            text = await r.text()
        if self.simple_links:
            links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
        else:
            links = [u[2] for u in ex.findall(text)]
        out = set()
        parts = urlparse(url)
        for l in links:
            if isinstance(l, tuple):
                l = l[1]
            if l.startswith("/") and len(l) > 1:
                # absolute URL on this server
                l = f"{parts.scheme}://{parts.netloc}{l}"
            if l.startswith("http"):
                if self.same_schema and l.startswith(url.rstrip("/") + "/"):
                    out.add(l)
                elif l.replace("https", "http").startswith(
                    url.replace("https", "http").rstrip("/") + "/"
                ):
                    # allowed to cross http <-> https
                    out.add(l)
            else:
                if l not in ["..", "../"]:
                    # Ignore FTP-like "parent"
                    out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
        if not out and url.endswith("/"):
            out = await self._ls_real(url.rstrip("/"), detail=False)
        if detail:
            return [
                {
                    "name": u,
                    "size": None,
                    "type": "directory" if u.endswith("/") else "file",
                }
                for u in out
            ]
        else:
            return sorted(out)

    async def _ls(self, url, detail=True, **kwargs):
        if self.use_listings_cache and url in self.dircache:
            out = self.dircache[url]
        else:
            out = await self._ls_real(url, detail=detail, **kwargs)
            self.dircache[url] = out
        return out

    ls = sync_wrapper(_ls)

    def _raise_not_found_for_status(self, response, url):
        """
        Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
        """
        if response.status == 404:
            raise FileNotFoundError(url)
        response.raise_for_status()

    async def _cat_file(self, url, start=None, end=None, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)

        if start is not None or end is not None:
            if start == end:
                return b""
            headers = kw.pop("headers", {}).copy()

            headers["Range"] = await self._process_limits(url, start, end)
            kw["headers"] = headers
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            out = await r.read()
            self._raise_not_found_for_status(r, url)
        return out
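
    def _example_ranged_read(self):
        # Hypothetical illustration, not fsspec API: the byte offsets below
        # become an HTTP "Range" header on the GET issued by _cat_file above;
        # "example.com" is a placeholder URL.
        return self.cat_file("https://example.com/big.bin", start=0, end=1024)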

    async def _get_file(
        self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs
    ):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(rpath)
        session = await self.set_session()
        async with session.get(self.encode_url(rpath), **kw) as r:
            try:
                size = int(r.headers["content-length"])
            except (ValueError, KeyError):
                size = None

            callback.set_size(size)
            self._raise_not_found_for_status(r, rpath)
            if isfilelike(lpath):
                outfile = lpath
            else:
                outfile = open(lpath, "wb")

            try:
                chunk = True
                while chunk:
                    chunk = await r.content.read(chunk_size)
                    outfile.write(chunk)
                    callback.relative_update(len(chunk))
            finally:
                if not isfilelike(lpath):
                    outfile.close()

    async def _put_file(
        self,
        lpath,
        rpath,
        chunk_size=5 * 2**20,
        callback=_DEFAULT_CALLBACK,
        method="post",
        **kwargs,
    ):
        async def gen_chunks():
            # Support passing arbitrary file-like objects
            # and use them instead of streams.
            if isinstance(lpath, io.IOBase):
                context = nullcontext(lpath)
                use_seek = False  # might not support seeking
            else:
                context = open(lpath, "rb")
                use_seek = True

            with context as f:
                if use_seek:
                    callback.set_size(f.seek(0, 2))
                    f.seek(0)
                else:
                    callback.set_size(getattr(f, "size", None))

                chunk = f.read(chunk_size)
                while chunk:
                    yield chunk
                    callback.relative_update(len(chunk))
                    chunk = f.read(chunk_size)

        kw = self.kwargs.copy()
        kw.update(kwargs)
        session = await self.set_session()

        method = method.lower()
        if method not in ("post", "put"):
            raise ValueError(
                f"method has to be either 'post' or 'put', not: {method!r}"
            )

        meth = getattr(session, method)
        async with meth(self.encode_url(rpath), data=gen_chunks(), **kw) as resp:
            self._raise_not_found_for_status(resp, rpath)
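
    def _example_upload(self):
        # Hypothetical illustration, not fsspec API: _put_file above streams a
        # local file as the chunked body of a PUT (or POST) request;
        # "example.com" is a placeholder URL.
        self.put_file("data.bin", "https://example.com/upload/data.bin", method="put")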

    async def _exists(self, path, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        try:
            logger.debug(path)
            session = await self.set_session()
            r = await session.get(self.encode_url(path), **kw)
            async with r:
                return r.status < 400
        except (requests.HTTPError, aiohttp.ClientError):
            return False

    async def _isfile(self, path, **kwargs):
        return await self._exists(path, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=None,  # XXX: This differs from the base class.
        cache_type=None,
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """Make a file-like object

        Parameters
        ----------
        path: str
            Full URL with protocol
        mode: string
            must be "rb"
        block_size: int or None
            Bytes to download in one request; use instance value if None. If
            zero, will return a streaming file-like instance.
        kwargs: key-value
            Any other parameters, passed to requests calls
        """
        if mode != "rb":
            raise NotImplementedError
        block_size = block_size if block_size is not None else self.block_size
        kw = self.kwargs.copy()
        kw["asynchronous"] = self.asynchronous
        kw.update(kwargs)
        size = size or self.info(path, **kwargs)["size"]
        session = sync(self.loop, self.set_session)
        if block_size and size:
            return HTTPFile(
                self,
                path,
                session=session,
                block_size=block_size,
                mode=mode,
                size=size,
                cache_type=cache_type or self.cache_type,
                cache_options=cache_options or self.cache_options,
                loop=self.loop,
                **kw,
            )
        else:
            return HTTPStreamFile(
                self,
                path,
                mode=mode,
                loop=self.loop,
                session=session,
                **kw,
            )

    async def open_async(self, path, mode="rb", size=None, **kwargs):
        session = await self.set_session()
        if size is None:
            try:
                size = (await self._info(path, **kwargs))["size"]
            except FileNotFoundError:
                pass
        return AsyncStreamFile(
            self,
            path,
            loop=self.loop,
            session=session,
            size=size,
            **kwargs,
        )

    def ukey(self, url):
        """Unique identifier; assume HTTP files are static, unchanging"""
        return tokenize(url, self.kwargs, self.protocol)

    async def _info(self, url, **kwargs):
        """Get info of URL

        Tries to access location via HEAD, and then GET methods, but does
        not fetch the data.

        It is possible that the server does not supply any size information, in
        which case size will be given as None (and certain operations on the
        corresponding file will not work).
        """
        info = {}
        session = await self.set_session()

        for policy in ["head", "get"]:
            try:
                info.update(
                    await _file_info(
                        self.encode_url(url),
                        size_policy=policy,
                        session=session,
                        **self.kwargs,
                        **kwargs,
                    )
                )
                if info.get("size") is not None:
                    break
            except Exception as exc:
                if policy == "get":
                    # If get failed, then raise a FileNotFoundError
                    raise FileNotFoundError(url) from exc
                logger.debug(str(exc))

        return {"name": url, "size": None, **info, "type": "file"}

    async def _glob(self, path, maxdepth=None, **kwargs):
        """
        Find files by glob-matching.

        This implementation is identical to the one in AbstractFileSystem,
        but "?" is not considered as a character for globbing, because it is
        so common in URLs, often identifying the "query" part.
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")
        import re

        ends_with_slash = path.endswith("/")  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_slash or path.endswith("/**")
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_slash else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                (
                    p + "/"
                    if append_slash_to_dirname and info["type"] == "directory"
                    else p
                )
            )
        }

        if detail:
            return out
        else:
            return list(out)
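
    def _example_glob(self):
        # Hypothetical illustration, not fsspec API: per _glob above, "?" is
        # not glob magic here (query strings are too common in URLs), so only
        # "*" and "[" trigger matching; "example.com" is a placeholder host.
        return self.glob("https://example.com/files/*.csv")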

    async def _isdir(self, path):
        # override, since all URLs are (also) files
        try:
            return bool(await self._ls(path))
        except (FileNotFoundError, ValueError):
            return False
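

def _demo_http_filesystem():
    # Hedged end-to-end sketch, not part of fsspec: list a directory page,
    # stat the first entry, and read it; "example.com" is a placeholder server.
    fs = HTTPFileSystem(simple_links=True)
    entries = fs.ls("https://example.com/data/", detail=False)
    info = fs.info(entries[0]) if entries else None
    data = fs.cat(entries[0]) if entries else b""
    return info, data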


class HTTPFile(AbstractBufferedFile):
    """
    A file-like object pointing to a remote HTTP(S) resource

    Supports only reading, with read-ahead of a predetermined block-size.

    In the case that the server does not supply the filesize, only reading of
    the complete file in one go is supported.

    Parameters
    ----------
    url: str
        Full URL of the remote resource, including the protocol
    session: aiohttp.ClientSession or None
        All calls will be made within this session, to avoid restarting
        connections where the server allows this
    block_size: int or None
        The amount of read-ahead to do, in bytes. Default is 5MB, or the value
        configured for the FileSystem creating this file
    size: None or int
        If given, this is the size of the file in bytes, and we don't attempt
        to call the server to find the value.
    kwargs: all other key-values are passed to requests calls.
    """

    def __init__(
        self,
        fs,
        url,
        session=None,
        block_size=None,
        mode="rb",
        cache_type="bytes",
        cache_options=None,
        size=None,
        loop=None,
        asynchronous=False,
        **kwargs,
    ):
        if mode != "rb":
            raise NotImplementedError("File mode not supported")
        self.asynchronous = asynchronous
        self.url = url
        self.session = session
        self.details = {"name": url, "size": size, "type": "file"}
        super().__init__(
            fs=fs,
            path=url,
            mode=mode,
            block_size=block_size,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )
        self.loop = loop

    def read(self, length=-1):
        """Read bytes from file

        Parameters
        ----------
        length: int
            Read up to this many bytes. If negative, read all content to end of
            file. If the server has not supplied the filesize, attempting to
            read only part of the data will raise a ValueError.
        """
        if (
            (length < 0 and self.loc == 0)  # explicit read all
            # but not when the size is known and fits into a block anyways
            and not (self.size is not None and self.size <= self.blocksize)
        ):
            self._fetch_all()
        if self.size is None:
            if length < 0:
                self._fetch_all()
        else:
            length = min(self.size - self.loc, length)
        return super().read(length)

    async def async_fetch_all(self):
        """Read whole file in one shot, without caching

        This is only called when position is still at zero,
        and read() is called without a byte-count.
        """
        logger.debug(f"Fetch all for {self}")
        if not isinstance(self.cache, AllBytes):
            r = await self.session.get(self.fs.encode_url(self.url), **self.kwargs)
            async with r:
                r.raise_for_status()
                out = await r.read()
                self.cache = AllBytes(
                    size=len(out), fetcher=None, blocksize=None, data=out
                )
                self.size = len(out)

    _fetch_all = sync_wrapper(async_fetch_all)

    def _parse_content_range(self, headers):
        """Parse the Content-Range header"""
        s = headers.get("Content-Range", "")
        m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
        if not m:
            return None, None, None

        if m[1] == "*":
            start = end = None
        else:
            start, end = [int(x) for x in m[1].split("-")]
        total = None if m[2] == "*" else int(m[2])
        return start, end, total
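
    def _example_content_range(self):
        # Hypothetical illustration, not fsspec API: a 206 reply carrying
        # "Content-Range: bytes 0-99/1000" parses to (0, 99, 1000), i.e.
        # start, end and total size.
        return self._parse_content_range({"Content-Range": "bytes 0-99/1000"})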

    async def async_fetch_range(self, start, end):
        """Download a block of data

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        logger.debug(f"Fetch range for {self}: {start}-{end}")
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {}).copy()
        headers["Range"] = f"bytes={start}-{end - 1}"
        logger.debug(f"{self.url} : {headers['Range']}")
        r = await self.session.get(
            self.fs.encode_url(self.url), headers=headers, **kwargs
        )
        async with r:
            if r.status == 416:
                # range request outside file
                return b""
            r.raise_for_status()

            # If the server has handled the range request, it should reply
            # with status 206 (partial content). But we'll guess that a suitable
            # Content-Range header or a Content-Length no more than the
            # requested range also means we have got the desired range.
            response_is_range = (
                r.status == 206
                or self._parse_content_range(r.headers)[0] == start
                or int(r.headers.get("Content-Length", end + 1)) <= end - start
            )

            if response_is_range:
                # partial content, as expected
                out = await r.read()
            elif start > 0:
                raise ValueError(
                    "The HTTP server doesn't appear to support range requests. "
                    "Only reading this file from the beginning is supported. "
                    "Open with block_size=0 for a streaming file interface."
                )
            else:
                # Response is not a range, but we want the start of the file,
                # so we can read the required amount anyway.
                cl = 0
                out = []
                while True:
                    chunk = await r.content.read(2**20)
                    # data size unknown, let's read until we have enough
                    if chunk:
                        out.append(chunk)
                        cl += len(chunk)
                        if cl > end - start:
                            break
                    else:
                        break
                out = b"".join(out)[: end - start]
            return out

    _fetch_range = sync_wrapper(async_fetch_range)

    def __reduce__(self):
        return (
            reopen,
            (
                self.fs,
                self.url,
                self.mode,
                self.blocksize,
                self.cache.name if self.cache else "none",
                self.size,
            ),
        )


def reopen(fs, url, mode, blocksize, cache_type, size=None):
    return fs.open(
        url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size
    )


magic_check = re.compile("([*[])")


def has_magic(s):
    match = magic_check.search(s)
    return match is not None
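

def _demo_has_magic():
    # Hypothetical check, not part of fsspec: only "*" and "[" count as glob
    # magic, so a URL query string ("?a=1") is not treated as a wildcard.
    assert has_magic("https://example.com/*.csv")
    assert not has_magic("https://example.com/file?a=1")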


class HTTPStreamFile(AbstractBufferedFile):
    def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
        self.asynchronous = kwargs.pop("asynchronous", False)
        self.url = url
        self.loop = loop
        self.session = session
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)

        async def cor():
            r = await self.session.get(self.fs.encode_url(url), **kwargs).__aenter__()
            self.fs._raise_not_found_for_status(r, url)
            return r

        self.r = sync(self.loop, cor)

    def seek(self, loc, whence=0):
        if loc == 0 and whence == 1:
            return
        if loc == self.loc and whence == 0:
            return
        raise ValueError("Cannot seek streaming HTTP file")

    async def _read(self, num=-1):
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    read = sync_wrapper(_read)

    async def _close(self):
        self.r.close()

    def close(self):
        asyncio.run_coroutine_threadsafe(self._close(), self.loop)
        super().close()

    def __reduce__(self):
        return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)


class AsyncStreamFile(AbstractAsyncStreamedFile):
    def __init__(
        self, fs, url, mode="rb", loop=None, session=None, size=None, **kwargs
    ):
        self.url = url
        self.session = session
        self.r = None
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        self.kwargs = kwargs
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none")
        self.size = size

    async def read(self, num=-1):
        if self.r is None:
            r = await self.session.get(
                self.fs.encode_url(self.url), **self.kwargs
            ).__aenter__()
            self.fs._raise_not_found_for_status(r, self.url)
            self.r = r
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    async def close(self):
        if self.r is not None:
            self.r.close()
            self.r = None
        await super().close()


async def get_range(session, url, start, end, file=None, **kwargs):
    # explicitly get a range when we know it must be safe
    kwargs = kwargs.copy()
    headers = kwargs.pop("headers", {}).copy()
    headers["Range"] = f"bytes={start}-{end - 1}"
    r = await session.get(url, headers=headers, **kwargs)
    r.raise_for_status()
    async with r:
        out = await r.read()
    if file:
        with open(file, "r+b") as f:
            f.seek(start)
            f.write(out)
    else:
        return out
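

async def _demo_get_range():
    # Hedged sketch, not part of fsspec: fetch bytes 0..1023 of a resource
    # with a one-off session; "example.com" is a placeholder URL.
    session = await get_client()
    try:
        return await get_range(session, "https://example.com/big.bin", 0, 1024)
    finally:
        await session.close()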


async def _file_info(url, session, size_policy="head", **kwargs):
    """Call HEAD on the server to get details about the file (size/checksum etc.)

    Default operation is to explicitly allow redirects and use encoding
    'identity' (no compression) to get the true size of the target.
    """
    logger.debug("Retrieve file size for %s", url)
    kwargs = kwargs.copy()
    ar = kwargs.pop("allow_redirects", True)
    head = kwargs.get("headers", {}).copy()
    head["Accept-Encoding"] = "identity"
    kwargs["headers"] = head

    info = {}
    if size_policy == "head":
        r = await session.head(url, allow_redirects=ar, **kwargs)
    elif size_policy == "get":
        r = await session.get(url, allow_redirects=ar, **kwargs)
    else:
        raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
    async with r:
        r.raise_for_status()

        # TODO:
        #  recognise lack of 'Accept-Ranges',
        #  or 'Accept-Ranges': 'none' (not 'bytes')
        #  to mean streaming only, no random access => return None
        if "Content-Length" in r.headers:
            # Some servers may choose to ignore Accept-Encoding and return
            # compressed content, in which case the returned size is unreliable.
            if "Content-Encoding" not in r.headers or r.headers["Content-Encoding"] in [
                "identity",
                "",
            ]:
                info["size"] = int(r.headers["Content-Length"])
        elif "Content-Range" in r.headers:
            info["size"] = int(r.headers["Content-Range"].split("/")[1])

        for checksum_field in ["ETag", "Content-MD5", "Digest"]:
            if r.headers.get(checksum_field):
                info[checksum_field] = r.headers[checksum_field]

    return info


async def _file_size(url, session=None, *args, **kwargs):
    if session is None:
        session = await get_client()
    info = await _file_info(url, session=session, *args, **kwargs)
    return info.get("size")


file_size = sync_wrapper(_file_size)
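

if __name__ == "__main__":
    # Hedged smoke test, not part of fsspec: report the size of a remote file
    # via _file_info; "example.com" is a placeholder URL.
    async def _main():
        session = await get_client()
        try:
            print(await _file_size("https://example.com/data.bin", session=session))
        finally:
            await session.close()

    asyncio.run(_main())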