from __future__ import annotations

import logging
import math
import os
import pathlib
import re
import sys
from contextlib import contextmanager
from functools import partial
from hashlib import md5
from importlib.metadata import version
from urllib.parse import urlsplit

DEFAULT_BLOCK_SIZE = 5 * 2**20


def infer_storage_options(urlpath, inherit_storage_options=None):
    """Infer storage options from URL path and merge it with existing storage
    options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path": "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... )  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            path = "%s:%s" % windows_path.groups()

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        if protocol in ("s3", "s3a", "gcs", "gs"):
            options["path"] = options["host"] + options["path"]
        else:
            options["host"] = options["host"]
        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options


def update_storage_options(options, inherited=None):
    if not inherited:
        inherited = {}
    collisions = set(options) & set(inherited)
    if collisions:
        for collision in collisions:
            if options.get(collision) != inherited.get(collision):
                raise KeyError(
                    "Collision between inferred and specified storage "
                    "option:\n%s" % collision
                )
    options.update(inherited)
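

# Example sketch for update_storage_options: inherited keys are merged into
# ``options`` in place, and a key present in both dicts with conflicting
# values raises KeyError.
#
#     opts = {"protocol": "s3", "path": "bucket/key"}
#     update_storage_options(opts, {"anon": True})
#     # opts == {"protocol": "s3", "path": "bucket/key", "anon": True}
#     update_storage_options(opts, {"protocol": "gcs"})  # raises KeyError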


# Compression extensions registered via fsspec.compression.register_compression
compressions: dict[str, str] = {}


def infer_compression(filename):
    """Infer compression, if available, from filename.

    Infer a named compression type, if registered and available, from filename
    extension. This includes builtin (gz, bz2, zip) compressions, as well as
    optional compressions. See fsspec.compression.register_compression.
    """
    extension = os.path.splitext(filename)[-1].strip(".").lower()
    if extension in compressions:
        return compressions[extension]
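

# Example sketch for infer_compression: the ``compressions`` registry above is
# populated by fsspec.compression.register_compression at import time, so the
# exact mapping depends on which optional packages are installed. With the
# builtin gzip codec registered, something like:
#
#     infer_compression("table.csv.gz")  # -> "gzip"
#     infer_compression("table.csv")     # -> None (no registered extension)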


def build_name_function(max_int):
    """Returns a function that receives a single integer
    and returns it as a string padded by enough zero characters
    to align with maximum possible integer

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # handle corner cases max_int is 0 or exact power of 10
    max_int += 1e-8

    pad_length = int(math.ceil(math.log10(max_int)))

    def name_function(i):
        return str(i).zfill(pad_length)

    return name_function


def seek_delimiter(file, delimiter, blocksize):
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Seeks file to next chunk delimiter, where chunks are defined on file start,
    a delimiting sequence, and file end. Use file.tell() to see location afterwards.
    Note that file start is a valid split, so must be at offset > 0 to seek for
    delimiter.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.

    """

    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            pass
        last = full[-len(delimiter) :]
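

# Example sketch for seek_delimiter: starting mid-record, the file is advanced
# to the byte just after the next delimiter.
#
#     from io import BytesIO
#     f = BytesIO(b"Alice\nBob\nCharlie")
#     f.seek(3)
#     seek_delimiter(f, b"\n", 2**16)  # -> True
#     f.tell()                         # -> 6, right after the first b"\n"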


def read_block(f, offset, length, delimiter=None, split_before=False):
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.


    If using the ``delimiter=`` keyword argument we ensure that the read
    starts and stops at delimiter boundaries that follow the locations
    ``offset`` and ``offset + length``. If ``offset`` is zero then we
    start at zero, regardless of delimiter. The bytestring returned WILL
    include the terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if delimiter:
        f.seek(offset)
        found_start_delim = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        length -= start - offset

        f.seek(start + length)
        found_end_delim = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        # Adjust split location to before delimiter iff seek found the
        # delimiter sequence, not start or end of file.
        if found_start_delim and split_before:
            start -= len(delimiter)

        if found_end_delim and split_before:
            end -= len(delimiter)

        offset = start
        length = end - start

    f.seek(offset)
    b = f.read(length)
    return b


def tokenize(*args, **kwargs):
    """Deterministic token

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args += (kwargs,)
    try:
        return md5(str(args).encode()).hexdigest()
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        return md5(str(args).encode(), usedforsecurity=False).hexdigest()


def stringify_path(filepath):
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced according to their
    __fspath__ method.

    For backwards compatibility with older Python versions, pathlib.Path
    objects are specially coerced.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if isinstance(filepath, str):
        return filepath
    elif hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    elif isinstance(filepath, pathlib.Path):
        return str(filepath)
    elif hasattr(filepath, "path"):
        return filepath.path
    else:
        return filepath
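

# Example sketch for stringify_path (POSIX paths shown for illustration):
#
#     stringify_path(pathlib.Path("/tmp/data.csv"))  # -> "/tmp/data.csv"
#     stringify_path("s3://bucket/key")              # -> unchanged string
#     stringify_path(42)                             # -> 42, passed through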


def make_instance(cls, args, kwargs):
    inst = cls(*args, **kwargs)
    inst._determine_worker()
    return inst


def common_prefix(paths):
    """For a list of paths, find the shortest prefix common to all"""
    parts = [p.split("/") for p in paths]
    lmax = min(len(p) for p in parts)
    end = 0
    for i in range(lmax):
        end = all(p[i] == parts[0][i] for p in parts)
        if not end:
            break
    i += end
    return "/".join(parts[0][:i])
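

# Example sketch for common_prefix:
#
#     common_prefix(["bucket/a/x.csv", "bucket/a/y.csv", "bucket/b/z.csv"])
#     # -> "bucket"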


def other_paths(paths, path2, is_dir=None, exists=False, flatten=False):
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    is_dir: bool (optional)
        For the special case where the input is one element, whether to regard the
        value as the target path, or as a directory to put a file path within. If
        None, a directory is inferred if the path ends in '/'
    exists: bool (optional)
        For a str destination, if it already exists (and is a dir), files should
        end up inside.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output
        files are in the same directory.

    Returns
    -------
    list of str
    """

    if isinstance(path2, str):
        is_dir = is_dir or path2.endswith("/")
        path2 = path2.rstrip("/")

        if flatten:
            path2 = ["/".join((path2, p.split("/")[-1])) for p in paths]
        else:
            cp = common_prefix(paths)
            if exists:
                cp = cp.rsplit("/", 1)[0]
            if not cp and all(not s.startswith("/") for s in paths):
                path2 = ["/".join([path2, p]) for p in paths]
            else:
                path2 = [p.replace(cp, path2, 1) for p in paths]
    else:
        assert len(paths) == len(path2)
    return path2
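

# Example sketch for other_paths: re-rooting a small tree under a new prefix
# (the paths shown are hypothetical).
#
#     other_paths(
#         ["bucket/data/a.csv", "bucket/data/sub/b.csv"],
#         "/local/out",
#     )
#     # -> ["/local/out/a.csv", "/local/out/sub/b.csv"]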


def is_exception(obj):
    return isinstance(obj, BaseException)


def isfilelike(f):
    for attr in ["read", "close", "tell"]:
        if not hasattr(f, attr):
            return False
    return True


def get_protocol(url):
    parts = re.split(r"(\:\:|\://)", url, 1)
    if len(parts) > 1:
        return parts[0]
    return "file"
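

# Example sketch for get_protocol: the first token before "://" or the
# chaining separator "::" is taken as the protocol, defaulting to "file".
#
#     get_protocol("s3://bucket/key")           # -> "s3"
#     get_protocol("simplecache::s3://bucket")  # -> "simplecache"
#     get_protocol("/local/file.csv")           # -> "file"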


def can_be_local(path):
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        return getattr(get_filesystem_class(get_protocol(path)), "local_file", False)
    except (ValueError, ImportError):
        # not in registry or import failed
        return False
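

# Example sketch for can_be_local: protocols whose filesystem class advertises
# ``local_file = True`` (the local filesystem, for instance) can be used with
# open_local; unknown, uninstalled, or remote-only protocols return False.
#
#     can_be_local("file:///tmp/x.csv")    # -> True
#     can_be_local("s3://bucket/key.csv")  # -> False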


def get_package_version_without_import(name):
    """For a given package name, try to find the version without importing it

    Import and package.__version__ is still the backup here, so an import
    *might* happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    if name in sys.modules:
        mod = sys.modules[name]
        if hasattr(mod, "__version__"):
            return mod.__version__
    try:
        return version(name)
    except:  # noqa: E722
        pass
    try:
        import importlib

        mod = importlib.import_module(name)
        return mod.__version__
    except (ImportError, AttributeError):
        return None
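

# Example sketch for get_package_version_without_import (the version string
# shown is illustrative):
#
#     get_package_version_without_import("fsspec")         # -> e.g. "2023.6.0"
#     get_package_version_without_import("not-a-package")  # -> None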


def setup_logging(logger=None, logger_name=None, level="DEBUG", clear=True):
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    logger = logger or logging.getLogger(logger_name)
    handle = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
    )
    handle.setFormatter(formatter)
    if clear:
        logger.handlers.clear()
    logger.addHandler(handle)
    logger.setLevel(level)
    return logger
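

# Example sketch for setup_logging (the logger name is arbitrary): attach a
# formatted StreamHandler and set the level in one call.
#
#     logger = setup_logging(logger_name="fsspec.http", level="DEBUG")
#     logger.debug("now visible on stderr")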


def _unstrip_protocol(name, fs):
    return fs.unstrip_protocol(name)


def mirror_from(origin_name, methods):
    """Mirror attributes and methods from the given
    origin_name attribute of the instance to the
    decorated class"""

    def origin_getter(method, self):
        origin = getattr(self, origin_name)
        return getattr(origin, method)

    def wrapper(cls):
        for method in methods:
            wrapped_method = partial(origin_getter, method)
            setattr(cls, method, property(wrapped_method))
        return cls

    return wrapper
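

# Example sketch for mirror_from with a hypothetical wrapper class: each listed
# name becomes a read-only property forwarding to the named attribute.
#
#     from io import BytesIO
#
#     @mirror_from("inner", ["read", "tell"])
#     class Wrapper:
#         def __init__(self, inner):
#             self.inner = inner
#
#     w = Wrapper(BytesIO(b"abc"))
#     w.read()  # -> b"abc", forwarded to the underlying BytesIO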


@contextmanager
def nullcontext(obj):
    yield obj


def merge_offset_ranges(paths, starts, ends, max_gap=0, max_block=None, sort=True):
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError

    # Early Return
    if len(starts) <= 1:
        return paths, starts, ends

    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = [
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        ]

    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or ((max_block is not None and (ends[i] - new_starts[-1]) > max_block))
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends

    # `paths` is empty. Just return input lists
    return paths, starts, ends
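

# Example sketch for merge_offset_ranges: two ranges on the same path separated
# by a gap of exactly max_gap bytes are merged; the range on the other path is
# left alone.
#
#     merge_offset_ranges(
#         ["a", "a", "b"], [0, 110, 0], [100, 200, 50], max_gap=10
#     )
#     # -> (["a", "b"], [0, 0], [200, 50])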


def file_size(filelike):
    """Find length of any open read-mode file-like"""
    pos = filelike.tell()
    try:
        return filelike.seek(0, 2)
    finally:
        filelike.seek(pos)
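

# Example sketch for file_size: seek(0, 2) reports the end offset, and the
# original position is restored afterwards.
#
#     from io import BytesIO
#     f = BytesIO(b"abcdef")
#     f.seek(2)
#     file_size(f)  # -> 6
#     f.tell()      # -> 2, position unchanged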