from __future__ import annotations

import contextlib
import logging
import math
import os
import re
import sys
import tempfile
from functools import partial
from hashlib import md5
from importlib.metadata import version
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    Iterator,
    Sequence,
    TypeVar,
)
from urllib.parse import urlsplit

if TYPE_CHECKING:
    import pathlib

    from typing_extensions import TypeGuard

    from fsspec.spec import AbstractFileSystem

DEFAULT_BLOCK_SIZE = 5 * 2**20

T = TypeVar("T")

def infer_storage_options(
    urlpath: str, inherit_storage_options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Infer storage options from URL path and merge it with existing storage
    options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path": "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... )  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            drive, path = windows_path.groups()
            path = f"{drive}:{path}"

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options: dict[str, Any] = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        if protocol in ("s3", "s3a", "gcs", "gs"):
            options["path"] = options["host"] + options["path"]
        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options

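# Editor's sketch (not part of the module): for bucket-style protocols the
# host is folded back into the path, so the bucket component survives.
#
#   >>> infer_storage_options("s3://bucket/key.csv")  # doctest: +SKIP
#   {'protocol': 's3', 'path': 'bucket/key.csv', 'host': 'bucket'}
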
def update_storage_options(
    options: dict[str, Any], inherited: dict[str, Any] | None = None
) -> None:
    if not inherited:
        inherited = {}
    collisions = set(options) & set(inherited)
    if collisions:
        for collision in collisions:
            if options.get(collision) != inherited.get(collision):
                raise KeyError(
                    f"Collision between inferred and specified storage "
                    f"option:\n{collision}"
                )
    options.update(inherited)

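# Editor's sketch (not part of the module): the merge is in-place, and only
# identical values may collide; differing values raise KeyError. The "anon"
# key below is just an illustrative storage option.
#
#   >>> opts = {"protocol": "s3", "path": "bucket/key"}
#   >>> update_storage_options(opts, {"anon": True})
#   >>> opts["anon"]
#   True
#   >>> update_storage_options(opts, {"protocol": "gcs"})  # doctest: +SKIP
#   Traceback (most recent call last):
#       ...
#   KeyError: 'Collision between inferred and specified storage option:\nprotocol'
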
# Compression extensions registered via fsspec.compression.register_compression
compressions: dict[str, str] = {}


def infer_compression(filename: str) -> str | None:
    """Infer compression, if available, from filename.

    Infer a named compression type, if registered and available, from filename
    extension. This includes builtin (gz, bz2, zip) compressions, as well as
    optional compressions. See fsspec.compression.register_compression.
    """
    extension = os.path.splitext(filename)[-1].strip(".").lower()
    if extension in compressions:
        return compressions[extension]
    return None

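# Editor's sketch (not part of the module), assuming the standard
# registrations from fsspec.compression have populated `compressions`
# (e.g. "gz" -> "gzip"):
#
#   >>> infer_compression("table.csv.gz")  # doctest: +SKIP
#   'gzip'
#   >>> infer_compression("table.csv") is None  # doctest: +SKIP
#   True
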
def build_name_function(max_int: float) -> Callable[[int], str]:
    """Returns a function that receives a single integer
    and returns it as a string padded by enough zero characters
    to align with the maximum possible integer

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # handle corner cases where max_int is 0 or an exact power of 10
    max_int += 1e-8

    pad_length = int(math.ceil(math.log10(max_int)))

    def name_function(i: int) -> str:
        return str(i).zfill(pad_length)

    return name_function

def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Seeks file to next chunk delimiter, where chunks are defined on file start,
    a delimiting sequence, and file end. Use file.tell() to see location afterwards.
    Note that file start is a valid split, so the file must be at offset > 0 to
    seek for a delimiter.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.

    """

    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last: bytes | None = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            pass
        last = full[-len(delimiter) :]

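# Editor's sketch (not part of the module): starting mid-record, the file is
# left positioned just past the next delimiter.
#
#   >>> from io import BytesIO
#   >>> f = BytesIO(b"Alice\nBob\nCharlie")
#   >>> _ = f.seek(3)                      # somewhere inside "Alice"
#   >>> seek_delimiter(f, b"\n", blocksize=4)
#   True
#   >>> f.read(3)                          # now positioned at "Bob"
#   b'Bob'
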
def read_block(
    f: IO[bytes],
    offset: int,
    length: int | None,
    delimiter: bytes | None = None,
    split_before: bool = False,
) -> bytes:
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.

    If using the ``delimiter=`` keyword argument we ensure that the read
    starts and stops at delimiter boundaries that follow the locations
    ``offset`` and ``offset + length``. If ``offset`` is zero then we
    start at zero, regardless of delimiter. The bytestring returned WILL
    include the terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if delimiter:
        f.seek(offset)
        found_start_delim = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        length -= start - offset

        f.seek(start + length)
        found_end_delim = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        # Adjust split location to before delimiter if seek found the
        # delimiter sequence, not start or end of file.
        if found_start_delim and split_before:
            start -= len(delimiter)

        if found_end_delim and split_before:
            end -= len(delimiter)

        offset = start
        length = end - start

    f.seek(offset)

    # TODO: allow length to be None and read to the end of the file?
    assert length is not None
    b = f.read(length)
    return b

def tokenize(*args: Any, **kwargs: Any) -> str:
    """Deterministic token

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args += (kwargs,)
    try:
        h = md5(str(args).encode())
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        h = md5(str(args).encode(), usedforsecurity=False)
    return h.hexdigest()

def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced according to their
    __fspath__ method.

    For backwards compatibility with older Python versions, pathlib.Path
    objects are specially coerced.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if isinstance(filepath, str):
        return filepath
    elif hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    elif hasattr(filepath, "path"):
        return filepath.path
    else:
        return filepath  # type: ignore[return-value]

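# Editor's sketch (not part of the module): fspath-protocol objects become
# strings; anything else passes through untouched.
#
#   >>> import pathlib
#   >>> stringify_path(pathlib.PurePosixPath("/data/x.csv"))
#   '/data/x.csv'
#   >>> stringify_path(b"not-a-path")      # no __fspath__, returned as-is
#   b'not-a-path'
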
def make_instance(
    cls: Callable[..., T], args: Sequence[Any], kwargs: dict[str, Any]
) -> T:
    inst = cls(*args, **kwargs)
    inst._determine_worker()  # type: ignore[attr-defined]
    return inst

def common_prefix(paths: Iterable[str]) -> str:
    """For a list of paths, find the shortest prefix common to all"""
    parts = [p.split("/") for p in paths]
    lmax = min(len(p) for p in parts)
    end = 0
    for i in range(lmax):
        # `end` is True while the i-th component still matches across all paths
        end = all(p[i] == parts[0][i] for p in parts)
        if not end:
            break
    # include the final component if the loop finished without a mismatch
    i += end
    return "/".join(parts[0][:i])

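# Editor's sketch (not part of the module): the prefix is computed on whole
# "/"-separated components, not characters.
#
#   >>> common_prefix(["bucket/a/one.txt", "bucket/a/two.txt", "bucket/b.txt"])
#   'bucket'
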
def other_paths(
    paths: list[str],
    path2: str | list[str],
    exists: bool = False,
    flatten: bool = False,
) -> list[str]:
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    exists: bool (optional)
        For a str destination, if it already exists (and is a dir), files should
        end up inside it.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output files
        are in the same directory.

    Returns
    -------
    list of str
    """

    if isinstance(path2, str):
        path2 = path2.rstrip("/")

        if flatten:
            path2 = ["/".join((path2, p.split("/")[-1])) for p in paths]
        else:
            cp = common_prefix(paths)
            if exists:
                cp = cp.rsplit("/", 1)[0]
            if not cp and all(not s.startswith("/") for s in paths):
                path2 = ["/".join([path2, p]) for p in paths]
            else:
                path2 = [p.replace(cp, path2, 1) for p in paths]
    else:
        assert len(paths) == len(path2)
    return path2

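# Editor's sketch (not part of the module): the common prefix of the sources
# is swapped for the destination root, unless flatten collapses the tree.
#
#   >>> other_paths(["data/a/x.csv", "data/b/y.csv"], "dest")
#   ['dest/a/x.csv', 'dest/b/y.csv']
#   >>> other_paths(["data/a/x.csv", "data/b/y.csv"], "dest", flatten=True)
#   ['dest/x.csv', 'dest/y.csv']
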
def is_exception(obj: Any) -> bool:
    return isinstance(obj, BaseException)


def isfilelike(f: Any) -> TypeGuard[IO[bytes]]:
    return all(hasattr(f, attr) for attr in ["read", "close", "tell"])


def get_protocol(url: str) -> str:
    url = stringify_path(url)
    parts = re.split(r"(\:\:|\://)", url, maxsplit=1)
    if len(parts) > 1:
        return parts[0]
    return "file"

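# Editor's sketch (not part of the module): the first "::" or "://" separator
# wins, so chained URLs report their outermost protocol.
#
#   >>> get_protocol("s3://bucket/key")
#   's3'
#   >>> get_protocol("simplecache::s3://bucket/key")
#   'simplecache'
#   >>> get_protocol("/local/file")
#   'file'
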
def can_be_local(path: str) -> bool:
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        return getattr(get_filesystem_class(get_protocol(path)), "local_file", False)
    except (ValueError, ImportError):
        # not in registry or import failed
        return False

def get_package_version_without_import(name: str) -> str | None:
    """For a given package name, try to find the version without importing it

    Import and package.__version__ is still the backup here, so an import
    *might* happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    if name in sys.modules:
        mod = sys.modules[name]
        if hasattr(mod, "__version__"):
            return mod.__version__
    try:
        return version(name)
    except:  # noqa: E722
        pass
    try:
        import importlib

        mod = importlib.import_module(name)
        return mod.__version__
    except (ImportError, AttributeError):
        return None

def setup_logging(
    logger: logging.Logger | None = None,
    logger_name: str | None = None,
    level: str = "DEBUG",
    clear: bool = True,
) -> logging.Logger:
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    logger = logger or logging.getLogger(logger_name)
    handle = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
    )
    handle.setFormatter(formatter)
    if clear:
        logger.handlers.clear()
    logger.addHandler(handle)
    logger.setLevel(level)
    return logger

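# Editor's sketch (not part of the module): attach a stream handler to a
# named logger; "fsspec.http" below is just an illustrative logger name.
#
#   >>> logger = setup_logging(logger_name="fsspec.http")  # doctest: +SKIP
#   >>> logger.level == logging.DEBUG  # doctest: +SKIP
#   True
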
def _unstrip_protocol(name: str, fs: AbstractFileSystem) -> str:
    return fs.unstrip_protocol(name)

def mirror_from(
    origin_name: str, methods: Iterable[str]
) -> Callable[[type[T]], type[T]]:
    """Mirror attributes and methods from the given
    origin_name attribute of the instance to the
    decorated class"""

    def origin_getter(method: str, self: Any) -> Any:
        origin = getattr(self, origin_name)
        return getattr(origin, method)

    def wrapper(cls: type[T]) -> type[T]:
        for method in methods:
            wrapped_method = partial(origin_getter, method)
            setattr(cls, method, property(wrapped_method))
        return cls

    return wrapper

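# Editor's sketch (not part of the module): the named attributes become
# read-only properties that forward to the wrapped object.
#
#   >>> from io import BytesIO
#   >>> @mirror_from("buffer", ["read", "tell"])
#   ... class Reader:
#   ...     def __init__(self, buffer):
#   ...         self.buffer = buffer
#   >>> r = Reader(BytesIO(b"abc"))
#   >>> r.read(2)
#   b'ab'
#   >>> r.tell()
#   2
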
@contextlib.contextmanager
def nullcontext(obj: T) -> Iterator[T]:
    yield obj

def merge_offset_ranges(
    paths: list[str],
    starts: list[int] | int,
    ends: list[int] | int,
    max_gap: int = 0,
    max_block: int | None = None,
    sort: bool = True,
) -> tuple[list[str], list[int], list[int]]:
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError

    # Early Return
    if len(starts) <= 1:
        return paths, starts, ends

    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = (
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        )

    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends

    # `paths` is empty. Just return input lists
    return paths, starts, ends

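# Editor's sketch (not part of the module): the first two ranges touch, so
# they merge; the 30-byte gap before the third exceeds max_gap=0, so it stays.
#
#   >>> merge_offset_ranges(["f", "f", "f"], [0, 5, 40], [5, 10, 50])
#   (['f', 'f'], [0, 40], [10, 50])
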
def file_size(filelike: IO[bytes]) -> int:
    """Find length of any open read-mode file-like"""
    pos = filelike.tell()
    try:
        return filelike.seek(0, 2)
    finally:
        filelike.seek(pos)

@contextlib.contextmanager
def atomic_write(path: str, mode: str = "wb"):
    """
    A context manager that opens a temporary file next to `path` and, on exit,
    replaces `path` with the temporary file, thereby updating `path`
    atomically.
    """
    fd, fn = tempfile.mkstemp(
        dir=os.path.dirname(path), prefix=os.path.basename(path) + "-"
    )
    try:
        with open(fd, mode) as fp:
            yield fp
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(fn)
        raise
    else:
        os.replace(fn, path)

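# Editor's sketch (not part of the module): readers of "out.json" see either
# the old contents or the new, never a partial write.
#
#   >>> with atomic_write("out.json") as f:  # doctest: +SKIP
#   ...     _ = f.write(b'{"ok": true}')
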
def _translate(pat, STAR, QUESTION_MARK):
    # Copied from: https://github.com/python/cpython/pull/106703.
    res: list[str] = []
    add = res.append
    i, n = 0, len(pat)
    while i < n:
        c = pat[i]
        i = i + 1
        if c == "*":
            # compress consecutive `*` into one
            if (not res) or res[-1] is not STAR:
                add(STAR)
        elif c == "?":
            add(QUESTION_MARK)
        elif c == "[":
            j = i
            if j < n and pat[j] == "!":
                j = j + 1
            if j < n and pat[j] == "]":
                j = j + 1
            while j < n and pat[j] != "]":
                j = j + 1
            if j >= n:
                add("\\[")
            else:
                stuff = pat[i:j]
                if "-" not in stuff:
                    stuff = stuff.replace("\\", r"\\")
                else:
                    chunks = []
                    k = i + 2 if pat[i] == "!" else i + 1
                    while True:
                        k = pat.find("-", k, j)
                        if k < 0:
                            break
                        chunks.append(pat[i:k])
                        i = k + 1
                        k = k + 3
                    chunk = pat[i:j]
                    if chunk:
                        chunks.append(chunk)
                    else:
                        chunks[-1] += "-"
                    # Remove empty ranges -- invalid in RE.
                    for k in range(len(chunks) - 1, 0, -1):
                        if chunks[k - 1][-1] > chunks[k][0]:
                            chunks[k - 1] = chunks[k - 1][:-1] + chunks[k][1:]
                            del chunks[k]
                    # Escape backslashes and hyphens for set difference (--).
                    # Hyphens that create ranges shouldn't be escaped.
                    stuff = "-".join(
                        s.replace("\\", r"\\").replace("-", r"\-") for s in chunks
                    )
                # Escape set operations (&&, ~~ and ||).
                stuff = re.sub(r"([&~|])", r"\\\1", stuff)
                i = j + 1
                if not stuff:
                    # Empty range: never match.
                    add("(?!)")
                elif stuff == "!":
                    # Negated empty range: match any character.
                    add(".")
                else:
                    if stuff[0] == "!":
                        stuff = "^" + stuff[1:]
                    elif stuff[0] in ("^", "["):
                        stuff = "\\" + stuff
                    add(f"[{stuff}]")
        else:
            add(re.escape(c))
    assert i == n
    return res

def glob_translate(pat):
    # Copied from: https://github.com/python/cpython/pull/106703.
    # The keyword parameters' values are fixed to:
    # recursive=True, include_hidden=True, seps=None
    """Translate a pathname with shell wildcards to a regular expression."""
    if os.path.altsep:
        seps = os.path.sep + os.path.altsep
    else:
        seps = os.path.sep
    escaped_seps = "".join(map(re.escape, seps))
    any_sep = f"[{escaped_seps}]" if len(seps) > 1 else escaped_seps
    not_sep = f"[^{escaped_seps}]"
    one_last_segment = f"{not_sep}+"
    one_segment = f"{one_last_segment}{any_sep}"
    any_segments = f"(?:.+{any_sep})?"
    any_last_segments = ".*"
    results = []
    parts = re.split(any_sep, pat)
    last_part_idx = len(parts) - 1
    for idx, part in enumerate(parts):
        if part == "*":
            results.append(one_segment if idx < last_part_idx else one_last_segment)
            continue
        if part == "**":
            results.append(any_segments if idx < last_part_idx else any_last_segments)
            continue
        elif "**" in part:
            raise ValueError(
                "Invalid pattern: '**' can only be an entire path component"
            )
        if part:
            results.extend(_translate(part, f"{not_sep}*", not_sep))
        if idx < last_part_idx:
            results.append(any_sep)
    res = "".join(results)
    return rf"(?s:{res})\Z"

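# Editor's sketch (not part of the module): on a POSIX system (sep "/"), the
# pattern compiles to a full-match regex where "**" may span directories.
#
#   >>> import re
#   >>> rx = re.compile(glob_translate("data/**/*.csv"))  # doctest: +SKIP
#   >>> bool(rx.match("data/2023/a.csv"))  # doctest: +SKIP
#   True
#   >>> bool(rx.match("data/a.txt"))  # doctest: +SKIP
#   False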