import asyncio
import io
import logging
import re
import weakref
from copy import copy
from urllib.parse import urlparse

import aiohttp
import yarl

from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import (
    DEFAULT_BLOCK_SIZE,
    glob_translate,
    isfilelike,
    nullcontext,
    tokenize,
)

from ..caching import AllBytes

# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
logger = logging.getLogger("fsspec.http")
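
# For illustration (hypothetical input): in a page containing
# '<a href="data/file1.csv">file1</a>', ``ex`` captures "data/file1.csv" in
# its "url" group; ``ex2`` instead matches bare absolute URLs such as
# "https://server.com/stuff?thing=other" anywhere in the text.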


async def get_client(**kwargs):
    return aiohttp.ClientSession(**kwargs)
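
# A minimal sketch (hypothetical credentials): ``get_client`` can be replaced
# to pre-configure every session, e.g. with basic auth:
#
#     async def get_client_with_auth(**kwargs):
#         return aiohttp.ClientSession(auth=aiohttp.BasicAuth("user", "pass"), **kwargs)
#
#     fs = HTTPFileSystem(get_client=get_client_with_auth)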


class HTTPFileSystem(AsyncFileSystem):
    """
    Simple File-System for fetching data via HTTP(S)

    ``ls()`` is implemented by loading the parent page and doing a regex
    match on the result. If simple_links=True, anything that looks like a
    URL (e.g. "http(s)://server.com/stuff?thing=other") is considered a
    link; otherwise only links within HTML href tags are used.
    """

    protocol = ("http", "https")
    sep = "/"

    def __init__(
        self,
        simple_links=True,
        block_size=None,
        same_scheme=True,
        size_policy=None,
        cache_type="bytes",
        cache_options=None,
        asynchronous=False,
        loop=None,
        client_kwargs=None,
        get_client=get_client,
        encoded=False,
        **storage_options,
    ):
        """
        NB: if this is called async, you must await set_session

        Parameters
        ----------
        block_size: int
            Blocks to read bytes; if 0, will default to raw requests file-like
            objects instead of HTTPFile instances
        simple_links: bool
            If True, will consider both HTML <a> tags and anything that looks
            like a URL; if False, will consider only the former.
        same_scheme: bool
            When doing ls/glob, if this is True, only consider paths that have
            http/https matching the input URLs.
        size_policy: this argument is deprecated
        client_kwargs: dict
            Passed to aiohttp.ClientSession, see
            https://docs.aiohttp.org/en/stable/client_reference.html
            For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
        get_client: Callable[..., aiohttp.ClientSession]
            A callable which takes keyword arguments and constructs
            an aiohttp.ClientSession. Its state will be managed by
            the HTTPFileSystem class.
        storage_options: key-value
            Any other parameters passed on to the aiohttp request calls
        cache_type, cache_options: defaults used in open()
        """
        super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
        self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
        self.simple_links = simple_links
        self.same_schema = same_scheme
        self.cache_type = cache_type
        self.cache_options = cache_options
        self.client_kwargs = client_kwargs or {}
        self.get_client = get_client
        self.encoded = encoded
        self.kwargs = storage_options
        self._session = None

        # Clean caching-related parameters from `storage_options`
        # before propagating them as `request_options` through `self.kwargs`.
        # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
        # it clearer.
        request_options = copy(storage_options)
        self.use_listings_cache = request_options.pop("use_listings_cache", False)
        request_options.pop("listings_expiry_time", None)
        request_options.pop("max_paths", None)
        request_options.pop("skip_instance_cache", None)
        self.kwargs = request_options

    @property
    def fsid(self):
        return "http"

    def encode_url(self, url):
        return yarl.URL(url, encoded=self.encoded)

    @staticmethod
    def close_session(loop, session):
        if loop is not None and loop.is_running():
            try:
                sync(loop, session.close, timeout=0.1)
                return
            except (TimeoutError, FSTimeoutError, NotImplementedError):
                pass
        connector = getattr(session, "_connector", None)
        if connector is not None:
            # close after loop is dead
            connector._close()

    async def set_session(self):
        if self._session is None:
            self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
            if not self.asynchronous:
                weakref.finalize(self, self.close_session, self.loop, self._session)
        return self._session

    @classmethod
    def _strip_protocol(cls, path):
        """For HTTP, we always want to keep the full URL"""
        return path

    @classmethod
    def _parent(cls, path):
        # override, since _strip_protocol is different for URLs
        par = super()._parent(path)
        if len(par) > 7:  # "http://..."
            return par
        return ""
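
    # Illustrative behaviour: _parent("http://server.com/a/b") yields
    # "http://server.com/a", while for a bare host such as "http://server.com"
    # the candidate parent is no longer than "http://" and "" is returned.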

    async def _ls_real(self, url, detail=True, **kwargs):
        # ignoring URL-encoded arguments
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            self._raise_not_found_for_status(r, url)

            if "Content-Type" in r.headers:
                mimetype = r.headers["Content-Type"].partition(";")[0]
            else:
                mimetype = None

            if mimetype in ("text/html", None):
                try:
                    text = await r.text(errors="ignore")
                    if self.simple_links:
                        links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
                    else:
                        links = [u[2] for u in ex.findall(text)]
                except UnicodeDecodeError:
                    links = []  # binary, not HTML
            else:
                links = []

        out = set()
        parts = urlparse(url)
        for l in links:
            if isinstance(l, tuple):
                l = l[1]
            if l.startswith("/") and len(l) > 1:
                # absolute URL on this server
                l = f"{parts.scheme}://{parts.netloc}{l}"
            if l.startswith("http"):
                if self.same_schema and l.startswith(url.rstrip("/") + "/"):
                    out.add(l)
                elif l.replace("https", "http").startswith(
                    url.replace("https", "http").rstrip("/") + "/"
                ):
                    # allowed to cross http <-> https
                    out.add(l)
            else:
                if l not in ["..", "../"]:
                    # Ignore FTP-like "parent"
                    out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
        if not out and url.endswith("/"):
            out = await self._ls_real(url.rstrip("/"), detail=False)
        if detail:
            return [
                {
                    "name": u,
                    "size": None,
                    "type": "directory" if u.endswith("/") else "file",
                }
                for u in out
            ]
        else:
            return sorted(out)

    async def _ls(self, url, detail=True, **kwargs):
        if self.use_listings_cache and url in self.dircache:
            out = self.dircache[url]
        else:
            out = await self._ls_real(url, detail=detail, **kwargs)
            self.dircache[url] = out
        return out

    ls = sync_wrapper(_ls)
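
    # Example (sketch; server and listing are hypothetical):
    #
    #     fs = HTTPFileSystem()
    #     fs.ls("http://server.com/files/", detail=False)
    #     # ['http://server.com/files/a.csv', 'http://server.com/files/sub/']
    #
    # With detail=True, each entry is a dict with "name", "size" (always None,
    # since listings carry no size information) and "type" ("directory" if the
    # link ends with "/", else "file").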

    def _raise_not_found_for_status(self, response, url):
        """
        Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
        """
        if response.status == 404:
            raise FileNotFoundError(url)
        response.raise_for_status()

    async def _cat_file(self, url, start=None, end=None, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)

        if start is not None or end is not None:
            if start == end:
                return b""
            headers = kw.pop("headers", {}).copy()

            headers["Range"] = await self._process_limits(url, start, end)
            kw["headers"] = headers
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            out = await r.read()
            self._raise_not_found_for_status(r, url)
        return out
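
    # Example (sketch, hypothetical URL): a bounded read issues a single
    # Range request,
    #
    #     fs.cat_file("http://server.com/data.bin", start=0, end=100)
    #
    # sending "Range: bytes=0-99" and returning at most 100 bytes; start == end
    # short-circuits to b"" without touching the network.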

    async def _get_file(
        self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs
    ):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(rpath)
        session = await self.set_session()
        async with session.get(self.encode_url(rpath), **kw) as r:
            try:
                size = int(r.headers["content-length"])
            except (ValueError, KeyError):
                size = None

            callback.set_size(size)
            self._raise_not_found_for_status(r, rpath)
            if isfilelike(lpath):
                outfile = lpath
            else:
                outfile = open(lpath, "wb")  # noqa: ASYNC230

            try:
                chunk = True
                while chunk:
                    chunk = await r.content.read(chunk_size)
                    outfile.write(chunk)
                    callback.relative_update(len(chunk))
            finally:
                if not isfilelike(lpath):
                    outfile.close()

    async def _put_file(
        self,
        lpath,
        rpath,
        chunk_size=5 * 2**20,
        callback=DEFAULT_CALLBACK,
        method="post",
        mode="overwrite",
        **kwargs,
    ):
        if mode != "overwrite":
            raise NotImplementedError("Exclusive write")

        async def gen_chunks():
            # Support passing arbitrary file-like objects
            # and use them instead of streams.
            if isinstance(lpath, io.IOBase):
                context = nullcontext(lpath)
                use_seek = False  # might not support seeking
            else:
                context = open(lpath, "rb")  # noqa: ASYNC230
                use_seek = True

            with context as f:
                if use_seek:
                    callback.set_size(f.seek(0, 2))
                    f.seek(0)
                else:
                    callback.set_size(getattr(f, "size", None))

                chunk = f.read(chunk_size)
                while chunk:
                    yield chunk
                    callback.relative_update(len(chunk))
                    chunk = f.read(chunk_size)

        kw = self.kwargs.copy()
        kw.update(kwargs)
        session = await self.set_session()

        method = method.lower()
        if method not in ("post", "put"):
            raise ValueError(
                f"method has to be either 'post' or 'put', not: {method!r}"
            )

        meth = getattr(session, method)
        async with meth(self.encode_url(rpath), data=gen_chunks(), **kw) as resp:
            self._raise_not_found_for_status(resp, rpath)
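
    # Usage sketch (hypothetical endpoint): upload a local file with PUT rather
    # than the default POST; gen_chunks streams the body in chunk_size pieces
    # instead of reading it all into memory:
    #
    #     fs.put_file("local.bin", "http://server.com/upload", method="put")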

    async def _exists(self, path, strict=False, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        try:
            logger.debug(path)
            session = await self.set_session()
            r = await session.get(self.encode_url(path), **kw)
            async with r:
                if strict:
                    self._raise_not_found_for_status(r, path)
                return r.status < 400
        except FileNotFoundError:
            return False
        except aiohttp.ClientError:
            if strict:
                raise
            return False

    async def _isfile(self, path, **kwargs):
        return await self._exists(path, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=None,  # XXX: This differs from the base class.
        cache_type=None,
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """Make a file-like object

        Parameters
        ----------
        path: str
            Full URL with protocol
        mode: string
            must be "rb"
        block_size: int or None
            Bytes to download in one request; use instance value if None. If
            zero, will return a streaming file-like instance.
        kwargs: key-value
            Any other parameters, passed to the aiohttp calls
        """
        if mode != "rb":
            raise NotImplementedError
        block_size = block_size if block_size is not None else self.block_size
        kw = self.kwargs.copy()
        kw["asynchronous"] = self.asynchronous
        kw.update(kwargs)
        info = {}
        # dict.update returns None, so when no size is given this fills `info`
        # via self.info() and falls through to info["size"]
        size = size or info.update(self.info(path, **kwargs)) or info["size"]
        session = sync(self.loop, self.set_session)
        if block_size and size and info.get("partial", True):
            return HTTPFile(
                self,
                path,
                session=session,
                block_size=block_size,
                mode=mode,
                size=size,
                cache_type=cache_type or self.cache_type,
                cache_options=cache_options or self.cache_options,
                loop=self.loop,
                **kw,
            )
        else:
            return HTTPStreamFile(
                self,
                path,
                mode=mode,
                loop=self.loop,
                session=session,
                **kw,
            )
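
    # Example (sketch, hypothetical URL): with the default block_size, open()
    # returns a random-access HTTPFile backed by Range requests; block_size=0
    # forces a forward-only HTTPStreamFile instead:
    #
    #     with fs.open("http://server.com/big.bin", block_size=0) as f:
    #         first = f.read(1024)  # streamed; seeking backwards raises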

    async def open_async(self, path, mode="rb", size=None, **kwargs):
        session = await self.set_session()
        if size is None:
            try:
                size = (await self._info(path, **kwargs))["size"]
            except FileNotFoundError:
                pass
        return AsyncStreamFile(
            self,
            path,
            loop=self.loop,
            session=session,
            size=size,
            **kwargs,
        )

    def ukey(self, url):
        """Unique identifier; assume HTTP files are static, unchanging"""
        return tokenize(url, self.kwargs, self.protocol)

    async def _info(self, url, **kwargs):
        """Get info of URL

        Tries to access location via HEAD, and then GET methods, but does
        not fetch the data.

        It is possible that the server does not supply any size information, in
        which case size will be given as None (and certain operations on the
        corresponding file will not work).
        """
        info = {}
        session = await self.set_session()

        for policy in ["head", "get"]:
            try:
                info.update(
                    await _file_info(
                        self.encode_url(url),
                        size_policy=policy,
                        session=session,
                        **self.kwargs,
                        **kwargs,
                    )
                )
                if info.get("size") is not None:
                    break
            except Exception as exc:
                if policy == "get":
                    # If get failed, then raise a FileNotFoundError
                    raise FileNotFoundError(url) from exc
                logger.debug("", exc_info=exc)

        return {"name": url, "size": None, **info, "type": "file"}
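
    # Shape of the result (values illustrative, not from a real server): a
    # static file served with a Content-Length might yield
    #
    #     {"name": url, "size": 1048576, "mimetype": "text/csv",
    #      "url": url, "type": "file"}
    #
    # whereas a server that reports no size yields {"name": url, "size": None,
    # ...}, in which case open() falls back to a streaming file.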

    async def _glob(self, path, maxdepth=None, **kwargs):
        """
        Find files by glob-matching.

        This implementation is identical to the one in AbstractFileSystem,
        but "?" is not considered as a character for globbing, because it is
        so common in URLs, often identifying the "query" part.
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        ends_with_slash = path.endswith("/")  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_slash or path.endswith(("/**", "/*"))
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_slash else ""))
        pattern = re.compile(pattern)

        out = {
            (
                p.rstrip("/")
                if not append_slash_to_dirname
                and info["type"] == "directory"
                and p.endswith("/")
                else p
            ): info
            for p, info in sorted(allpaths.items())
            if pattern.match(p.rstrip("/"))
        }

        if detail:
            return out
        else:
            return list(out)
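
    # Example (sketch): has_magic above ignores "?", so a URL such as
    #
    #     fs.glob("http://server.com/file.csv?download=true")
    #
    # is treated as a literal path (checked with exists/info) rather than as a
    # pattern with a single-character wildcard, unlike AbstractFileSystem.glob.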

    async def _isdir(self, path):
        # override, since all URLs are (also) files
        try:
            return bool(await self._ls(path))
        except (FileNotFoundError, ValueError):
            return False

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        """
        Write bytes to a remote file over HTTP.

        Parameters
        ----------
        path : str
            Target URL where the data should be written
        value : bytes
            Data to be written
        mode : str
            How to write to the file - 'overwrite' or 'append'
        **kwargs : dict
            Additional parameters to pass to the HTTP request
        """
        url = self._strip_protocol(path)
        headers = kwargs.pop("headers", {})
        headers["Content-Length"] = str(len(value))

        session = await self.set_session()

        async with session.put(url, data=value, headers=headers, **kwargs) as r:
            r.raise_for_status()
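
# A minimal end-to-end sketch (URLs are placeholders, not part of this module):
#
#     fs = HTTPFileSystem(client_kwargs={"headers": {"User-Agent": "fsspec-demo"}})
#     info = fs.info("https://example.com/data.csv")  # HEAD first, GET fallback
#     data = fs.cat_file("https://example.com/data.csv", start=0, end=1024)
#
# From async code, construct with asynchronous=True, ``await fs.set_session()``
# once, then use the underscored coroutines such as ``await fs._cat_file(...)``.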


class HTTPFile(AbstractBufferedFile):
    """
    A file-like object pointing to a remote HTTP(S) resource

    Supports only reading, with read-ahead of a predetermined block-size.

    In the case that the server does not supply the filesize, only reading of
    the complete file in one go is supported.

    Parameters
    ----------
    url: str
        Full URL of the remote resource, including the protocol
    session: aiohttp.ClientSession or None
        All calls will be made within this session, to avoid restarting
        connections where the server allows this
    block_size: int or None
        The amount of read-ahead to do, in bytes. Default is 5MB, or the value
        configured for the FileSystem creating this file
    size: None or int
        If given, this is the size of the file in bytes, and we don't attempt
        to call the server to find the value.
    kwargs: all other key-values are passed to the aiohttp calls.
    """

    def __init__(
        self,
        fs,
        url,
        session=None,
        block_size=None,
        mode="rb",
        cache_type="bytes",
        cache_options=None,
        size=None,
        loop=None,
        asynchronous=False,
        **kwargs,
    ):
        if mode != "rb":
            raise NotImplementedError("File mode not supported")
        self.asynchronous = asynchronous
        self.loop = loop
        self.url = url
        self.session = session
        self.details = {"name": url, "size": size, "type": "file"}
        super().__init__(
            fs=fs,
            path=url,
            mode=mode,
            block_size=block_size,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )

    def read(self, length=-1):
        """Read bytes from file

        Parameters
        ----------
        length: int
            Read up to this many bytes. If negative, read all content to end of
            file. If the server has not supplied the filesize, attempting to
            read only part of the data will raise a ValueError.
        """
        if (
            (length < 0 and self.loc == 0)  # explicit read all
            # but not when the size is known and fits into a block anyways
            and not (self.size is not None and self.size <= self.blocksize)
        ):
            self._fetch_all()
        if self.size is None:
            if length < 0:
                self._fetch_all()
        else:
            length = min(self.size - self.loc, length)
        return super().read(length)

    async def async_fetch_all(self):
        """Read whole file in one shot, without caching

        This is only called when position is still at zero,
        and read() is called without a byte-count.
        """
        logger.debug(f"Fetch all for {self}")
        if not isinstance(self.cache, AllBytes):
            r = await self.session.get(self.fs.encode_url(self.url), **self.kwargs)
            async with r:
                r.raise_for_status()
                out = await r.read()
                self.cache = AllBytes(
                    size=len(out), fetcher=None, blocksize=None, data=out
                )
                self.size = len(out)

    _fetch_all = sync_wrapper(async_fetch_all)

    def _parse_content_range(self, headers):
        """Parse the Content-Range header"""
        s = headers.get("Content-Range", "")
        m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
        if not m:
            return None, None, None

        if m[1] == "*":
            start = end = None
        else:
            start, end = [int(x) for x in m[1].split("-")]
        total = None if m[2] == "*" else int(m[2])
        return start, end, total
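
    # Illustrative parses: a header of "bytes 0-499/1234" returns
    # (0, 499, 1234); "bytes */1234" returns (None, None, 1234); a missing or
    # malformed header returns (None, None, None).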

    async def async_fetch_range(self, start, end):
        """Download a block of data

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        logger.debug(f"Fetch range for {self}: {start}-{end}")
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {}).copy()
        headers["Range"] = f"bytes={start}-{end - 1}"
        logger.debug(f"{self.url} : {headers['Range']}")
        r = await self.session.get(
            self.fs.encode_url(self.url), headers=headers, **kwargs
        )
        async with r:
            if r.status == 416:
                # range request outside file
                return b""
            r.raise_for_status()

            # If the server has handled the range request, it should reply
            # with status 206 (partial content). But we'll guess that a suitable
            # Content-Range header or a Content-Length no more than the
            # requested range also mean we have got the desired range.
            response_is_range = (
                r.status == 206
                or self._parse_content_range(r.headers)[0] == start
                or int(r.headers.get("Content-Length", end + 1)) <= end - start
            )

            if response_is_range:
                # partial content, as expected
                out = await r.read()
            elif start > 0:
                raise ValueError(
                    "The HTTP server doesn't appear to support range requests. "
                    "Only reading this file from the beginning is supported. "
                    "Open with block_size=0 for a streaming file interface."
                )
            else:
                # Response is not a range, but we want the start of the file,
                # so we can read the required amount anyway.
                cl = 0
                out = []
                while True:
                    chunk = await r.content.read(2**20)
                    # data size unknown, let's read until we have enough
                    if chunk:
                        out.append(chunk)
                        cl += len(chunk)
                        if cl > end - start:
                            break
                    else:
                        break
                out = b"".join(out)[: end - start]
            return out

    _fetch_range = sync_wrapper(async_fetch_range)


magic_check = re.compile("([*[])")


def has_magic(s):
    match = magic_check.search(s)
    return match is not None
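
# Example: has_magic("http://server.com/*.csv") is True (the "*" is a glob
# character), while has_magic("http://server.com/file?x=1") is False, since
# "?" is deliberately absent from magic_check for URL-friendliness.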


class HTTPStreamFile(AbstractBufferedFile):
    def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
        self.asynchronous = kwargs.pop("asynchronous", False)
        self.url = url
        self.loop = loop
        self.session = session
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)

        async def cor():
            r = await self.session.get(self.fs.encode_url(url), **kwargs).__aenter__()
            self.fs._raise_not_found_for_status(r, url)
            return r

        self.r = sync(self.loop, cor)
        self.loop = fs.loop

    def seek(self, loc, whence=0):
        if loc == 0 and whence == 1:
            return
        if loc == self.loc and whence == 0:
            return
        raise ValueError("Cannot seek streaming HTTP file")

    async def _read(self, num=-1):
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    read = sync_wrapper(_read)

    async def _close(self):
        self.r.close()

    def close(self):
        asyncio.run_coroutine_threadsafe(self._close(), self.loop)
        super().close()


class AsyncStreamFile(AbstractAsyncStreamedFile):
    def __init__(
        self, fs, url, mode="rb", loop=None, session=None, size=None, **kwargs
    ):
        self.url = url
        self.session = session
        self.r = None
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        self.kwargs = kwargs
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none")
        self.size = size

    async def read(self, num=-1):
        if self.r is None:
            r = await self.session.get(
                self.fs.encode_url(self.url), **self.kwargs
            ).__aenter__()
            self.fs._raise_not_found_for_status(r, self.url)
            self.r = r
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    async def close(self):
        if self.r is not None:
            self.r.close()
            self.r = None
        await super().close()


async def get_range(session, url, start, end, file=None, **kwargs):
    # explicitly get a range, when we know it must be safe
    kwargs = kwargs.copy()
    headers = kwargs.pop("headers", {}).copy()
    headers["Range"] = f"bytes={start}-{end - 1}"
    r = await session.get(url, headers=headers, **kwargs)
    r.raise_for_status()
    async with r:
        out = await r.read()
    if file:
        with open(file, "r+b") as f:  # noqa: ASYNC230
            f.seek(start)
            f.write(out)
    else:
        return out
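
# Usage sketch (names hypothetical): fetch bytes [0, 1024) of a URL, writing
# them at the right offset into an existing local file (opened "r+b", so it
# must already exist):
#
#     session = await get_client()
#     await get_range(session, "http://server.com/big.bin", 0, 1024, file="part.bin")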


async def _file_info(url, session, size_policy="head", **kwargs):
    """Call HEAD on the server to get details about the file (size/checksum etc.)

    Default operation is to explicitly allow redirects and use encoding
    'identity' (no compression) to get the true size of the target.
    """
    logger.debug("Retrieve file size for %s", url)
    kwargs = kwargs.copy()
    ar = kwargs.pop("allow_redirects", True)
    head = kwargs.get("headers", {}).copy()
    head["Accept-Encoding"] = "identity"
    kwargs["headers"] = head

    info = {}
    if size_policy == "head":
        r = await session.head(url, allow_redirects=ar, **kwargs)
    elif size_policy == "get":
        r = await session.get(url, allow_redirects=ar, **kwargs)
    else:
        raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
    async with r:
        r.raise_for_status()

        if "Content-Length" in r.headers:
            # Some servers may choose to ignore Accept-Encoding and return
            # compressed content, in which case the returned size is unreliable.
            if "Content-Encoding" not in r.headers or r.headers["Content-Encoding"] in [
                "identity",
                "",
            ]:
                info["size"] = int(r.headers["Content-Length"])
        elif "Content-Range" in r.headers:
            info["size"] = int(r.headers["Content-Range"].split("/")[1])

        if "Content-Type" in r.headers:
            info["mimetype"] = r.headers["Content-Type"].partition(";")[0]

        if r.headers.get("Accept-Ranges") == "none":
            # Some servers may explicitly discourage partial content requests, but
            # the lack of "Accept-Ranges" does not always indicate they would fail
            info["partial"] = False

        info["url"] = str(r.url)

        for checksum_field in ["ETag", "Content-MD5", "Digest", "Last-Modified"]:
            if r.headers.get(checksum_field):
                info[checksum_field] = r.headers[checksum_field]

    return info


async def _file_size(url, session=None, *args, **kwargs):
    if session is None:
        session = await get_client()
    info = await _file_info(url, session=session, *args, **kwargs)
    return info.get("size")


file_size = sync_wrapper(_file_size)
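
# Usage sketch (assumed URL): query a size asynchronously; when no session is
# passed, one is created on demand (and, in this sketch, never closed):
#
#     import asyncio
#     size = asyncio.run(_file_size("https://example.com/data.csv"))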