# fsspec/utils.py

from __future__ import annotations

import contextlib
import logging
import math
import os
import pathlib
import re
import sys
import tempfile
from functools import partial
from hashlib import md5
from importlib.metadata import version
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    Iterator,
    Sequence,
    TypeVar,
)
from urllib.parse import urlsplit

if TYPE_CHECKING:
    from typing_extensions import TypeGuard

    from fsspec.spec import AbstractFileSystem

DEFAULT_BLOCK_SIZE = 5 * 2**20

T = TypeVar("T")


def infer_storage_options(
    urlpath: str, inherit_storage_options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Infer storage options from URL path and merge them with existing
    storage options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path": "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... )  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            path = "%s:%s" % windows_path.groups()

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options: dict[str, Any] = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        # Object-store protocols keep the bucket/host as part of the path
        if protocol in ("s3", "s3a", "gcs", "gs"):
            options["path"] = options["host"] + options["path"]
        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options
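

# A minimal illustrative sketch (hedged; not one of the doctests above),
# showing the Windows "file://" special case handled in the function body:
#
#     >>> infer_storage_options("file:///C:/Users/me/data.csv")  # doctest: +SKIP
#     {'protocol': 'file', 'path': 'C:/Users/me/data.csv'}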


def update_storage_options(
    options: dict[str, Any], inherited: dict[str, Any] | None = None
) -> None:
    if not inherited:
        inherited = {}
    collisions = set(options) & set(inherited)
    if collisions:
        for collision in collisions:
            if options.get(collision) != inherited.get(collision):
                raise KeyError(
                    f"Collision between inferred and specified storage "
                    f"option:\n{collision}"
                )
    options.update(inherited)
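

# A small usage sketch (hedged): keys whose values agree merge silently,
# while conflicting values for the same key raise KeyError.
#
#     >>> opts = {"protocol": "s3", "path": "bucket/key"}
#     >>> update_storage_options(opts, {"anon": True})
#     >>> opts
#     {'protocol': 's3', 'path': 'bucket/key', 'anon': True}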


# Compression extensions registered via fsspec.compression.register_compression
compressions: dict[str, str] = {}


def infer_compression(filename: str) -> str | None:
    """Infer compression, if available, from filename.

    Infer a named compression type, if registered and available, from filename
    extension. This includes builtin (gz, bz2, zip) compressions, as well as
    optional compressions. See fsspec.compression.register_compression.
    """
    extension = os.path.splitext(filename)[-1].strip(".").lower()
    if extension in compressions:
        return compressions[extension]
    return None
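

# An illustrative sketch (hedged): it assumes fsspec.compression has been
# imported, which registers the builtin codecs into the `compressions` map.
#
#     >>> import fsspec.compression  # populates `compressions`  # doctest: +SKIP
#     >>> infer_compression("table.csv.gz")  # doctest: +SKIP
#     'gzip'
#     >>> infer_compression("table.csv") is None  # doctest: +SKIP
#     True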


def build_name_function(max_int: float) -> Callable[[int], str]:
    """Returns a function that receives a single integer
    and returns it as a string padded by enough zero characters
    to align with the maximum possible integer

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # handle corner cases where max_int is 0 or an exact power of 10
    max_int += 1e-8

    pad_length = int(math.ceil(math.log10(max_int)))

    def name_function(i: int) -> str:
        return str(i).zfill(pad_length)

    return name_function


def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Seeks file to next chunk delimiter, where chunks are defined on file start,
    a delimiting sequence, and file end. Use file.tell() to see location afterwards.
    Note that file start is a valid split, so the file must be at an offset > 0
    to seek for a delimiter.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.
    """

    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last: bytes | None = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            pass
        last = full[-len(delimiter) :]
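

# A worked sketch (hedged): starting mid-file, the cursor ends up on the byte
# just after the next delimiter.
#
#     >>> from io import BytesIO
#     >>> f = BytesIO(b"abc\ndef\nghi")
#     >>> _ = f.seek(5)
#     >>> seek_delimiter(f, b"\n", blocksize=4)
#     True
#     >>> f.tell()  # positioned just past the b"\n" at offset 7
#     8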


def read_block(
    f: IO[bytes],
    offset: int,
    length: int | None,
    delimiter: bytes | None = None,
    split_before: bool = False,
) -> bytes:
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.

    If using the ``delimiter=`` keyword argument we ensure that the read
    starts and stops at delimiter boundaries that follow the locations
    ``offset`` and ``offset + length``. If ``offset`` is zero then we
    start at zero, regardless of delimiter. The bytestring returned WILL
    include the terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if delimiter:
        f.seek(offset)
        found_start_delim = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        length -= start - offset

        f.seek(start + length)
        found_end_delim = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        # Adjust split location to before delimiter iff seek found the
        # delimiter sequence, not start or end of file.
        if found_start_delim and split_before:
            start -= len(delimiter)

        if found_end_delim and split_before:
            end -= len(delimiter)

        offset = start
        length = end - start

    f.seek(offset)

    # TODO: allow length to be None and read to the end of the file?
    assert length is not None
    b = f.read(length)
    return b
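

# A hedged complement to the doctests above: with ``split_before=True`` the
# trailing delimiter is excluded from the returned block.
#
#     >>> f = BytesIO(b'Alice, 100\nBob, 200\nCharlie, 300')  # doctest: +SKIP
#     >>> read_block(f, 0, 13, delimiter=b'\n', split_before=True)  # doctest: +SKIP
#     b'Alice, 100\nBob, 200'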


def tokenize(*args: Any, **kwargs: Any) -> str:
    """Deterministic token

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args += (kwargs,)
    try:
        h = md5(str(args).encode())
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        h = md5(str(args).encode(), usedforsecurity=False)
    return h.hexdigest()


def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced according to their
    __fspath__ method.

    For backwards compatibility with older Python versions, pathlib.Path
    objects are specially coerced.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if isinstance(filepath, str):
        return filepath
    elif hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    elif isinstance(filepath, pathlib.Path):
        return str(filepath)
    elif hasattr(filepath, "path"):
        return filepath.path
    else:
        return filepath  # type: ignore[return-value]
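

# A brief sketch (hedged; output shown for a POSIX system):
#
#     >>> stringify_path(pathlib.Path("/tmp/file.txt"))  # doctest: +SKIP
#     '/tmp/file.txt'
#     >>> stringify_path(b"/tmp/file.txt")  # bytes pass through  # doctest: +SKIP
#     b'/tmp/file.txt'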


def make_instance(
    cls: Callable[..., T], args: Sequence[Any], kwargs: dict[str, Any]
) -> T:
    inst = cls(*args, **kwargs)
    inst._determine_worker()  # type: ignore[attr-defined]
    return inst


def common_prefix(paths: Iterable[str]) -> str:
    """For a list of paths, find the shortest prefix common to all"""
    parts = [p.split("/") for p in paths]
    lmax = min(len(p) for p in parts)
    end = 0
    for i in range(lmax):
        end = all(p[i] == parts[0][i] for p in parts)
        if not end:
            break
    i += end
    return "/".join(parts[0][:i])


def other_paths(
    paths: list[str],
    path2: str | list[str],
    exists: bool = False,
    flatten: bool = False,
) -> list[str]:
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    exists: bool (optional)
        For a str destination, if it already exists (and is a dir), files should
        end up inside.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output files
        are in the same directory.

    Returns
    -------
    list of str
    """

    if isinstance(path2, str):
        path2 = path2.rstrip("/")

        if flatten:
            path2 = ["/".join((path2, p.split("/")[-1])) for p in paths]
        else:
            cp = common_prefix(paths)
            if exists:
                cp = cp.rsplit("/", 1)[0]
            if not cp and all(not s.startswith("/") for s in paths):
                path2 = ["/".join([path2, p]) for p in paths]
            else:
                path2 = [p.replace(cp, path2, 1) for p in paths]
    else:
        assert len(paths) == len(path2)
    return path2
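

# A hedged sketch of both modes: tree-preserving (default) and flattened.
#
#     >>> other_paths(["data/a.csv", "data/sub/b.csv"], "dest")
#     ['dest/a.csv', 'dest/sub/b.csv']
#     >>> other_paths(["data/a.csv", "data/sub/b.csv"], "dest", flatten=True)
#     ['dest/a.csv', 'dest/b.csv']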


def is_exception(obj: Any) -> bool:
    return isinstance(obj, BaseException)


def isfilelike(f: Any) -> TypeGuard[IO[bytes]]:
    for attr in ["read", "close", "tell"]:
        if not hasattr(f, attr):
            return False
    return True


def get_protocol(url: str) -> str:
    url = stringify_path(url)
    parts = re.split(r"(\:\:|\://)", url, maxsplit=1)
    if len(parts) > 1:
        return parts[0]
    return "file"


def can_be_local(path: str) -> bool:
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        return getattr(get_filesystem_class(get_protocol(path)), "local_file", False)
    except (ValueError, ImportError):
        # not in registry or import failed
        return False


def get_package_version_without_import(name: str) -> str | None:
    """For a given package name, try to find the version without importing it

    Importing the package and reading ``package.__version__`` is still the
    fallback here, so an import *might* happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    if name in sys.modules:
        mod = sys.modules[name]
        if hasattr(mod, "__version__"):
            return mod.__version__
    try:
        return version(name)
    except:  # noqa: E722
        pass
    try:
        import importlib

        mod = importlib.import_module(name)
        return mod.__version__
    except (ImportError, AttributeError):
        return None


def setup_logging(
    logger: logging.Logger | None = None,
    logger_name: str | None = None,
    level: str = "DEBUG",
    clear: bool = True,
) -> logging.Logger:
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    logger = logger or logging.getLogger(logger_name)
    handle = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
    )
    handle.setFormatter(formatter)
    if clear:
        logger.handlers.clear()
    logger.addHandler(handle)
    logger.setLevel(level)
    return logger
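

# A usage sketch (hedged; "fsspec.http" is just an example logger name):
#
#     >>> log = setup_logging(logger_name="fsspec.http", level="INFO")
#     >>> log.info("ready")  # emitted to stderr via the StreamHandler above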


def _unstrip_protocol(name: str, fs: AbstractFileSystem) -> str:
    return fs.unstrip_protocol(name)


def mirror_from(
    origin_name: str, methods: Iterable[str]
) -> Callable[[type[T]], type[T]]:
    """Mirror attributes and methods from the given
    origin_name attribute of the instance to the
    decorated class"""

    def origin_getter(method: str, self: Any) -> Any:
        origin = getattr(self, origin_name)
        return getattr(origin, method)

    def wrapper(cls: type[T]) -> type[T]:
        for method in methods:
            wrapped_method = partial(origin_getter, method)
            setattr(cls, method, property(wrapped_method))
        return cls

    return wrapper
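

# A hedged sketch of the decorator (the Wrapper class is hypothetical): each
# mirrored name becomes a property forwarding to the named attribute.
#
#     >>> @mirror_from("_file", ["read", "tell"])
#     ... class Wrapper:
#     ...     def __init__(self, f):
#     ...         self._file = f
#     >>> from io import BytesIO
#     >>> Wrapper(BytesIO(b"xyz")).read(2)
#     b'xy'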


@contextlib.contextmanager
def nullcontext(obj: T) -> Iterator[T]:
    yield obj


def merge_offset_ranges(
    paths: list[str],
    starts: list[int] | int,
    ends: list[int] | int,
    max_gap: int = 0,
    max_block: int | None = None,
    sort: bool = True,
) -> tuple[list[str], list[int], list[int]]:
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError

    # Early return
    if len(starts) <= 1:
        return paths, starts, ends

    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = (
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        )

    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends

    # `paths` is empty. Just return input lists
    return paths, starts, ends
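

# A worked sketch (hedged): with max_gap=10, the 10-byte gap between the
# first two ranges merges, while the 300-byte gap to the third does not.
#
#     >>> merge_offset_ranges(["f", "f", "f"], [0, 110, 500], [100, 200, 600], max_gap=10)
#     (['f', 'f'], [0, 500], [200, 600])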


def file_size(filelike: IO[bytes]) -> int:
    """Find length of any open read-mode file-like"""
    pos = filelike.tell()
    try:
        return filelike.seek(0, 2)
    finally:
        filelike.seek(pos)


@contextlib.contextmanager
def atomic_write(path: str, mode: str = "wb"):
    """
    A context manager that opens a temporary file next to `path` and, on exit,
    replaces `path` with the temporary file, thereby updating `path`
    atomically.
    """
    fd, fn = tempfile.mkstemp(
        dir=os.path.dirname(path), prefix=os.path.basename(path) + "-"
    )
    try:
        with open(fd, mode) as fp:
            yield fp
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(fn)
        raise
    else:
        os.replace(fn, path)
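

# A usage sketch (hedged; "out.bin" is a hypothetical target): readers never
# observe a partially written file, since os.replace() swaps it in whole.
#
#     >>> with atomic_write("out.bin") as f:  # doctest: +SKIP
#     ...     _ = f.write(b"payload")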


def _translate(pat, STAR, QUESTION_MARK):
    # Copied from: https://github.com/python/cpython/pull/106703.
    res: list[str] = []
    add = res.append
    i, n = 0, len(pat)
    while i < n:
        c = pat[i]
        i = i + 1
        if c == "*":
            # compress consecutive `*` into one
            if (not res) or res[-1] is not STAR:
                add(STAR)
        elif c == "?":
            add(QUESTION_MARK)
        elif c == "[":
            j = i
            if j < n and pat[j] == "!":
                j = j + 1
            if j < n and pat[j] == "]":
                j = j + 1
            while j < n and pat[j] != "]":
                j = j + 1
            if j >= n:
                add("\\[")
            else:
                stuff = pat[i:j]
                if "-" not in stuff:
                    stuff = stuff.replace("\\", r"\\")
                else:
                    chunks = []
                    k = i + 2 if pat[i] == "!" else i + 1
                    while True:
                        k = pat.find("-", k, j)
                        if k < 0:
                            break
                        chunks.append(pat[i:k])
                        i = k + 1
                        k = k + 3
                    chunk = pat[i:j]
                    if chunk:
                        chunks.append(chunk)
                    else:
                        chunks[-1] += "-"
                    # Remove empty ranges -- invalid in RE.
                    for k in range(len(chunks) - 1, 0, -1):
                        if chunks[k - 1][-1] > chunks[k][0]:
                            chunks[k - 1] = chunks[k - 1][:-1] + chunks[k][1:]
                            del chunks[k]
                    # Escape backslashes and hyphens for set difference (--).
                    # Hyphens that create ranges shouldn't be escaped.
                    stuff = "-".join(
                        s.replace("\\", r"\\").replace("-", r"\-") for s in chunks
                    )
                # Escape set operations (&&, ~~ and ||).
                stuff = re.sub(r"([&~|])", r"\\\1", stuff)
                i = j + 1
                if not stuff:
                    # Empty range: never match.
                    add("(?!)")
                elif stuff == "!":
                    # Negated empty range: match any character.
                    add(".")
                else:
                    if stuff[0] == "!":
                        stuff = "^" + stuff[1:]
                    elif stuff[0] in ("^", "["):
                        stuff = "\\" + stuff
                    add(f"[{stuff}]")
        else:
            add(re.escape(c))
    assert i == n
    return res


def glob_translate(pat):
    # Copied from: https://github.com/python/cpython/pull/106703.
    # The keyword parameters' values are fixed to:
    # recursive=True, include_hidden=True, seps=None
    """Translate a pathname with shell wildcards to a regular expression."""
    if os.path.altsep:
        seps = os.path.sep + os.path.altsep
    else:
        seps = os.path.sep
    escaped_seps = "".join(map(re.escape, seps))
    any_sep = f"[{escaped_seps}]" if len(seps) > 1 else escaped_seps
    not_sep = f"[^{escaped_seps}]"
    one_last_segment = f"{not_sep}+"
    one_segment = f"{one_last_segment}{any_sep}"
    any_segments = f"(?:.+{any_sep})?"
    any_last_segments = ".*"
    results = []
    parts = re.split(any_sep, pat)
    last_part_idx = len(parts) - 1
    for idx, part in enumerate(parts):
        if part == "*":
            results.append(one_segment if idx < last_part_idx else one_last_segment)
            continue
        if part == "**":
            results.append(any_segments if idx < last_part_idx else any_last_segments)
            continue
        elif "**" in part:
            raise ValueError(
                "Invalid pattern: '**' can only be an entire path component"
            )
        if part:
            results.extend(_translate(part, f"{not_sep}*", not_sep))
        if idx < last_part_idx:
            results.append(any_sep)
    res = "".join(results)
    return rf"(?s:{res})\Z"
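

# An end-to-end sketch (hedged; results shown for POSIX separators):
#
#     >>> pattern = glob_translate("data/**/*.csv")  # doctest: +SKIP
#     >>> bool(re.match(pattern, "data/2023/01/file.csv"))  # doctest: +SKIP
#     True
#     >>> bool(re.match(pattern, "data/file.txt"))  # doctest: +SKIP
#     False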