Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/utils.py: 14%


345 statements  

from __future__ import annotations

import contextlib
import logging
import math
import os
import re
import sys
import tempfile
from functools import partial
from hashlib import md5
from importlib.metadata import version
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    Iterator,
    Sequence,
    TypeVar,
)
from urllib.parse import urlsplit

if TYPE_CHECKING:
    import pathlib

    from typing_extensions import TypeGuard

    from fsspec.spec import AbstractFileSystem


DEFAULT_BLOCK_SIZE = 5 * 2**20

T = TypeVar("T")


def infer_storage_options(
    urlpath: str, inherit_storage_options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Infer storage options from URL path and merge them with existing
    storage options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path": "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... )  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            drive, path = windows_path.groups()
            path = f"{drive}:{path}"

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options: dict[str, Any] = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname, which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        if protocol in ("s3", "s3a", "gcs", "gs"):
            options["path"] = options["host"] + options["path"]
        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options

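# Editor's illustrative sketch (not part of fsspec): a runnable check of the
# s3 branch above, where the bucket from the netloc is folded back into the
# path. The bucket/key names are hypothetical.
def _example_infer_storage_options() -> None:
    opts = infer_storage_options("s3://bucket/key.csv")
    assert opts == {"protocol": "s3", "path": "bucket/key.csv", "host": "bucket"}
    # A bare local path short-circuits to the "file" protocol.
    assert infer_storage_options("/mnt/data.csv") == {
        "protocol": "file",
        "path": "/mnt/data.csv",
    }
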

def update_storage_options(
    options: dict[str, Any], inherited: dict[str, Any] | None = None
) -> None:
    if not inherited:
        inherited = {}
    collisions = set(options) & set(inherited)
    if collisions:
        for collision in collisions:
            if options.get(collision) != inherited.get(collision):
                raise KeyError(
                    f"Collision between inferred and specified storage "
                    f"option:\n{collision}"
                )
    options.update(inherited)

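# Editor's illustrative sketch (not part of fsspec): inherited options merge
# in, an equal value for a shared key is tolerated, and a conflicting value
# raises KeyError. Values are hypothetical.
def _example_update_storage_options() -> None:
    opts = {"host": "node", "port": 123}
    update_storage_options(opts, {"username": "alice", "port": 123})
    assert opts == {"host": "node", "port": 123, "username": "alice"}
    try:
        update_storage_options({"port": 123}, {"port": 999})
    except KeyError:
        pass  # conflicting values for the same key are rejected
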

# Compression extensions registered via fsspec.compression.register_compression
compressions: dict[str, str] = {}


def infer_compression(filename: str) -> str | None:
    """Infer compression, if available, from filename.

    Infer a named compression type, if registered and available, from filename
    extension. This includes builtin (gz, bz2, zip) compressions, as well as
    optional compressions. See fsspec.compression.register_compression.
    """
    extension = os.path.splitext(filename)[-1].strip(".").lower()
    if extension in compressions:
        return compressions[extension]
    return None

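# Editor's illustrative sketch (not part of fsspec): infer_compression keys
# off the lower-cased extension in the registry above. The "gz" -> "gzip"
# entry is seeded by hand here; normally importing fsspec.compression
# registers it.
def _example_infer_compression() -> None:
    compressions.setdefault("gz", "gzip")
    assert infer_compression("table.csv.GZ") == "gzip"
    assert infer_compression("table.csv") is None
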

def build_name_function(max_int: float) -> Callable[[int], str]:
    """Returns a function that receives a single integer
    and returns it as a string padded by enough zero characters
    to align with the maximum possible integer

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # handle corner cases where max_int is 0 or an exact power of 10
    max_int += 1e-8

    pad_length = int(math.ceil(math.log10(max_int)))

    def name_function(i: int) -> str:
        return str(i).zfill(pad_length)

    return name_function


def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Seeks file to next chunk delimiter, where chunks are defined on file start,
    a delimiting sequence, and file end. Use file.tell() to see location afterwards.
    Note that the file start is a valid split, so the file must be at an offset
    greater than 0 for a delimiter seek to take place.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.

    """

    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last: bytes | None = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            pass
        last = full[-len(delimiter) :]

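# Editor's illustrative sketch (not part of fsspec): starting mid-record,
# seek_delimiter lands just past the next delimiter; the buffer and blocksize
# are hypothetical.
def _example_seek_delimiter() -> None:
    from io import BytesIO

    f = BytesIO(b"row1\nrow2\nrow3")
    f.seek(2)  # somewhere inside "row1"
    assert seek_delimiter(f, b"\n", blocksize=4)
    assert f.tell() == 5  # first byte of "row2", just past the b"\n"
    f.seek(0)
    assert not seek_delimiter(f, b"\n", blocksize=4)  # file start is a valid split
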

def read_block(
    f: IO[bytes],
    offset: int,
    length: int | None,
    delimiter: bytes | None = None,
    split_before: bool = False,
) -> bytes:
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.


    If using the ``delimiter=`` keyword argument we ensure that the read
    starts and stops at delimiter boundaries that follow the locations
    ``offset`` and ``offset + length``. If ``offset`` is zero then we
    start at zero, regardless of delimiter. The bytestring returned WILL
    include the terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if delimiter:
        f.seek(offset)
        found_start_delim = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        length -= start - offset

        f.seek(start + length)
        found_end_delim = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        # Adjust split location to before delimiter if seek found the
        # delimiter sequence, not start or end of file.
        if found_start_delim and split_before:
            start -= len(delimiter)

        if found_end_delim and split_before:
            end -= len(delimiter)

        offset = start
        length = end - start

    f.seek(offset)

    # TODO: allow length to be None and read to the end of the file?
    assert length is not None
    b = f.read(length)
    return b

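# Editor's illustrative sketch (not part of fsspec): a runnable version of
# the (skipped) doctest above, showing how delimiter= snaps the read to
# newline boundaries.
def _example_read_block() -> None:
    from io import BytesIO

    f = BytesIO(b"Alice, 100\nBob, 200\nCharlie, 300")
    assert read_block(f, 0, 13) == b"Alice, 100\nBo"
    assert read_block(f, 0, 13, delimiter=b"\n") == b"Alice, 100\nBob, 200\n"
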

def tokenize(*args: Any, **kwargs: Any) -> str:
    """Deterministic token

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args += (kwargs,)
    try:
        h = md5(str(args).encode())
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        h = md5(str(args).encode(), usedforsecurity=False)
    return h.hexdigest()


def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced according to its
    __fspath__ method.

    For backwards compatibility with older Python versions, pathlib.Path
    objects are specially coerced.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if isinstance(filepath, str):
        return filepath
    elif hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    elif hasattr(filepath, "path"):
        return filepath.path
    else:
        return filepath  # type: ignore[return-value]

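# Editor's illustrative sketch (not part of fsspec): strings pass through,
# and os.PathLike objects are coerced via __fspath__.
def _example_stringify_path() -> None:
    from pathlib import PurePosixPath

    assert stringify_path("bucket/key") == "bucket/key"
    assert stringify_path(PurePosixPath("bucket/key")) == "bucket/key"
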

def make_instance(
    cls: Callable[..., T], args: Sequence[Any], kwargs: dict[str, Any]
) -> T:
    inst = cls(*args, **kwargs)
    inst._determine_worker()  # type: ignore[attr-defined]
    return inst


def common_prefix(paths: Iterable[str]) -> str:
    """For a list of paths, find the shortest prefix common to all"""
    parts = [p.split("/") for p in paths]
    lmax = min(len(p) for p in parts)
    end = 0
    for i in range(lmax):
        end = all(p[i] == parts[0][i] for p in parts)
        if not end:
            break
    i += end
    return "/".join(parts[0][:i])

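# Editor's illustrative sketch (not part of fsspec): the prefix is computed
# on "/"-separated components, not characters; paths are hypothetical.
def _example_common_prefix() -> None:
    assert common_prefix(["s3://bkt/a/x.csv", "s3://bkt/a/y.csv"]) == "s3://bkt/a"
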

def other_paths(
    paths: list[str],
    path2: str | list[str],
    exists: bool = False,
    flatten: bool = False,
) -> list[str]:
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    exists: bool (optional)
        For a str destination, whether it already exists (and is a dir); if so,
        files should end up inside it.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output files
        are in the same directory.

    Returns
    -------
    list of str
    """

    if isinstance(path2, str):
        path2 = path2.rstrip("/")

        if flatten:
            path2 = ["/".join((path2, p.split("/")[-1])) for p in paths]
        else:
            cp = common_prefix(paths)
            if exists:
                cp = cp.rsplit("/", 1)[0]
            if not cp and all(not s.startswith("/") for s in paths):
                path2 = ["/".join([path2, p]) for p in paths]
            else:
                path2 = [p.replace(cp, path2, 1) for p in paths]
    else:
        assert len(paths) == len(path2)
    return path2

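# Editor's illustrative sketch (not part of fsspec): the common prefix of the
# source tree is re-rooted under the destination; names are hypothetical.
def _example_other_paths() -> None:
    src = ["bkt/a/x.csv", "bkt/a/sub/y.csv"]
    assert other_paths(src, "dest") == ["dest/x.csv", "dest/sub/y.csv"]
    assert other_paths(src, "dest", flatten=True) == ["dest/x.csv", "dest/y.csv"]
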

def is_exception(obj: Any) -> bool:
    return isinstance(obj, BaseException)


def isfilelike(f: Any) -> TypeGuard[IO[bytes]]:
    return all(hasattr(f, attr) for attr in ["read", "close", "tell"])


def get_protocol(url: str) -> str:
    url = stringify_path(url)
    parts = re.split(r"(\:\:|\://)", url, maxsplit=1)
    if len(parts) > 1:
        return parts[0]
    return "file"

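# Editor's illustrative sketch (not part of fsspec): the protocol is the text
# before the first "://" or "::" (chained URLs), defaulting to "file".
def _example_get_protocol() -> None:
    assert get_protocol("s3://bucket/key") == "s3"
    assert get_protocol("simplecache::s3://bucket/key") == "simplecache"
    assert get_protocol("/local/file.csv") == "file"
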

def can_be_local(path: str) -> bool:
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        return getattr(get_filesystem_class(get_protocol(path)), "local_file", False)
    except (ValueError, ImportError):
        # not in registry or import failed
        return False


def get_package_version_without_import(name: str) -> str | None:
    """For given package name, try to find the version without importing it

    Importing the package and reading package.__version__ is still the
    fallback here, so an import *might* happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    if name in sys.modules:
        mod = sys.modules[name]
        if hasattr(mod, "__version__"):
            return mod.__version__
    try:
        return version(name)
    except:  # noqa: E722
        pass
    try:
        import importlib

        mod = importlib.import_module(name)
        return mod.__version__
    except (ImportError, AttributeError):
        return None


def setup_logging(
    logger: logging.Logger | None = None,
    logger_name: str | None = None,
    level: str = "DEBUG",
    clear: bool = True,
) -> logging.Logger:
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    logger = logger or logging.getLogger(logger_name)
    handle = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
    )
    handle.setFormatter(formatter)
    if clear:
        logger.handlers.clear()
    logger.addHandler(handle)
    logger.setLevel(level)
    return logger

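# Editor's illustrative sketch (not part of fsspec): wiring up a named logger
# with the formatter above; the logger name is hypothetical.
def _example_setup_logging() -> None:
    logger = setup_logging(logger_name="fsspec.demo", level="INFO")
    logger.info("handler and level configured")
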

def _unstrip_protocol(name: str, fs: AbstractFileSystem) -> str:
    return fs.unstrip_protocol(name)


def mirror_from(
    origin_name: str, methods: Iterable[str]
) -> Callable[[type[T]], type[T]]:
    """Mirror attributes and methods from the given
    origin_name attribute of the instance to the
    decorated class"""

    def origin_getter(method: str, self: Any) -> Any:
        origin = getattr(self, origin_name)
        return getattr(origin, method)

    def wrapper(cls: type[T]) -> type[T]:
        for method in methods:
            wrapped_method = partial(origin_getter, method)
            setattr(cls, method, property(wrapped_method))
        return cls

    return wrapper

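# Editor's illustrative sketch (not part of fsspec): each mirrored name
# becomes a property that forwards to the named attribute; the wrapper class
# is hypothetical.
def _example_mirror_from() -> None:
    @mirror_from("target", ["upper", "strip"])
    class Wrapper:
        def __init__(self, target: str):
            self.target = target

    w = Wrapper("  abc  ")
    assert w.strip() == "abc" and w.upper() == "  ABC  "
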

@contextlib.contextmanager
def nullcontext(obj: T) -> Iterator[T]:
    yield obj


def merge_offset_ranges(
    paths: list[str],
    starts: list[int] | int,
    ends: list[int] | int,
    max_gap: int = 0,
    max_block: int | None = None,
    sort: bool = True,
) -> tuple[list[str], list[int], list[int]]:
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError

    # Early Return
    if len(starts) <= 1:
        return paths, starts, ends

    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = (
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        )

    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends

    # `paths` is empty. Just return input lists
    return paths, starts, ends

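# Editor's illustrative sketch (not part of fsspec): two ranges on the same
# path separated by a 2-byte gap merge under max_gap=5; the third is too far
# away and stays separate. Offsets are hypothetical.
def _example_merge_offset_ranges() -> None:
    paths, starts, ends = merge_offset_ranges(
        ["f", "f", "f"], [0, 10, 40], [8, 20, 50], max_gap=5
    )
    assert (paths, starts, ends) == (["f", "f"], [0, 40], [20, 50])
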

def file_size(filelike: IO[bytes]) -> int:
    """Find length of any open read-mode file-like"""
    pos = filelike.tell()
    try:
        return filelike.seek(0, 2)
    finally:
        filelike.seek(pos)


@contextlib.contextmanager
def atomic_write(path: str, mode: str = "wb"):
    """
    A context manager that opens a temporary file next to `path` and, on exit,
    replaces `path` with the temporary file, thereby updating `path`
    atomically.
    """
    fd, fn = tempfile.mkstemp(
        dir=os.path.dirname(path), prefix=os.path.basename(path) + "-"
    )
    try:
        with open(fd, mode) as fp:
            yield fp
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(fn)
        raise
    else:
        os.replace(fn, path)

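# Editor's illustrative sketch (not part of fsspec): the payload only appears
# at the target path once the context exits cleanly; the directory is a
# throwaway temp dir.
def _example_atomic_write() -> None:
    target = os.path.join(tempfile.mkdtemp(), "out.bin")
    with atomic_write(target) as f:
        f.write(b"payload")
    with open(target, "rb") as f:
        assert f.read() == b"payload"
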

def _translate(pat, STAR, QUESTION_MARK):
    # Copied from: https://github.com/python/cpython/pull/106703.
    res: list[str] = []
    add = res.append
    i, n = 0, len(pat)
    while i < n:
        c = pat[i]
        i = i + 1
        if c == "*":
            # compress consecutive `*` into one
            if (not res) or res[-1] is not STAR:
                add(STAR)
        elif c == "?":
            add(QUESTION_MARK)
        elif c == "[":
            j = i
            if j < n and pat[j] == "!":
                j = j + 1
            if j < n and pat[j] == "]":
                j = j + 1
            while j < n and pat[j] != "]":
                j = j + 1
            if j >= n:
                add("\\[")
            else:
                stuff = pat[i:j]
                if "-" not in stuff:
                    stuff = stuff.replace("\\", r"\\")
                else:
                    chunks = []
                    k = i + 2 if pat[i] == "!" else i + 1
                    while True:
                        k = pat.find("-", k, j)
                        if k < 0:
                            break
                        chunks.append(pat[i:k])
                        i = k + 1
                        k = k + 3
                    chunk = pat[i:j]
                    if chunk:
                        chunks.append(chunk)
                    else:
                        chunks[-1] += "-"
                    # Remove empty ranges -- invalid in RE.
                    for k in range(len(chunks) - 1, 0, -1):
                        if chunks[k - 1][-1] > chunks[k][0]:
                            chunks[k - 1] = chunks[k - 1][:-1] + chunks[k][1:]
                            del chunks[k]
                    # Escape backslashes and hyphens for set difference (--).
                    # Hyphens that create ranges shouldn't be escaped.
                    stuff = "-".join(
                        s.replace("\\", r"\\").replace("-", r"\-") for s in chunks
                    )
                # Escape set operations (&&, ~~ and ||).
                stuff = re.sub(r"([&~|])", r"\\\1", stuff)
                i = j + 1
                if not stuff:
                    # Empty range: never match.
                    add("(?!)")
                elif stuff == "!":
                    # Negated empty range: match any character.
                    add(".")
                else:
                    if stuff[0] == "!":
                        stuff = "^" + stuff[1:]
                    elif stuff[0] in ("^", "["):
                        stuff = "\\" + stuff
                    add(f"[{stuff}]")
        else:
            add(re.escape(c))
    assert i == n
    return res


def glob_translate(pat):
    # Copied from: https://github.com/python/cpython/pull/106703.
    # The keyword parameters' values are fixed to:
    # recursive=True, include_hidden=True, seps=None
    """Translate a pathname with shell wildcards to a regular expression."""
    if os.path.altsep:
        seps = os.path.sep + os.path.altsep
    else:
        seps = os.path.sep
    escaped_seps = "".join(map(re.escape, seps))
    any_sep = f"[{escaped_seps}]" if len(seps) > 1 else escaped_seps
    not_sep = f"[^{escaped_seps}]"
    one_last_segment = f"{not_sep}+"
    one_segment = f"{one_last_segment}{any_sep}"
    any_segments = f"(?:.+{any_sep})?"
    any_last_segments = ".*"
    results = []
    parts = re.split(any_sep, pat)
    last_part_idx = len(parts) - 1
    for idx, part in enumerate(parts):
        if part == "*":
            results.append(one_segment if idx < last_part_idx else one_last_segment)
            continue
        if part == "**":
            results.append(any_segments if idx < last_part_idx else any_last_segments)
            continue
        elif "**" in part:
            raise ValueError(
                "Invalid pattern: '**' can only be an entire path component"
            )
        if part:
            results.extend(_translate(part, f"{not_sep}*", not_sep))
        if idx < last_part_idx:
            results.append(any_sep)
    res = "".join(results)
    return rf"(?s:{res})\Z"
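

# Editor's illustrative sketch (not part of fsspec): "**" spans any number of
# directories while "*" stays within one segment; patterns are hypothetical
# and assume "/" separators.
def _example_glob_translate() -> None:
    rx = re.compile(glob_translate("data/**/*.csv"))
    assert rx.match("data/2021/q1/a.csv")
    assert rx.match("data/a.csv")
    assert not rx.match("data/a.txt")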