from __future__ import annotations

import contextlib
import logging
import math
import os
import re
import sys
import tempfile
from collections.abc import Callable, Iterable, Iterator, Sequence
from functools import partial
from hashlib import md5
from importlib.metadata import version
from typing import IO, TYPE_CHECKING, Any, TypeVar
from urllib.parse import urlsplit

if TYPE_CHECKING:
    import pathlib
    from typing import TypeGuard

    from fsspec.spec import AbstractFileSystem

DEFAULT_BLOCK_SIZE = 5 * 2**20

T = TypeVar("T")


def infer_storage_options(
    urlpath: str, inherit_storage_options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Infer storage options from URL path and merge them with existing
    storage options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path": "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... )  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            drive, path = windows_path.groups()
            path = f"{drive}:{path}"

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options: dict[str, Any] = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        if protocol in ("s3", "s3a", "gcs", "gs"):
            options["path"] = options["host"] + options["path"]
        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options
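
# A quick sketch of the inference above (editor's illustration; the bucket and
# key names below are hypothetical):
#
#     infer_storage_options("s3://bucket/data/file.csv")
#     # -> {'protocol': 's3', 'path': 'bucket/data/file.csv', 'host': 'bucket'}
#     # Note the host is folded back into the path for s3/gcs-style protocols.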


def update_storage_options(
    options: dict[str, Any], inherited: dict[str, Any] | None = None
) -> None:
    if not inherited:
        inherited = {}
    collisions = set(options) & set(inherited)
    if collisions:
        for collision in collisions:
            if options.get(collision) != inherited.get(collision):
                raise KeyError(
                    f"Collision between inferred and specified storage "
                    f"option:\n{collision}"
                )
    options.update(inherited)
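
# Merge semantics sketch (editor's illustration; the key names are made up):
# duplicate keys with identical values merge cleanly, conflicts raise.
#
#     opts = {"protocol": "s3", "anon": True}
#     update_storage_options(opts, {"anon": True, "region": "x"})  # ok, merged
#     update_storage_options(opts, {"anon": False})                # KeyError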


# Compression extensions registered via fsspec.compression.register_compression
compressions: dict[str, str] = {}


def infer_compression(filename: str) -> str | None:
    """Infer compression, if available, from filename.

    Infer a named compression type, if registered and available, from filename
    extension. This includes builtin (gz, bz2, zip) compressions, as well as
    optional compressions. See fsspec.compression.register_compression.
    """
    extension = os.path.splitext(filename)[-1].strip(".").lower()
    if extension in compressions:
        return compressions[extension]
    return None
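
# Example (editor's illustration): importing fsspec registers the builtin
# codecs into ``compressions``, after which extension lookup works:
#
#     import fsspec  # noqa: F401  (registers gz/bz2/zip/... handlers)
#     infer_compression("table.csv.gz")  # -> 'gzip'
#     infer_compression("table.csv")     # -> None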


def build_name_function(max_int: float) -> Callable[[int], str]:
    """Returns a function that receives a single integer
    and returns it as a string padded by enough zero characters
    to align with the maximum possible integer

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # handle corner cases where max_int is 0 or an exact power of 10
    max_int += 1e-8

    pad_length = int(math.ceil(math.log10(max_int)))

    def name_function(i: int) -> str:
        return str(i).zfill(pad_length)

    return name_function


def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Seeks file to next chunk delimiter, where chunks are defined on file start,
    a delimiting sequence, and file end. Use file.tell() to see location afterwards.
    Note that the file start is a valid split, so the file must be at an
    offset > 0 to seek for a delimiter.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.
    """

    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last: bytes | None = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            pass
        last = full[-len(delimiter) :]
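
# Worked example (editor's illustration): starting mid-record, the file is left
# positioned just *after* the next delimiter.
#
#     from io import BytesIO
#     f = BytesIO(b"abc\ndef\nghi")
#     f.seek(5)                              # inside the second record
#     seek_delimiter(f, b"\n", blocksize=4)  # -> True
#     f.tell()                               # -> 8, first byte after b"\n"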


def read_block(
    f: IO[bytes],
    offset: int,
    length: int | None,
    delimiter: bytes | None = None,
    split_before: bool = False,
) -> bytes:
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.

    If using the ``delimiter=`` keyword argument we ensure that the read
    starts and stops at delimiter boundaries that follow the locations
    ``offset`` and ``offset + length``. If ``offset`` is zero then we
    start at zero, regardless of delimiter. The bytestring returned WILL
    include the terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if delimiter:
        f.seek(offset)
        found_start_delim = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        length -= start - offset

        f.seek(start + length)
        found_end_delim = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        # Adjust split location to before delimiter if seek found the
        # delimiter sequence, not start or end of file.
        if found_start_delim and split_before:
            start -= len(delimiter)

        if found_end_delim and split_before:
            end -= len(delimiter)

        offset = start
        length = end - start

    f.seek(offset)

    # TODO: allow length to be None and read to the end of the file?
    assert length is not None
    b = f.read(length)
    return b


def tokenize(*args: Any, **kwargs: Any) -> str:
    """Deterministic token

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args += (kwargs,)
    try:
        h = md5(str(args).encode())
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        h = md5(str(args).encode(), usedforsecurity=False)
    return h.hexdigest()


def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced according to its
    __fspath__ method.

    For backwards compatibility with older Python versions, pathlib.Path
    objects are specially coerced.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if isinstance(filepath, str):
        return filepath
    elif hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    elif hasattr(filepath, "path"):
        return filepath.path
    else:
        return filepath  # type: ignore[return-value]
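
# Example (editor's illustration): anything implementing __fspath__ becomes a
# str, everything else passes through untouched.
#
#     import pathlib
#     stringify_path(pathlib.Path("/tmp/data.csv"))  # -> '/tmp/data.csv'
#     stringify_path(b"raw-bytes")                   # -> b'raw-bytes' (unchanged)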


def make_instance(
    cls: Callable[..., T], args: Sequence[Any], kwargs: dict[str, Any]
) -> T:
    inst = cls(*args, **kwargs)
    inst._determine_worker()  # type: ignore[attr-defined]
    return inst


def common_prefix(paths: Iterable[str]) -> str:
    """For a list of paths, find the shortest prefix common to all"""
    parts = [p.split("/") for p in paths]
    lmax = min(len(p) for p in parts)
    end = 0
    for i in range(lmax):
        end = all(p[i] == parts[0][i] for p in parts)
        if not end:
            break
    i += end
    return "/".join(parts[0][:i])
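
# Example (editor's illustration; the paths are made up): the prefix is
# computed on whole "/"-separated segments, not on characters.
#
#     common_prefix(["bucket/a/x.csv", "bucket/a/y.csv", "bucket/b.csv"])
#     # -> 'bucket'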


def other_paths(
    paths: list[str],
    path2: str | list[str],
    exists: bool = False,
    flatten: bool = False,
) -> list[str]:
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    exists: bool (optional)
        For a str destination, whether it already exists (and is a dir); files
        should end up inside.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output files
        are in the same directory.

    Returns
    -------
    list of str
    """

    if isinstance(path2, str):
        path2 = path2.rstrip("/")

        if flatten:
            path2 = ["/".join((path2, p.split("/")[-1])) for p in paths]
        else:
            cp = common_prefix(paths)
            if exists:
                cp = cp.rsplit("/", 1)[0]
            if not cp and all(not s.startswith("/") for s in paths):
                path2 = ["/".join([path2, p]) for p in paths]
            else:
                path2 = [p.replace(cp, path2, 1) for p in paths]
    else:
        assert len(paths) == len(path2)
    return path2
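
# Example (editor's illustration; the source/dest names are hypothetical):
#
#     other_paths(["src/a.csv", "src/sub/b.csv"], "dest")
#     # -> ['dest/a.csv', 'dest/sub/b.csv']  (tree preserved)
#     other_paths(["src/a.csv", "src/sub/b.csv"], "dest", flatten=True)
#     # -> ['dest/a.csv', 'dest/b.csv']      (everything in one directory)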


def is_exception(obj: Any) -> bool:
    return isinstance(obj, BaseException)


def isfilelike(f: Any) -> TypeGuard[IO[bytes]]:
    return all(hasattr(f, attr) for attr in ["read", "close", "tell"])


def get_protocol(url: str) -> str:
    url = stringify_path(url)
    parts = re.split(r"(\:\:|\://)", url, maxsplit=1)
    if len(parts) > 1:
        return parts[0]
    return "file"


def get_file_extension(url: str) -> str:
    url = stringify_path(url)
    ext_parts = url.rsplit(".", 1)
    if len(ext_parts) > 1:
        return ext_parts[-1]
    return ""


def can_be_local(path: str) -> bool:
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        return getattr(get_filesystem_class(get_protocol(path)), "local_file", False)
    except (ValueError, ImportError):
        # not in registry or import failed
        return False


def get_package_version_without_import(name: str) -> str | None:
    """For given package name, try to find the version without importing it

    Importing the package and reading ``__version__`` is still the backup here,
    so an import *might* happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    if name in sys.modules:
        mod = sys.modules[name]
        if hasattr(mod, "__version__"):
            return mod.__version__
    try:
        return version(name)
    except:  # noqa: E722
        pass
    try:
        import importlib

        mod = importlib.import_module(name)
        return mod.__version__
    except (ImportError, AttributeError):
        return None


def setup_logging(
    logger: logging.Logger | None = None,
    logger_name: str | None = None,
    level: str = "DEBUG",
    clear: bool = True,
) -> logging.Logger:
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    logger = logger or logging.getLogger(logger_name)
    handle = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
    )
    handle.setFormatter(formatter)
    if clear:
        logger.handlers.clear()
    logger.addHandler(handle)
    logger.setLevel(level)
    return logger


def _unstrip_protocol(name: str, fs: AbstractFileSystem) -> str:
    return fs.unstrip_protocol(name)


def mirror_from(
    origin_name: str, methods: Iterable[str]
) -> Callable[[type[T]], type[T]]:
    """Mirror attributes and methods from the given
    origin_name attribute of the instance to the
    decorated class"""

    def origin_getter(method: str, self: Any) -> Any:
        origin = getattr(self, origin_name)
        return getattr(origin, method)

    def wrapper(cls: type[T]) -> type[T]:
        for method in methods:
            wrapped_method = partial(origin_getter, method)
            setattr(cls, method, property(wrapped_method))
        return cls

    return wrapper
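
# Example (editor's illustration; ``Wrapper`` is a made-up class): the listed
# attributes resolve through ``self.buffer`` via the generated properties.
#
#     from io import BytesIO
#
#     @mirror_from("buffer", ["read", "seek", "tell"])
#     class Wrapper:
#         def __init__(self, buffer):
#             self.buffer = buffer
#
#     Wrapper(BytesIO(b"abc")).read(2)  # -> b'ab', delegated to the BytesIO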


@contextlib.contextmanager
def nullcontext(obj: T) -> Iterator[T]:
    yield obj


def merge_offset_ranges(
    paths: list[str],
    starts: list[int] | int,
    ends: list[int] | int,
    max_gap: int = 0,
    max_block: int | None = None,
    sort: bool = True,
) -> tuple[list[str], list[int], list[int]]:
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError

    # Early return
    if len(starts) <= 1:
        return paths, starts, ends

    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = (
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        )

    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends

    # `paths` is empty. Just return input lists
    return paths, starts, ends
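
# Example (editor's illustration; the file names are hypothetical): the two
# ranges on "f1" merge because their 10-byte gap is within max_gap.
#
#     merge_offset_ranges(["f1", "f1", "f2"], [0, 110, 0], [100, 200, 50], max_gap=16)
#     # -> (['f1', 'f2'], [0, 0], [200, 50])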


def file_size(filelike: IO[bytes]) -> int:
    """Find length of any open read-mode file-like"""
    pos = filelike.tell()
    try:
        return filelike.seek(0, 2)
    finally:
        filelike.seek(pos)


@contextlib.contextmanager
def atomic_write(path: str, mode: str = "wb"):
    """
    A context manager that opens a temporary file next to `path` and, on exit,
    replaces `path` with the temporary file, thereby updating `path`
    atomically.
    """
    fd, fn = tempfile.mkstemp(
        dir=os.path.dirname(path), prefix=os.path.basename(path) + "-"
    )
    try:
        with open(fd, mode) as fp:
            yield fp
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(fn)
        raise
    else:
        os.replace(fn, path)
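
# Usage sketch (editor's illustration; the path is hypothetical): readers never
# observe a half-written file, since os.replace is atomic within one filesystem.
#
#     with atomic_write("/tmp/out.bin") as f:
#         f.write(b"payload")
#     # on success "/tmp/out.bin" holds the full payload; on error the
#     # temporary sibling file is removed and the exception re-raised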


def _translate(pat, STAR, QUESTION_MARK):
    # Copied from: https://github.com/python/cpython/pull/106703.
    res: list[str] = []
    add = res.append
    i, n = 0, len(pat)
    while i < n:
        c = pat[i]
        i = i + 1
        if c == "*":
            # compress consecutive `*` into one
            if (not res) or res[-1] is not STAR:
                add(STAR)
        elif c == "?":
            add(QUESTION_MARK)
        elif c == "[":
            j = i
            if j < n and pat[j] == "!":
                j = j + 1
            if j < n and pat[j] == "]":
                j = j + 1
            while j < n and pat[j] != "]":
                j = j + 1
            if j >= n:
                add("\\[")
            else:
                stuff = pat[i:j]
                if "-" not in stuff:
                    stuff = stuff.replace("\\", r"\\")
                else:
                    chunks = []
                    k = i + 2 if pat[i] == "!" else i + 1
                    while True:
                        k = pat.find("-", k, j)
                        if k < 0:
                            break
                        chunks.append(pat[i:k])
                        i = k + 1
                        k = k + 3
                    chunk = pat[i:j]
                    if chunk:
                        chunks.append(chunk)
                    else:
                        chunks[-1] += "-"
                    # Remove empty ranges -- invalid in RE.
                    for k in range(len(chunks) - 1, 0, -1):
                        if chunks[k - 1][-1] > chunks[k][0]:
                            chunks[k - 1] = chunks[k - 1][:-1] + chunks[k][1:]
                            del chunks[k]
                    # Escape backslashes and hyphens for set difference (--).
                    # Hyphens that create ranges shouldn't be escaped.
                    stuff = "-".join(
                        s.replace("\\", r"\\").replace("-", r"\-") for s in chunks
                    )
                # Escape set operations (&&, ~~ and ||).
                stuff = re.sub(r"([&~|])", r"\\\1", stuff)
                i = j + 1
                if not stuff:
                    # Empty range: never match.
                    add("(?!)")
                elif stuff == "!":
                    # Negated empty range: match any character.
                    add(".")
                else:
                    if stuff[0] == "!":
                        stuff = "^" + stuff[1:]
                    elif stuff[0] in ("^", "["):
                        stuff = "\\" + stuff
                    add(f"[{stuff}]")
        else:
            add(re.escape(c))
    assert i == n
    return res


def glob_translate(pat):
    # Copied from: https://github.com/python/cpython/pull/106703.
    # The keyword parameters' values are fixed to:
    # recursive=True, include_hidden=True, seps=None
    """Translate a pathname with shell wildcards to a regular expression."""
    if os.path.altsep:
        seps = os.path.sep + os.path.altsep
    else:
        seps = os.path.sep
    escaped_seps = "".join(map(re.escape, seps))
    any_sep = f"[{escaped_seps}]" if len(seps) > 1 else escaped_seps
    not_sep = f"[^{escaped_seps}]"
    one_last_segment = f"{not_sep}+"
    one_segment = f"{one_last_segment}{any_sep}"
    any_segments = f"(?:.+{any_sep})?"
    any_last_segments = ".*"
    results = []
    parts = re.split(any_sep, pat)
    last_part_idx = len(parts) - 1
    for idx, part in enumerate(parts):
        if part == "*":
            results.append(one_segment if idx < last_part_idx else one_last_segment)
            continue
        if part == "**":
            results.append(any_segments if idx < last_part_idx else any_last_segments)
            continue
        elif "**" in part:
            raise ValueError(
                "Invalid pattern: '**' can only be an entire path component"
            )
        if part:
            results.extend(_translate(part, f"{not_sep}*", not_sep))
        if idx < last_part_idx:
            results.append(any_sep)
    res = "".join(results)
    return rf"(?s:{res})\Z"
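
# Example (editor's illustration, POSIX separators assumed; the pattern and
# paths are made up):
#
#     rx = re.compile(glob_translate("data/**/*.csv"))
#     bool(rx.match("data/2021/part.csv"))  # -> True  ('**' spans directories)
#     bool(rx.match("data/part.txt"))       # -> False (extension must match)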