from __future__ import absolute_import, division, print_function

import asyncio
import io
import logging
import re
import weakref
from copy import copy
from urllib.parse import urlparse

import aiohttp
import requests
import yarl

from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import _DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import DEFAULT_BLOCK_SIZE, isfilelike, nullcontext, tokenize

from ..caching import AllBytes

# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
logger = logging.getLogger("fsspec.http")
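
# Editor's note: a quick sketch of what the two patterns above match; the
# HTML snippet and URLs are hypothetical. ``ex`` captures anchor hrefs (the
# URL is the third group, hence ``u[2]`` in ``_ls_real`` below), while
# ``ex2`` matches anything URL-shaped in raw text:
#
#   >>> ex.findall('<a href="https://example.com/f.csv">f</a>')
#   [('a', 'href', 'https://example.com/f.csv')]
#   >>> ex2.findall("see https://example.com/f.csv for data")
#   ['https://example.com/f.csv']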


async def get_client(**kwargs):
    return aiohttp.ClientSession(**kwargs)
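
# Editor's note: a hedged sketch of a custom session factory that could be
# passed as ``HTTPFileSystem(get_client=...)``; ``get_client_with_timeout``
# is hypothetical, not part of fsspec:
#
#   async def get_client_with_timeout(**kwargs):
#       timeout = aiohttp.ClientTimeout(total=60)
#       return aiohttp.ClientSession(timeout=timeout, **kwargs)
#
#   fs = HTTPFileSystem(get_client=get_client_with_timeout)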


class HTTPFileSystem(AsyncFileSystem):
    """
    Simple File-System for fetching data via HTTP(S)

    ``ls()`` is implemented by loading the parent page and doing a regex
    match on the result. If simple_links=True, anything of the form
    "http(s)://server.com/stuff?thing=other" is considered a link; otherwise
    only links within HTML href tags will be used.
    """

    sep = "/"

    def __init__(
        self,
        simple_links=True,
        block_size=None,
        same_scheme=True,
        size_policy=None,
        cache_type="bytes",
        cache_options=None,
        asynchronous=False,
        loop=None,
        client_kwargs=None,
        get_client=get_client,
        encoded=False,
        **storage_options,
    ):
        """
        NB: if this is called async, you must await set_session

        Parameters
        ----------
        block_size: int
            Blocks to read bytes; if 0, will default to raw requests file-like
            objects instead of HTTPFile instances
        simple_links: bool
            If True, will consider both HTML <a> tags and anything that looks
            like a URL; if False, will consider only the former.
        same_scheme: bool
            When doing ls/glob, if this is True, only consider paths that have
            http/https matching the input URLs.
        size_policy: this argument is deprecated
        client_kwargs: dict
            Passed to aiohttp.ClientSession, see
            https://docs.aiohttp.org/en/stable/client_reference.html
            For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
        get_client: Callable[..., aiohttp.ClientSession]
            A callable which takes keyword arguments and constructs
            an aiohttp.ClientSession. Its state will be managed by
            the HTTPFileSystem class.
        storage_options: key-value
            Any other parameters passed on to requests
        cache_type, cache_options: defaults used in open()
        """
        super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
        self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
        self.simple_links = simple_links
        self.same_schema = same_scheme
        self.cache_type = cache_type
        self.cache_options = cache_options
        self.client_kwargs = client_kwargs or {}
        self.get_client = get_client
        self.encoded = encoded
        self.kwargs = storage_options
        self._session = None

        # Clean caching-related parameters from `storage_options`
        # before propagating them as `request_options` through `self.kwargs`.
        # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
        # it clearer.
        request_options = copy(storage_options)
        self.use_listings_cache = request_options.pop("use_listings_cache", False)
        request_options.pop("listings_expiry_time", None)
        request_options.pop("max_paths", None)
        request_options.pop("skip_instance_cache", None)
        self.kwargs = request_options
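
    # Editor's note: a usage sketch (hypothetical URL). Any extra
    # ``storage_options`` end up in ``self.kwargs`` and are passed on to
    # every request, so e.g. default headers can be set once per instance:
    #
    #   fs = HTTPFileSystem(headers={"User-Agent": "fsspec-example"})
    #   data = fs.cat("https://example.com/robots.txt")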

    @property
    def fsid(self):
        return "http"

    def encode_url(self, url):
        return yarl.URL(url, encoded=self.encoded)

    @staticmethod
    def close_session(loop, session):
        if loop is not None and loop.is_running():
            try:
                sync(loop, session.close, timeout=0.1)
                return
            except (TimeoutError, FSTimeoutError):
                pass
        connector = getattr(session, "_connector", None)
        if connector is not None:
            # close after loop is dead
            connector._close()

    async def set_session(self):
        if self._session is None:
            self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
            if not self.asynchronous:
                weakref.finalize(self, self.close_session, self.loop, self._session)
        return self._session
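
    # Editor's note: in asynchronous mode the session is not finalized
    # automatically; the caller sets it up and closes it. A minimal sketch,
    # assuming a running event loop:
    #
    #   fs = HTTPFileSystem(asynchronous=True)
    #   session = await fs.set_session()
    #   ...
    #   await session.close()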

    @classmethod
    def _strip_protocol(cls, path):
        """For HTTP, we always want to keep the full URL"""
        return path

    @classmethod
    def _parent(cls, path):
        # override, since _strip_protocol is different for URLs
        par = super()._parent(path)
        if len(par) > 7:  # "http://..."
            return par
        return ""

    async def _ls_real(self, url, detail=True, **kwargs):
        # ignoring URL-encoded arguments
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            self._raise_not_found_for_status(r, url)
            text = await r.text()
        if self.simple_links:
            links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
        else:
            links = [u[2] for u in ex.findall(text)]
        out = set()
        parts = urlparse(url)
        for l in links:
            if isinstance(l, tuple):
                l = l[1]
            if l.startswith("/") and len(l) > 1:
                # absolute URL on this server
                l = parts.scheme + "://" + parts.netloc + l
            if l.startswith("http"):
                if self.same_schema and l.startswith(url.rstrip("/") + "/"):
                    out.add(l)
                elif l.replace("https", "http").startswith(
                    url.replace("https", "http").rstrip("/") + "/"
                ):
                    # allowed to cross http <-> https
                    out.add(l)
            else:
                if l not in ["..", "../"]:
                    # Ignore FTP-like "parent"
                    out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
        if not out and url.endswith("/"):
            out = await self._ls_real(url.rstrip("/"), detail=False)
        if detail:
            return [
                {
                    "name": u,
                    "size": None,
                    "type": "directory" if u.endswith("/") else "file",
                }
                for u in out
            ]
        else:
            return list(sorted(out))

    async def _ls(self, url, detail=True, **kwargs):
        if self.use_listings_cache and url in self.dircache:
            out = self.dircache[url]
        else:
            out = await self._ls_real(url, detail=detail, **kwargs)
            self.dircache[url] = out
        return out

    ls = sync_wrapper(_ls)
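
    # Editor's note: a listing sketch against a hypothetical server that
    # exposes an HTML index page; the regexes above scrape it for links:
    #
    #   fs = HTTPFileSystem()
    #   fs.ls("https://example.com/files/", detail=False)
    #   # -> ['https://example.com/files/a.csv', ...]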

    def _raise_not_found_for_status(self, response, url):
        """
        Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
        """
        if response.status == 404:
            raise FileNotFoundError(url)
        response.raise_for_status()

    async def _cat_file(self, url, start=None, end=None, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)

        if start is not None or end is not None:
            if start == end:
                return b""
            headers = kw.pop("headers", {}).copy()

            headers["Range"] = await self._process_limits(url, start, end)
            kw["headers"] = headers
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            out = await r.read()
            self._raise_not_found_for_status(r, url)
        return out
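
    # Editor's note: a byte-range sketch (hypothetical URL) via the sync
    # ``cat_file`` mirror from the base class; ``start``/``end`` become an
    # HTTP Range header through ``_process_limits``:
    #
    #   fs.cat_file("https://example.com/big.bin", start=0, end=100)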

    async def _get_file(
        self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs
    ):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(rpath)
        session = await self.set_session()
        async with session.get(self.encode_url(rpath), **kw) as r:
            try:
                size = int(r.headers["content-length"])
            except (ValueError, KeyError):
                size = None

            callback.set_size(size)
            self._raise_not_found_for_status(r, rpath)
            if isfilelike(lpath):
                outfile = lpath
            else:
                outfile = open(lpath, "wb")

            try:
                chunk = True
                while chunk:
                    chunk = await r.content.read(chunk_size)
                    outfile.write(chunk)
                    callback.relative_update(len(chunk))
            finally:
                if not isfilelike(lpath):
                    outfile.close()
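
    # Editor's note: a download sketch (hypothetical paths) via the sync
    # ``get`` mirror; the body is streamed to disk in ``chunk_size`` pieces:
    #
    #   fs.get("https://example.com/big.bin", "/tmp/big.bin")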

    async def _put_file(
        self,
        lpath,
        rpath,
        chunk_size=5 * 2**20,
        callback=_DEFAULT_CALLBACK,
        method="post",
        **kwargs,
    ):
        async def gen_chunks():
            # Support passing arbitrary file-like objects
            # and use them instead of streams.
            if isinstance(lpath, io.IOBase):
                context = nullcontext(lpath)
                use_seek = False  # might not support seeking
            else:
                context = open(lpath, "rb")
                use_seek = True

            with context as f:
                if use_seek:
                    callback.set_size(f.seek(0, 2))
                    f.seek(0)
                else:
                    callback.set_size(getattr(f, "size", None))

                chunk = f.read(chunk_size)
                while chunk:
                    yield chunk
                    callback.relative_update(len(chunk))
                    chunk = f.read(chunk_size)

        kw = self.kwargs.copy()
        kw.update(kwargs)
        session = await self.set_session()

        method = method.lower()
        if method not in ("post", "put"):
            raise ValueError(
                f"method has to be either 'post' or 'put', not: {method!r}"
            )

        meth = getattr(session, method)
        async with meth(rpath, data=gen_chunks(), **kw) as resp:
            self._raise_not_found_for_status(resp, rpath)
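
    # Editor's note: an upload sketch (hypothetical endpoint). The local
    # file is streamed as the request body by the generator above; pass
    # ``method="put"`` for servers expecting PUT rather than POST:
    #
    #   fs.put_file("/tmp/data.bin", "https://example.com/upload", method="put")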

    async def _exists(self, path, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        try:
            logger.debug(path)
            session = await self.set_session()
            r = await session.get(self.encode_url(path), **kw)
            async with r:
                return r.status < 400
        except (requests.HTTPError, aiohttp.ClientError):
            return False

    async def _isfile(self, path, **kwargs):
        return await self._exists(path, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=None,  # XXX: This differs from the base class.
        cache_type=None,
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """Make a file-like object

        Parameters
        ----------
        path: str
            Full URL with protocol
        mode: string
            must be "rb"
        block_size: int or None
            Bytes to download in one request; use instance value if None. If
            zero, will return a streaming file-like instance.
        kwargs: key-value
            Any other parameters, passed to requests calls
        """
        if mode != "rb":
            raise NotImplementedError
        block_size = block_size if block_size is not None else self.block_size
        kw = self.kwargs.copy()
        kw["asynchronous"] = self.asynchronous
        kw.update(kwargs)
        size = size or self.info(path, **kwargs)["size"]
        session = sync(self.loop, self.set_session)
        if block_size and size:
            return HTTPFile(
                self,
                path,
                session=session,
                block_size=block_size,
                mode=mode,
                size=size,
                cache_type=cache_type or self.cache_type,
                cache_options=cache_options or self.cache_options,
                loop=self.loop,
                **kw,
            )
        else:
            return HTTPStreamFile(
                self,
                path,
                mode=mode,
                loop=self.loop,
                session=session,
                **kw,
            )
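
    # Editor's note: a sketch of the two open modes (hypothetical URLs).
    # With a known size and a nonzero block_size you get a random-access
    # HTTPFile; with block_size=0 you get a forward-only HTTPStreamFile:
    #
    #   with fs.open("https://example.com/big.csv") as f:
    #       f.seek(1024)
    #       part = f.read(100)
    #
    #   with fs.open("https://example.com/feed", block_size=0) as f:
    #       first = f.read(2**16)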

    async def open_async(self, path, mode="rb", size=None, **kwargs):
        session = await self.set_session()
        if size is None:
            try:
                size = (await self._info(path, **kwargs))["size"]
            except FileNotFoundError:
                pass
        return AsyncStreamFile(
            self,
            path,
            loop=self.loop,
            session=session,
            size=size,
            **kwargs,
        )

    def ukey(self, url):
        """Unique identifier; assume HTTP files are static, unchanging"""
        return tokenize(url, self.kwargs, self.protocol)

    async def _info(self, url, **kwargs):
        """Get info of URL

        Tries to access location via HEAD, and then GET methods, but does
        not fetch the data.

        It is possible that the server does not supply any size information, in
        which case size will be given as None (and certain operations on the
        corresponding file will not work).
        """
        info = {}
        session = await self.set_session()

        for policy in ["head", "get"]:
            try:
                info.update(
                    await _file_info(
                        self.encode_url(url),
                        size_policy=policy,
                        session=session,
                        **self.kwargs,
                        **kwargs,
                    )
                )
                if info.get("size") is not None:
                    break
            except Exception as exc:
                if policy == "get":
                    # If get failed, then raise a FileNotFoundError
                    raise FileNotFoundError(url) from exc
                logger.debug(str(exc))

        return {"name": url, "size": None, **info, "type": "file"}
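
    # Editor's note: an info sketch (hypothetical URL); "size" stays None
    # when the server sends neither Content-Length nor Content-Range:
    #
    #   fs.info("https://example.com/data.bin")
    #   # -> {'name': '...', 'size': 1048576, 'type': 'file', ...}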

    async def _glob(self, path, **kwargs):
        """
        Find files by glob-matching.

        This implementation is identical to the one in AbstractFileSystem,
        but "?" is not considered as a character for globbing, because it is
        so common in URLs, often identifying the "query" part.
        """
        import re

        ends = path.endswith("/")
        path = self._strip_protocol(path)
        indstar = path.find("*") if path.find("*") >= 0 else len(path)
        indbrace = path.find("[") if path.find("[") >= 0 else len(path)

        ind = min(indstar, indbrace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            root = path
            depth = 1
            if ends:
                path += "/*"
            elif await self._exists(path):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:ind]:
            ind2 = path[:ind].rindex("/")
            root = path[: ind2 + 1]
            depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
        else:
            root = ""
            depth = None if "**" in path else path[ind + 1 :].count("/") + 1

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )
        # Escape characters special to python regex, leaving our supported
        # special characters in place.
        # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
        # for shell globbing details.
        pattern = (
            "^"
            + (
                path.replace("\\", r"\\")
                .replace(".", r"\.")
                .replace("+", r"\+")
                .replace("//", "/")
                .replace("(", r"\(")
                .replace(")", r"\)")
                .replace("|", r"\|")
                .replace("^", r"\^")
                .replace("$", r"\$")
                .replace("{", r"\{")
                .replace("}", r"\}")
                .rstrip("/")
            )
            + "$"
        )
        pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
        pattern = re.sub("[*]", "[^/]*", pattern)
        pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
        out = {
            p: allpaths[p]
            for p in sorted(allpaths)
            if pattern.match(p.replace("//", "/").rstrip("/"))
        }
        if detail:
            return out
        else:
            return list(out)
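
    # Editor's note: a glob sketch (hypothetical listing). "*" matches
    # within one path segment and "**" across segments, while "?" is taken
    # literally because it usually introduces a query string:
    #
    #   fs.glob("https://example.com/files/*.csv")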

    async def _isdir(self, path):
        # override, since all URLs are (also) files
        try:
            return bool(await self._ls(path))
        except (FileNotFoundError, ValueError):
            return False


class HTTPFile(AbstractBufferedFile):
    """
    A file-like object pointing to a remote HTTP(S) resource

    Supports only reading, with read-ahead of a predetermined block-size.

    In the case that the server does not supply the filesize, only reading of
    the complete file in one go is supported.

    Parameters
    ----------
    url: str
        Full URL of the remote resource, including the protocol
    session: aiohttp.ClientSession or None
        All calls will be made within this session, to avoid restarting
        connections where the server allows this
    block_size: int or None
        The amount of read-ahead to do, in bytes. Default is 5MB, or the value
        configured for the FileSystem creating this file
    size: None or int
        If given, this is the size of the file in bytes, and we don't attempt
        to call the server to find the value.
    kwargs: all other key-values are passed to requests calls.
    """

    def __init__(
        self,
        fs,
        url,
        session=None,
        block_size=None,
        mode="rb",
        cache_type="bytes",
        cache_options=None,
        size=None,
        loop=None,
        asynchronous=False,
        **kwargs,
    ):
        if mode != "rb":
            raise NotImplementedError("File mode not supported")
        self.asynchronous = asynchronous
        self.url = url
        self.session = session
        self.details = {"name": url, "size": size, "type": "file"}
        super().__init__(
            fs=fs,
            path=url,
            mode=mode,
            block_size=block_size,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )
        self.loop = loop

    def read(self, length=-1):
        """Read bytes from file

        Parameters
        ----------
        length: int
            Read up to this many bytes. If negative, read all content to end of
            file. If the server has not supplied the filesize, attempting to
            read only part of the data will raise a ValueError.
        """
        if (
            (length < 0 and self.loc == 0)  # explicit read all
            # but not when the size is known and fits into a block anyway
            and not (self.size is not None and self.size <= self.blocksize)
        ):
            self._fetch_all()
        if self.size is None:
            if length < 0:
                self._fetch_all()
        else:
            length = min(self.size - self.loc, length)
        return super().read(length)

    async def async_fetch_all(self):
        """Read whole file in one shot, without caching

        This is only called when position is still at zero,
        and read() is called without a byte-count.
        """
        logger.debug(f"Fetch all for {self}")
        if not isinstance(self.cache, AllBytes):
            r = await self.session.get(self.fs.encode_url(self.url), **self.kwargs)
            async with r:
                r.raise_for_status()
                out = await r.read()
                self.cache = AllBytes(
                    size=len(out), fetcher=None, blocksize=None, data=out
                )
                self.size = len(out)

    _fetch_all = sync_wrapper(async_fetch_all)

    def _parse_content_range(self, headers):
        """Parse the Content-Range header"""
        s = headers.get("Content-Range", "")
        m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
        if not m:
            return None, None, None

        if m[1] == "*":
            start = end = None
        else:
            start, end = [int(x) for x in m[1].split("-")]
        total = None if m[2] == "*" else int(m[2])
        return start, end, total
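
    # Editor's note: a parse sketch for the helper above; the header values
    # are hypothetical:
    #
    #   self._parse_content_range({"Content-Range": "bytes 0-99/1000"})
    #   # -> (0, 99, 1000)
    #   self._parse_content_range({"Content-Range": "bytes */1000"})
    #   # -> (None, None, 1000)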

    async def async_fetch_range(self, start, end):
        """Download a block of data

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        logger.debug(f"Fetch range for {self}: {start}-{end}")
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {}).copy()
        headers["Range"] = "bytes=%i-%i" % (start, end - 1)
        logger.debug(str(self.url) + " : " + headers["Range"])
        r = await self.session.get(
            self.fs.encode_url(self.url), headers=headers, **kwargs
        )
        async with r:
            if r.status == 416:
                # range request outside file
                return b""
            r.raise_for_status()

            # If the server has handled the range request, it should reply
            # with status 206 (partial content). But we'll guess that a suitable
            # Content-Range header or a Content-Length no more than the
            # requested range also mean we have got the desired range.
            response_is_range = (
                r.status == 206
                or self._parse_content_range(r.headers)[0] == start
                or int(r.headers.get("Content-Length", end + 1)) <= end - start
            )

            if response_is_range:
                # partial content, as expected
                out = await r.read()
            elif start > 0:
                raise ValueError(
                    "The HTTP server doesn't appear to support range requests. "
                    "Only reading this file from the beginning is supported. "
                    "Open with block_size=0 for a streaming file interface."
                )
            else:
                # Response is not a range, but we want the start of the file,
                # so we can read the required amount anyway.
                cl = 0
                out = []
                while True:
                    chunk = await r.content.read(2**20)
                    # data size unknown, let's read until we have enough
                    if chunk:
                        out.append(chunk)
                        cl += len(chunk)
                        if cl > end - start:
                            break
                    else:
                        break
                out = b"".join(out)[: end - start]
            return out

    _fetch_range = sync_wrapper(async_fetch_range)

    def __reduce__(self):
        return (
            reopen,
            (
                self.fs,
                self.url,
                self.mode,
                self.blocksize,
                self.cache.name if self.cache else "none",
                self.size,
            ),
        )


def reopen(fs, url, mode, blocksize, cache_type, size=None):
    return fs.open(
        url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size
    )


magic_check = re.compile("([*[])")


def has_magic(s):
    match = magic_check.search(s)
    return match is not None


class HTTPStreamFile(AbstractBufferedFile):
    def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
        self.asynchronous = kwargs.pop("asynchronous", False)
        self.url = url
        self.loop = loop
        self.session = session
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)

        async def cor():
            r = await self.session.get(self.fs.encode_url(url), **kwargs).__aenter__()
            self.fs._raise_not_found_for_status(r, url)
            return r

        self.r = sync(self.loop, cor)

    def seek(self, loc, whence=0):
        if loc == 0 and whence == 1:
            return
        if loc == self.loc and whence == 0:
            return
        raise ValueError("Cannot seek streaming HTTP file")

    async def _read(self, num=-1):
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    read = sync_wrapper(_read)

    async def _close(self):
        self.r.close()

    def close(self):
        asyncio.run_coroutine_threadsafe(self._close(), self.loop)
        super().close()

    def __reduce__(self):
        return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)


class AsyncStreamFile(AbstractAsyncStreamedFile):
    def __init__(
        self, fs, url, mode="rb", loop=None, session=None, size=None, **kwargs
    ):
        self.url = url
        self.session = session
        self.r = None
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        self.kwargs = kwargs
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none")
        self.size = size

    async def read(self, num=-1):
        if self.r is None:
            r = await self.session.get(
                self.fs.encode_url(self.url), **self.kwargs
            ).__aenter__()
            self.fs._raise_not_found_for_status(r, self.url)
            self.r = r
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    async def close(self):
        if self.r is not None:
            self.r.close()
            self.r = None
        await super().close()
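
# Editor's note: a sketch of the fully-async interface (hypothetical URL;
# must run inside an event loop):
#
#   fs = HTTPFileSystem(asynchronous=True)
#   await fs.set_session()
#   f = await fs.open_async("https://example.com/stream")
#   chunk = await f.read(2**16)
#   await f.close()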


async def get_range(session, url, start, end, file=None, **kwargs):
    # explicitly get a range when we know it must be safe
    kwargs = kwargs.copy()
    headers = kwargs.pop("headers", {}).copy()
    headers["Range"] = "bytes=%i-%i" % (start, end - 1)
    r = await session.get(url, headers=headers, **kwargs)
    r.raise_for_status()
    async with r:
        out = await r.read()
    if file:
        with open(file, "rb+") as f:
            f.seek(start)
            f.write(out)
    else:
        return out
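
# Editor's note: a get_range sketch (hypothetical URL; requires an existing
# aiohttp session). The end bound is exclusive, as in Python slices:
#
#   data = await get_range(session, "https://example.com/a.bin", 0, 1024)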


async def _file_info(url, session, size_policy="head", **kwargs):
    """Call HEAD on the server to get details about the file (size/checksum etc.)

    Default operation is to explicitly allow redirects and use encoding
    'identity' (no compression) to get the true size of the target.
    """
    logger.debug("Retrieve file size for %s" % url)
    kwargs = kwargs.copy()
    ar = kwargs.pop("allow_redirects", True)
    head = kwargs.get("headers", {}).copy()
    head["Accept-Encoding"] = "identity"
    kwargs["headers"] = head

    info = {}
    if size_policy == "head":
        r = await session.head(url, allow_redirects=ar, **kwargs)
    elif size_policy == "get":
        r = await session.get(url, allow_redirects=ar, **kwargs)
    else:
        raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
    async with r:
        r.raise_for_status()

        # TODO:
        # recognise lack of 'Accept-Ranges',
        # or 'Accept-Ranges': 'none' (not 'bytes')
        # to mean streaming only, no random access => return None
        if "Content-Length" in r.headers:
            info["size"] = int(r.headers["Content-Length"])
        elif "Content-Range" in r.headers:
            info["size"] = int(r.headers["Content-Range"].split("/")[1])

        for checksum_field in ["ETag", "Content-MD5", "Digest"]:
            if r.headers.get(checksum_field):
                info[checksum_field] = r.headers[checksum_field]

    return info


async def _file_size(url, session=None, *args, **kwargs):
    if session is None:
        session = await get_client()
    info = await _file_info(url, session=session, *args, **kwargs)
    return info.get("size")


file_size = sync_wrapper(_file_size)