# fsspec/implementations/http.py
import asyncio
import io
import logging
import re
import weakref
from copy import copy
from urllib.parse import urlparse

import aiohttp
import yarl

from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import (
    DEFAULT_BLOCK_SIZE,
    glob_translate,
    isfilelike,
    nullcontext,
    tokenize,
)

from ..caching import AllBytes

# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
logger = logging.getLogger("fsspec.http")


async def get_client(**kwargs):
    return aiohttp.ClientSession(**kwargs)
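
# Example (illustrative sketch, not part of the library): a custom client
# factory that attaches basic auth to every request. ``get_auth_client`` and
# the credentials are hypothetical; aiohttp.BasicAuth is real aiohttp API.
#
#     async def get_auth_client(**kwargs):
#         return aiohttp.ClientSession(
#             auth=aiohttp.BasicAuth("user", "pass"), **kwargs
#         )
#
#     fs = HTTPFileSystem(get_client=get_auth_client)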


class HTTPFileSystem(AsyncFileSystem):
    """
    Simple File-System for fetching data via HTTP(S)

    ``ls()`` is implemented by loading the parent page and doing a regex
    match on the result. If simple_links=True, anything of the form
    "http(s)://server.com/stuff?thing=other" is considered a link; otherwise
    only links within HTML href tags will be used.
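
    Examples
    --------
    A minimal, illustrative sketch (the URL is hypothetical):

    >>> fs = HTTPFileSystem()  # doctest: +SKIP
    >>> fs.ls("http://example.com/files/")  # doctest: +SKIP
    >>> with fs.open("http://example.com/files/data.bin") as f:  # doctest: +SKIP
    ...     head = f.read(1024)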

    """

    protocol = ("http", "https")
    sep = "/"

    def __init__(
        self,
        simple_links=True,
        block_size=None,
        same_scheme=True,
        size_policy=None,
        cache_type="bytes",
        cache_options=None,
        asynchronous=False,
        loop=None,
        client_kwargs=None,
        get_client=get_client,
        encoded=False,
        **storage_options,
    ):
        """
        NB: if this is called async, you must await set_session

        Parameters
        ----------
        block_size: int
            Blocks to read bytes; if 0, will default to raw requests file-like
            objects instead of HTTPFile instances
        simple_links: bool
            If True, will consider both HTML <a> tags and anything that looks
            like a URL; if False, will consider only the former.
        same_scheme: bool
            When doing ls/glob, if this is True, only consider paths that have
            http/https matching the input URLs.
        size_policy: this argument is deprecated
        client_kwargs: dict
            Passed to aiohttp.ClientSession, see
            https://docs.aiohttp.org/en/stable/client_reference.html
            For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
        get_client: Callable[..., aiohttp.ClientSession]
            A callable which takes keyword arguments and constructs
            an aiohttp.ClientSession. Its state will be managed by
            the HTTPFileSystem class.
        storage_options: key-value
            Any other parameters passed on to requests
        cache_type, cache_options: defaults used in open()
        """
        super().__init__(asynchronous=asynchronous, loop=loop, **storage_options)
        self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
        self.simple_links = simple_links
        self.same_schema = same_scheme
        self.cache_type = cache_type
        self.cache_options = cache_options
        self.client_kwargs = client_kwargs or {}
        self.get_client = get_client
        self.encoded = encoded
        self.kwargs = storage_options
        self._session = None

        # Clean caching-related parameters from `storage_options`
        # before propagating them as `request_options` through `self.kwargs`.
        # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
        # it clearer.
        request_options = copy(storage_options)
        self.use_listings_cache = request_options.pop("use_listings_cache", False)
        request_options.pop("listings_expiry_time", None)
        request_options.pop("max_paths", None)
        request_options.pop("skip_instance_cache", None)
        self.kwargs = request_options
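
    # Example (illustrative; the header value is hypothetical): extra request
    # headers and a larger read-ahead block can be supplied at construction.
    #
    #     fs = HTTPFileSystem(
    #         client_kwargs={"headers": {"User-Agent": "my-app/1.0"}},
    #         block_size=10 * 2**20,
    #     )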

    @property
    def fsid(self):
        return "http"

    def encode_url(self, url):
        return yarl.URL(url, encoded=self.encoded)

    @staticmethod
    def close_session(loop, session):
        if loop is not None and loop.is_running():
            try:
                sync(loop, session.close, timeout=0.1)
                return
            except (TimeoutError, FSTimeoutError, NotImplementedError):
                pass
        connector = getattr(session, "_connector", None)
        if connector is not None:
            # close after loop is dead
            connector._close()

    async def set_session(self):
        if self._session is None:
            self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
            if not self.asynchronous:
                weakref.finalize(self, self.close_session, self.loop, self._session)
        return self._session

    @classmethod
    def _strip_protocol(cls, path):
        """For HTTP, we always want to keep the full URL"""
        return path

    @classmethod
    def _parent(cls, path):
        # override, since _strip_protocol is different for URLs
        par = super()._parent(path)
        if len(par) > 7:  # "http://..."
            return par
        return ""

    async def _ls_real(self, url, detail=True, **kwargs):
        # ignoring URL-encoded arguments
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            self._raise_not_found_for_status(r, url)

            if "Content-Type" in r.headers:
                mimetype = r.headers["Content-Type"].partition(";")[0]
            else:
                mimetype = None

            if mimetype in ("text/html", None):
                try:
                    text = await r.text(errors="ignore")
                    if self.simple_links:
                        links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
                    else:
                        links = [u[2] for u in ex.findall(text)]
                except UnicodeDecodeError:
                    links = []  # binary, not HTML
            else:
                links = []

        out = set()
        parts = urlparse(url)
        for l in links:
            if isinstance(l, tuple):
                l = l[1]
            if l.startswith("/") and len(l) > 1:
                # absolute URL on this server
                l = f"{parts.scheme}://{parts.netloc}{l}"
            if l.startswith("http"):
                if self.same_schema and l.startswith(url.rstrip("/") + "/"):
                    out.add(l)
                elif l.replace("https", "http").startswith(
                    url.replace("https", "http").rstrip("/") + "/"
                ):
                    # allowed to cross http <-> https
                    out.add(l)
            else:
                if l not in ["..", "../"]:
                    # Ignore FTP-like "parent"
                    out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
        if not out and url.endswith("/"):
            out = await self._ls_real(url.rstrip("/"), detail=False)
        if detail:
            return [
                {
                    "name": u,
                    "size": None,
                    "type": "directory" if u.endswith("/") else "file",
                }
                for u in out
            ]
        else:
            return sorted(out)

    async def _ls(self, url, detail=True, **kwargs):
        if self.use_listings_cache and url in self.dircache:
            out = self.dircache[url]
        else:
            out = await self._ls_real(url, detail=detail, **kwargs)
            self.dircache[url] = out
        return out

    ls = sync_wrapper(_ls)

    def _raise_not_found_for_status(self, response, url):
        """
        Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
        """
        if response.status == 404:
            raise FileNotFoundError(url)
        response.raise_for_status()

    async def _cat_file(self, url, start=None, end=None, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(url)

        if start is not None or end is not None:
            if start == end:
                return b""
            headers = kw.pop("headers", {}).copy()

            headers["Range"] = await self._process_limits(url, start, end)
            kw["headers"] = headers
        session = await self.set_session()
        async with session.get(self.encode_url(url), **kw) as r:
            out = await r.read()
            self._raise_not_found_for_status(r, url)
        return out
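
    # Example (illustrative; the URL is hypothetical): ``cat_file`` is exposed
    # synchronously on the filesystem, and start/end select a byte range.
    #
    #     fs = HTTPFileSystem()
    #     first_kb = fs.cat_file("http://example.com/data.bin", start=0, end=1024)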

    async def _get_file(
        self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs
    ):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        logger.debug(rpath)
        session = await self.set_session()
        async with session.get(self.encode_url(rpath), **kw) as r:
            try:
                size = int(r.headers["content-length"])
            except (ValueError, KeyError):
                size = None

            callback.set_size(size)
            self._raise_not_found_for_status(r, rpath)
            if isfilelike(lpath):
                outfile = lpath
            else:
                outfile = open(lpath, "wb")  # noqa: ASYNC230

            try:
                chunk = True
                while chunk:
                    chunk = await r.content.read(chunk_size)
                    outfile.write(chunk)
                    callback.relative_update(len(chunk))
            finally:
                if not isfilelike(lpath):
                    outfile.close()

    async def _put_file(
        self,
        lpath,
        rpath,
        chunk_size=5 * 2**20,
        callback=DEFAULT_CALLBACK,
        method="post",
        mode="overwrite",
        **kwargs,
    ):
        if mode != "overwrite":
            raise NotImplementedError("Exclusive write")

        async def gen_chunks():
            # Support passing arbitrary file-like objects
            # and use them instead of streams.
            if isinstance(lpath, io.IOBase):
                context = nullcontext(lpath)
                use_seek = False  # might not support seeking
            else:
                context = open(lpath, "rb")  # noqa: ASYNC230
                use_seek = True

            with context as f:
                if use_seek:
                    callback.set_size(f.seek(0, 2))
                    f.seek(0)
                else:
                    callback.set_size(getattr(f, "size", None))

                chunk = f.read(chunk_size)
                while chunk:
                    yield chunk
                    callback.relative_update(len(chunk))
                    chunk = f.read(chunk_size)

        kw = self.kwargs.copy()
        kw.update(kwargs)
        session = await self.set_session()

        method = method.lower()
        if method not in ("post", "put"):
            raise ValueError(
                f"method has to be either 'post' or 'put', not: {method!r}"
            )

        meth = getattr(session, method)
        async with meth(self.encode_url(rpath), data=gen_chunks(), **kw) as resp:
            self._raise_not_found_for_status(resp, rpath)
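
    # Example (illustrative; the endpoint is hypothetical): upload a local
    # file with HTTP PUT instead of the default POST.
    #
    #     fs = HTTPFileSystem()
    #     fs.put_file("local.bin", "http://example.com/upload/local.bin",
    #                 method="put")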

    async def _exists(self, path, **kwargs):
        kw = self.kwargs.copy()
        kw.update(kwargs)
        try:
            logger.debug(path)
            session = await self.set_session()
            r = await session.get(self.encode_url(path), **kw)
            async with r:
                return r.status < 400
        except aiohttp.ClientError:
            return False

    async def _isfile(self, path, **kwargs):
        return await self._exists(path, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=None,  # XXX: This differs from the base class.
        cache_type=None,
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """Make a file-like object

        Parameters
        ----------
        path: str
            Full URL with protocol
        mode: string
            must be "rb"
        block_size: int or None
            Bytes to download in one request; use instance value if None. If
            zero, will return a streaming file-like instance.
        kwargs: key-value
            Any other parameters, passed to requests calls
        """
        if mode != "rb":
            raise NotImplementedError
        block_size = block_size if block_size is not None else self.block_size
        kw = self.kwargs.copy()
        kw["asynchronous"] = self.asynchronous
        kw.update(kwargs)
        info = {}
        size = size or info.update(self.info(path, **kwargs)) or info["size"]
        session = sync(self.loop, self.set_session)
        if block_size and size and info.get("partial", True):
            return HTTPFile(
                self,
                path,
                session=session,
                block_size=block_size,
                mode=mode,
                size=size,
                cache_type=cache_type or self.cache_type,
                cache_options=cache_options or self.cache_options,
                loop=self.loop,
                **kw,
            )
        else:
            return HTTPStreamFile(
                self,
                path,
                mode=mode,
                loop=self.loop,
                session=session,
                **kw,
            )
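
    # Example (illustrative; the URL is hypothetical): block_size=0 yields a
    # streaming file with no random access, while the default returns an
    # HTTPFile with read-ahead and seeking when the server supports ranges.
    #
    #     fs = HTTPFileSystem()
    #     with fs.open("http://example.com/big.bin", block_size=0) as f:
    #         head = f.read(2**20)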

    async def open_async(self, path, mode="rb", size=None, **kwargs):
        session = await self.set_session()
        if size is None:
            try:
                size = (await self._info(path, **kwargs))["size"]
            except FileNotFoundError:
                pass
        return AsyncStreamFile(
            self,
            path,
            loop=self.loop,
            session=session,
            size=size,
            **kwargs,
        )

    def ukey(self, url):
        """Unique identifier; assume HTTP files are static, unchanging"""
        return tokenize(url, self.kwargs, self.protocol)

    async def _info(self, url, **kwargs):
        """Get info of URL

        Tries to access location via HEAD, and then GET methods, but does
        not fetch the data.

        It is possible that the server does not supply any size information, in
        which case size will be given as None (and certain operations on the
        corresponding file will not work).
        """
        info = {}
        session = await self.set_session()

        for policy in ["head", "get"]:
            try:
                info.update(
                    await _file_info(
                        self.encode_url(url),
                        size_policy=policy,
                        session=session,
                        **self.kwargs,
                        **kwargs,
                    )
                )
                if info.get("size") is not None:
                    break
            except Exception as exc:
                if policy == "get":
                    # If get failed, then raise a FileNotFoundError
                    raise FileNotFoundError(url) from exc
                logger.debug("", exc_info=exc)

        return {"name": url, "size": None, **info, "type": "file"}

    async def _glob(self, path, maxdepth=None, **kwargs):
        """
        Find files by glob-matching.

        This implementation is identical to the one in AbstractFileSystem,
        but "?" is not considered as a character for globbing, because it is
        so common in URLs, often identifying the "query" part.
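
        Example (illustrative, hypothetical URLs):
        ``http://example.com/search?q=1`` contains no ``*`` or ``[``, so it
        is treated as a literal path, while
        ``http://example.com/files/*.csv`` does trigger glob matching.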

        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")
        import re

        ends_with_slash = path.endswith("/")  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_slash or path.endswith(("/**", "/*"))
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_slash else ""))
        pattern = re.compile(pattern)

        out = {
            (
                p.rstrip("/")
                if not append_slash_to_dirname
                and info["type"] == "directory"
                and p.endswith("/")
                else p
            ): info
            for p, info in sorted(allpaths.items())
            if pattern.match(p.rstrip("/"))
        }

        if detail:
            return out
        else:
            return list(out)

    async def _isdir(self, path):
        # override, since all URLs are (also) files
        try:
            return bool(await self._ls(path))
        except (FileNotFoundError, ValueError):
            return False

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        """
        Write bytes to a remote file over HTTP.

        Parameters
        ----------
        path : str
            Target URL where the data should be written
        value : bytes
            Data to be written
        mode : str
            How to write to the file - 'overwrite' or 'append'
        **kwargs : dict
            Additional parameters to pass to the HTTP request
        """
        url = self._strip_protocol(path)
        headers = kwargs.pop("headers", {})
        headers["Content-Length"] = str(len(value))

        session = await self.set_session()

        async with session.put(url, data=value, headers=headers, **kwargs) as r:
            r.raise_for_status()


class HTTPFile(AbstractBufferedFile):
    """
    A file-like object pointing to a remote HTTP(S) resource

    Supports only reading, with read-ahead of a predetermined block-size.

    In the case that the server does not supply the filesize, only reading of
    the complete file in one go is supported.

    Parameters
    ----------
    url: str
        Full URL of the remote resource, including the protocol
    session: aiohttp.ClientSession or None
        All calls will be made within this session, to avoid restarting
        connections where the server allows this
    block_size: int or None
        The amount of read-ahead to do, in bytes. Default is 5MB, or the value
        configured for the FileSystem creating this file
    size: None or int
        If given, this is the size of the file in bytes, and we don't attempt
        to call the server to find the value.
    kwargs: all other key-values are passed to requests calls.
    """

    def __init__(
        self,
        fs,
        url,
        session=None,
        block_size=None,
        mode="rb",
        cache_type="bytes",
        cache_options=None,
        size=None,
        loop=None,
        asynchronous=False,
        **kwargs,
    ):
        if mode != "rb":
            raise NotImplementedError("File mode not supported")
        self.asynchronous = asynchronous
        self.loop = loop
        self.url = url
        self.session = session
        self.details = {"name": url, "size": size, "type": "file"}
        super().__init__(
            fs=fs,
            path=url,
            mode=mode,
            block_size=block_size,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )

    def read(self, length=-1):
        """Read bytes from file

        Parameters
        ----------
        length: int
            Read up to this many bytes. If negative, read all content to end of
            file. If the server has not supplied the filesize, attempting to
            read only part of the data will raise a ValueError.
        """
        if (
            (length < 0 and self.loc == 0)  # explicit read all
            # but not when the size is known and fits into a block anyways
            and not (self.size is not None and self.size <= self.blocksize)
        ):
            self._fetch_all()
        if self.size is None:
            if length < 0:
                self._fetch_all()
        else:
            length = min(self.size - self.loc, length)
        return super().read(length)

    async def async_fetch_all(self):
        """Read whole file in one shot, without caching

        This is only called when position is still at zero,
        and read() is called without a byte-count.
        """
        logger.debug(f"Fetch all for {self}")
        if not isinstance(self.cache, AllBytes):
            r = await self.session.get(self.fs.encode_url(self.url), **self.kwargs)
            async with r:
                r.raise_for_status()
                out = await r.read()
                self.cache = AllBytes(
                    size=len(out), fetcher=None, blocksize=None, data=out
                )
                self.size = len(out)

    _fetch_all = sync_wrapper(async_fetch_all)

    def _parse_content_range(self, headers):
        """Parse the Content-Range header"""
        s = headers.get("Content-Range", "")
        m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
        if not m:
            return None, None, None

        if m[1] == "*":
            start = end = None
        else:
            start, end = [int(x) for x in m[1].split("-")]
        total = None if m[2] == "*" else int(m[2])
        return start, end, total
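
    # Example (illustrative): a header of "bytes 0-499/1234" parses to
    # (0, 499, 1234); "bytes */1234" gives (None, None, 1234).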

    async def async_fetch_range(self, start, end):
        """Download a block of data

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        logger.debug(f"Fetch range for {self}: {start}-{end}")
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {}).copy()
        headers["Range"] = f"bytes={start}-{end - 1}"
        logger.debug(f"{self.url} : {headers['Range']}")
        r = await self.session.get(
            self.fs.encode_url(self.url), headers=headers, **kwargs
        )
        async with r:
            if r.status == 416:
                # range request outside file
                return b""
            r.raise_for_status()

            # If the server has handled the range request, it should reply
            # with status 206 (partial content). But we'll guess that a suitable
            # Content-Range header or a Content-Length no more than the
            # requested range also mean we have got the desired range.
            response_is_range = (
                r.status == 206
                or self._parse_content_range(r.headers)[0] == start
                or int(r.headers.get("Content-Length", end + 1)) <= end - start
            )

            if response_is_range:
                # partial content, as expected
                out = await r.read()
            elif start > 0:
                raise ValueError(
                    "The HTTP server doesn't appear to support range requests. "
                    "Only reading this file from the beginning is supported. "
                    "Open with block_size=0 for a streaming file interface."
                )
            else:
                # Response is not a range, but we want the start of the file,
                # so we can read the required amount anyway.
                cl = 0
                out = []
                while True:
                    chunk = await r.content.read(2**20)
                    # data size unknown, let's read until we have enough
                    if chunk:
                        out.append(chunk)
                        cl += len(chunk)
                        if cl > end - start:
                            break
                    else:
                        break
                out = b"".join(out)[: end - start]
        return out

    _fetch_range = sync_wrapper(async_fetch_range)
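
    # Note (illustrative): HTTP byte ranges are inclusive, so the half-open
    # request [start, end) above becomes "Range: bytes=start-(end-1)"; e.g.
    # start=0, end=1024 sends "bytes=0-1023", the first 1024 bytes.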


magic_check = re.compile("([*[])")


def has_magic(s):
    match = magic_check.search(s)
    return match is not None


class HTTPStreamFile(AbstractBufferedFile):
    def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):
        self.asynchronous = kwargs.pop("asynchronous", False)
        self.url = url
        self.loop = loop
        self.session = session
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none", **kwargs)

        async def cor():
            r = await self.session.get(self.fs.encode_url(url), **kwargs).__aenter__()
            self.fs._raise_not_found_for_status(r, url)
            return r

        self.r = sync(self.loop, cor)
        self.loop = fs.loop

    def seek(self, loc, whence=0):
        if loc == 0 and whence == 1:
            return
        if loc == self.loc and whence == 0:
            return
        raise ValueError("Cannot seek streaming HTTP file")

    async def _read(self, num=-1):
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    read = sync_wrapper(_read)

    async def _close(self):
        self.r.close()

    def close(self):
        asyncio.run_coroutine_threadsafe(self._close(), self.loop)
        super().close()


class AsyncStreamFile(AbstractAsyncStreamedFile):
    def __init__(
        self, fs, url, mode="rb", loop=None, session=None, size=None, **kwargs
    ):
        self.url = url
        self.session = session
        self.r = None
        if mode != "rb":
            raise ValueError
        self.details = {"name": url, "size": None}
        self.kwargs = kwargs
        super().__init__(fs=fs, path=url, mode=mode, cache_type="none")
        self.size = size

    async def read(self, num=-1):
        if self.r is None:
            r = await self.session.get(
                self.fs.encode_url(self.url), **self.kwargs
            ).__aenter__()
            self.fs._raise_not_found_for_status(r, self.url)
            self.r = r
        out = await self.r.content.read(num)
        self.loc += len(out)
        return out

    async def close(self):
        if self.r is not None:
            self.r.close()
            self.r = None
        await super().close()


async def get_range(session, url, start, end, file=None, **kwargs):
    # explicitly get a range when we know it must be safe
    kwargs = kwargs.copy()
    headers = kwargs.pop("headers", {}).copy()
    headers["Range"] = f"bytes={start}-{end - 1}"
    r = await session.get(url, headers=headers, **kwargs)
    r.raise_for_status()
    async with r:
        out = await r.read()
        if file:
            with open(file, "r+b") as f:  # noqa: ASYNC230
                f.seek(start)
                f.write(out)
        else:
            return out
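
# Example (illustrative sketch; the URL is hypothetical): fetch bytes
# 1024-2047 of a resource into an existing local file, using a standalone
# session created by get_client.
#
#     async def demo():
#         session = await get_client()
#         try:
#             await get_range(session, "http://example.com/data.bin",
#                             1024, 2048, file="data.bin")
#         finally:
#             await session.close()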


async def _file_info(url, session, size_policy="head", **kwargs):
    """Call HEAD on the server to get details about the file (size/checksum etc.)

    Default operation is to explicitly allow redirects and use encoding
    'identity' (no compression) to get the true size of the target.
    """
    logger.debug("Retrieve file size for %s", url)
    kwargs = kwargs.copy()
    ar = kwargs.pop("allow_redirects", True)
    head = kwargs.get("headers", {}).copy()
    head["Accept-Encoding"] = "identity"
    kwargs["headers"] = head

    info = {}
    if size_policy == "head":
        r = await session.head(url, allow_redirects=ar, **kwargs)
    elif size_policy == "get":
        r = await session.get(url, allow_redirects=ar, **kwargs)
    else:
        raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
    async with r:
        r.raise_for_status()

        if "Content-Length" in r.headers:
            # Some servers may choose to ignore Accept-Encoding and return
            # compressed content, in which case the returned size is unreliable.
            if "Content-Encoding" not in r.headers or r.headers["Content-Encoding"] in [
                "identity",
                "",
            ]:
                info["size"] = int(r.headers["Content-Length"])
        elif "Content-Range" in r.headers:
            info["size"] = int(r.headers["Content-Range"].split("/")[1])

        if "Content-Type" in r.headers:
            info["mimetype"] = r.headers["Content-Type"].partition(";")[0]

        if r.headers.get("Accept-Ranges") == "none":
            # Some servers may explicitly discourage partial content requests, but
            # the lack of "Accept-Ranges" does not always indicate they would fail
            info["partial"] = False

        info["url"] = str(r.url)

        for checksum_field in ["ETag", "Content-MD5", "Digest", "Last-Modified"]:
            if r.headers.get(checksum_field):
                info[checksum_field] = r.headers[checksum_field]

    return info


async def _file_size(url, session=None, *args, **kwargs):
    if session is None:
        session = await get_client()
    info = await _file_info(url, session=session, *args, **kwargs)
    return info.get("size")


file_size = sync_wrapper(_file_size)
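
# Example (illustrative; the URL is hypothetical): a minimal async sketch
# using the coroutine directly. Note the implicit session created when none
# is passed is not closed here; pass and close your own session in real use.
#
#     async def demo_size():
#         return await _file_size("http://example.com/data.bin")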