from __future__ import annotations

import logging
import math
import os
import pathlib
import re
import sys
from contextlib import contextmanager
from functools import partial
from hashlib import md5
from importlib.metadata import version
from urllib.parse import urlsplit

DEFAULT_BLOCK_SIZE = 5 * 2**20


def infer_storage_options(urlpath, inherit_storage_options=None):
    """Infer storage options from URL path and merge them with existing storage
    options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv') # doctest: +SKIP
    {"protocol": "file", "path": "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... ) # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            path = "%s:%s" % windows_path.groups()

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname, which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        if protocol in ("s3", "s3a", "gcs", "gs"):
            # For bucket-based protocols the "host" is really the bucket,
            # which belongs at the front of the path
            options["path"] = options["host"] + options["path"]
        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options
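
# Illustrative sketch (hypothetical helper, not part of fsspec): typical
# outputs for a bucket-style URL and a plain local path, following the
# logic above.
def _example_infer_storage_options():
    assert infer_storage_options("/home/user/data.csv") == {
        "protocol": "file",
        "path": "/home/user/data.csv",
    }
    assert infer_storage_options("s3://mybucket/data/file.parquet") == {
        "protocol": "s3",
        "path": "mybucket/data/file.parquet",
        "host": "mybucket",
    }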


def update_storage_options(options, inherited=None):
    if not inherited:
        inherited = {}
    collisions = set(options) & set(inherited)
    if collisions:
        for collision in collisions:
            if options.get(collision) != inherited.get(collision):
                raise KeyError(
                    "Collision between inferred and specified storage "
                    "option:\n%s" % collision
                )
    options.update(inherited)
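
# Illustrative sketch (hypothetical helper, not part of fsspec): matching
# keys merge silently; a conflicting value raises KeyError.
def _example_update_storage_options():
    options = {"protocol": "s3", "host": "bucket"}
    update_storage_options(options, {"host": "bucket", "anon": True})
    assert options == {"protocol": "s3", "host": "bucket", "anon": True}
    try:
        update_storage_options({"host": "a"}, {"host": "b"})
        raise AssertionError("expected KeyError")
    except KeyError:
        pass  # inferred and specified options disagree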


# Compression extensions registered via fsspec.compression.register_compression
compressions: dict[str, str] = {}


def infer_compression(filename):
    """Infer compression, if available, from filename.

    Infer a named compression type, if registered and available, from filename
    extension. This includes builtin (gz, bz2, zip) compressions, as well as
    optional compressions. See fsspec.compression.register_compression.
    """
    extension = os.path.splitext(filename)[-1].strip(".").lower()
    if extension in compressions:
        return compressions[extension]
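
# Illustrative sketch (hypothetical helper, not part of fsspec). The registry
# above is populated by fsspec.compression at import time; with the builtin
# "gz" registration in place, the extension maps to the compression name.
def _example_infer_compression():
    import fsspec.compression  # noqa: F401  (populates `compressions`)

    assert infer_compression("table.csv.gz") == "gzip"
    assert infer_compression("table.csv") is None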


def build_name_function(max_int):
    """Returns a function that receives a single integer
    and returns it as a string, zero-padded so that all values up to
    ``max_int`` align to the same width

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # Handle corner cases where max_int is 0 or an exact power of 10: the
    # small nudge keeps log10 defined and bumps exact powers up one digit
    max_int += 1e-8

    pad_length = int(math.ceil(math.log10(max_int)))

    def name_function(i):
        return str(i).zfill(pad_length)

    return name_function


def seek_delimiter(file, delimiter, blocksize):
    r"""Seek the file to the file start, the file end, or the byte after a
    delimiter sequence.

    Seeks the file to the next chunk delimiter, where chunks are defined by
    the file start, a delimiting sequence, and the file end. Use file.tell()
    to see the location afterwards. Note that the file start is a valid split,
    so the file must be at an offset > 0 for a delimiter to be sought.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.

    """

    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            pass
        last = full[-len(delimiter) :]
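
# Illustrative sketch (hypothetical helper, not part of fsspec): starting
# mid-file, the file is left positioned just past the next delimiter.
def _example_seek_delimiter():
    from io import BytesIO

    f = BytesIO(b"Alice, 100\nBob, 200\nCharlie, 300")
    f.seek(5)
    assert seek_delimiter(f, b"\n", blocksize=4)
    assert f.tell() == 11  # first byte after the first b"\n"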


def read_block(f, offset, length, delimiter=None, split_before=False):
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.


    If using the ``delimiter=`` keyword argument we ensure that the read
    starts and stops at delimiter boundaries that follow the locations
    ``offset`` and ``offset + length``. If ``offset`` is zero then we
    start at zero, regardless of delimiter. The bytestring returned WILL
    include the terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300') # doctest: +SKIP
    >>> read_block(f, 0, 13) # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n') # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n') # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if delimiter:
        f.seek(offset)
        found_start_delim = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        length -= start - offset

        f.seek(start + length)
        found_end_delim = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        # Adjust split location to before delimiter iff seek found the
        # delimiter sequence, not start or end of file.
        if found_start_delim and split_before:
            start -= len(delimiter)

        if found_end_delim and split_before:
            end -= len(delimiter)

        offset = start
        length = end - start

    f.seek(offset)
    b = f.read(length)
    return b
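
# Illustrative sketch (hypothetical helper, not part of fsspec): the same
# cases as the docstring above, runnable end to end.
def _example_read_block():
    from io import BytesIO

    f = BytesIO(b"Alice, 100\nBob, 200\nCharlie, 300")
    assert read_block(f, 0, 13) == b"Alice, 100\nBo"
    assert read_block(f, 0, 13, delimiter=b"\n") == b"Alice, 100\nBob, 200\n"
    assert read_block(f, 10, 10, delimiter=b"\n") == b"Bob, 200\nCharlie, 300"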


def tokenize(*args, **kwargs):
    """Deterministic token

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args += (kwargs,)
    try:
        return md5(str(args).encode()).hexdigest()
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        return md5(str(args).encode(), usedforsecurity=False).hexdigest()
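
# Illustrative sketch (hypothetical helper, not part of fsspec): the token is
# just an md5 of the stringified arguments, so equal inputs give equal tokens
# and different inputs (almost surely) differ.
def _example_tokenize():
    assert tokenize("a", x=1) == tokenize("a", x=1)
    assert tokenize("a") != tokenize("b")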


def stringify_path(filepath):
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced according to their
    __fspath__ method.

    For backwards compatibility with older Python versions, pathlib.Path
    objects are specially coerced.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if isinstance(filepath, str):
        return filepath
    elif hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    elif isinstance(filepath, pathlib.Path):
        return str(filepath)
    elif hasattr(filepath, "path"):
        return filepath.path
    else:
        return filepath
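
# Illustrative sketch (hypothetical helper, not part of fsspec): Path objects
# are coerced via __fspath__; plain strings pass through unchanged.
def _example_stringify_path():
    p = pathlib.Path("data") / "x.csv"
    assert stringify_path(p) == os.path.join("data", "x.csv")
    assert stringify_path("s3://bucket/key") == "s3://bucket/key"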


def make_instance(cls, args, kwargs):
    inst = cls(*args, **kwargs)
    inst._determine_worker()
    return inst


def common_prefix(paths):
    """For a list of paths, find the shortest prefix common to all"""
    parts = [p.split("/") for p in paths]
    lmax = min(len(p) for p in parts)
    end = 0
    for i in range(lmax):
        end = all(p[i] == parts[0][i] for p in parts)
        if not end:
            break
    # If the loop ran to completion with all components equal, `end` is True
    # and the final matching component is included in the slice as well
    i += end
    return "/".join(parts[0][:i])
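
# Illustrative sketch (hypothetical helper, not part of fsspec): the prefix
# is computed on whole "/"-separated components, not on characters.
def _example_common_prefix():
    assert common_prefix(["bucket/a/x.csv", "bucket/a/y.csv"]) == "bucket/a"
    assert common_prefix(["bucket/a/x.csv", "bucket/b/y.csv"]) == "bucket"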


def other_paths(paths, path2, is_dir=None, exists=False, flatten=False):
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    is_dir: bool (optional)
        For the special case where the input is one element, whether to regard the
        value as the target path, or as a directory to put a file path within. If
        None, a directory is inferred if the path ends in '/'
    exists: bool (optional)
        For a str destination, whether it already exists (and is a dir), in which
        case files should end up inside it.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output files
        are in the same directory.

    Returns
    -------
    list of str
    """

    if isinstance(path2, str):
        is_dir = is_dir or path2.endswith("/")
        path2 = path2.rstrip("/")

        if flatten:
            path2 = ["/".join((path2, p.split("/")[-1])) for p in paths]
        else:
            cp = common_prefix(paths)
            if exists:
                cp = cp.rsplit("/", 1)[0]
            if not cp and all(not s.startswith("/") for s in paths):
                path2 = ["/".join([path2, p]) for p in paths]
            else:
                path2 = [p.replace(cp, path2, 1) for p in paths]
    else:
        assert len(paths) == len(path2)
    return path2
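
# Illustrative sketch (hypothetical helper, not part of fsspec): mirror a tree
# under a new root; a trailing "/" marks the destination as a directory.
def _example_other_paths():
    src = ["data/a/x.csv", "data/a/y.csv"]
    assert other_paths(src, "backup/") == ["backup/x.csv", "backup/y.csv"]
    # flatten=True drops the source directory structure entirely
    assert other_paths(["a/1.csv", "b/2.csv"], "out", flatten=True) == [
        "out/1.csv",
        "out/2.csv",
    ]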


def is_exception(obj):
    return isinstance(obj, BaseException)


def isfilelike(f):
    for attr in ["read", "close", "tell"]:
        if not hasattr(f, attr):
            return False
    return True


def get_protocol(url):
    parts = re.split(r"(::|://)", url, maxsplit=1)
    if len(parts) > 1:
        return parts[0]
    return "file"
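
# Illustrative sketch (hypothetical helper, not part of fsspec): the protocol
# is whatever precedes the first "://" or chained-filesystem "::".
def _example_get_protocol():
    assert get_protocol("s3://bucket/key") == "s3"
    assert get_protocol("filecache::s3://bucket/key") == "filecache"
    assert get_protocol("/local/path") == "file"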


def can_be_local(path):
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        return getattr(get_filesystem_class(get_protocol(path)), "local_file", False)
    except (ValueError, ImportError):
        # not in registry or import failed
        return False
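
# Illustrative sketch (hypothetical helper, not part of fsspec): the local
# filesystem class advertises local_file=True, so plain paths qualify.
def _example_can_be_local():
    assert can_be_local("/tmp/data.csv")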


def get_package_version_without_import(name):
    """For given package name, try to find the version without importing it

    Importing the package and reading ``package.__version__`` is still the
    fallback here, so an import *might* happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    if name in sys.modules:
        mod = sys.modules[name]
        if hasattr(mod, "__version__"):
            return mod.__version__
    try:
        return version(name)
    except:  # noqa: E722
        pass
    try:
        import importlib

        mod = importlib.import_module(name)
        return mod.__version__
    except (ImportError, AttributeError):
        return None
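
# Illustrative sketch (hypothetical helper, not part of fsspec): an installed
# distribution resolves via importlib.metadata; an unknown name falls through
# every lookup and returns None.
def _example_get_package_version_without_import():
    assert get_package_version_without_import("fsspec") is not None
    assert get_package_version_without_import("not-a-real-package") is None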


def setup_logging(logger=None, logger_name=None, level="DEBUG", clear=True):
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    logger = logger or logging.getLogger(logger_name)
    handle = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
    )
    handle.setFormatter(formatter)
    if clear:
        logger.handlers.clear()
    logger.addHandler(handle)
    logger.setLevel(level)
    return logger
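
# Illustrative sketch (hypothetical helper, not part of fsspec): the logger
# ends up with exactly one stream handler and the requested level.
def _example_setup_logging():
    logger = setup_logging(logger_name="fsspec.demo", level="INFO")
    assert logger.level == logging.INFO
    assert len(logger.handlers) == 1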


def _unstrip_protocol(name, fs):
    return fs.unstrip_protocol(name)


def mirror_from(origin_name, methods):
    """Mirror attributes and methods from the given
    origin_name attribute of the instance to the
    decorated class"""

    def origin_getter(method, self):
        origin = getattr(self, origin_name)
        return getattr(origin, method)

    def wrapper(cls):
        for method in methods:
            wrapped_method = partial(origin_getter, method)
            setattr(cls, method, property(wrapped_method))
        return cls

    return wrapper
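
# Illustrative sketch (hypothetical helper, not part of fsspec): attribute
# reads on the decorated class are forwarded to its named inner object.
def _example_mirror_from():
    class Inner:
        name = "inner-object"

    @mirror_from("inner", ["name"])
    class Outer:
        def __init__(self, inner):
            self.inner = inner

    assert Outer(Inner()).name == "inner-object"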


@contextmanager
def nullcontext(obj):
    # Equivalent to contextlib.nullcontext: yield the object unchanged
    yield obj


def merge_offset_ranges(paths, starts, ends, max_gap=0, max_block=None, sort=True):
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError("paths must be a list")
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError("starts and ends must have the same length as paths")

    # Early return
    if len(starts) <= 1:
        return paths, starts, ends

    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = [
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        ]

    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends

    # `paths` is empty. Just return input lists
    return paths, starts, ends
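
# Illustrative sketch (hypothetical helper, not part of fsspec): contiguous
# ranges within one file merge; ranges in different files never do.
def _example_merge_offset_ranges():
    paths, starts, ends = merge_offset_ranges(
        ["f1", "f1", "f2"], [0, 10, 0], [10, 20, 5]
    )
    assert paths == ["f1", "f2"]
    assert starts == [0, 0]
    assert ends == [20, 5]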


def file_size(filelike):
    """Find length of any open read-mode file-like"""
    pos = filelike.tell()
    try:
        # seek-to-end returns the new absolute offset, i.e. the size
        return filelike.seek(0, 2)
    finally:
        filelike.seek(pos)
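
# Illustrative sketch (hypothetical helper, not part of fsspec): the size is
# reported without disturbing the current read position.
def _example_file_size():
    from io import BytesIO

    f = BytesIO(b"0123456789")
    f.seek(3)
    assert file_size(f) == 10
    assert f.tell() == 3  # position restored by the finally clause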