"""Common IO api utilities"""

from __future__ import annotations

from abc import (
    ABC,
    abstractmethod,
)
import codecs
from collections import defaultdict
from collections.abc import (
    Hashable,
    Mapping,
    Sequence,
)
import dataclasses
import functools
import gzip
from io import (
    BufferedIOBase,
    BytesIO,
    RawIOBase,
    StringIO,
    TextIOBase,
    TextIOWrapper,
)
import mmap
import os
from pathlib import Path
import re
import tarfile
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    AnyStr,
    DefaultDict,
    Generic,
    Literal,
    TypeVar,
    cast,
    overload,
)
from urllib.parse import (
    urljoin,
    urlparse as parse_url,
    uses_netloc,
    uses_params,
    uses_relative,
)
import warnings
import zipfile

from pandas._typing import (
    BaseBuffer,
    ReadCsvBuffer,
)
from pandas.compat import (
    get_bz2_file,
    get_lzma_file,
)
from pandas.compat._optional import import_optional_dependency
from pandas.util._decorators import doc
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.common import (
    is_bool,
    is_file_like,
    is_integer,
    is_list_like,
)
from pandas.core.dtypes.generic import ABCMultiIndex

from pandas.core.shared_docs import _shared_docs

_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
_VALID_URLS.discard("")
_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-.]*://")

BaseBufferT = TypeVar("BaseBufferT", bound=BaseBuffer)


if TYPE_CHECKING:
    from types import TracebackType

    from pandas._typing import (
        CompressionDict,
        CompressionOptions,
        FilePath,
        ReadBuffer,
        StorageOptions,
        WriteBuffer,
    )

    from pandas import MultiIndex


@dataclasses.dataclass
class IOArgs:
    """
    Return value of io/common.py:_get_filepath_or_buffer.
    """

    filepath_or_buffer: str | BaseBuffer
    encoding: str
    mode: str
    compression: CompressionDict
    should_close: bool = False


@dataclasses.dataclass
class IOHandles(Generic[AnyStr]):
    """
    Return value of io/common.py:get_handle

    Can be used as a context manager.

    This is used to easily close created buffers and to handle corner cases when
    TextIOWrapper is inserted.

    handle: The file handle to be used.
    created_handles: All file handles that are created by get_handle
    is_wrapped: Whether a TextIOWrapper needs to be detached.
    """

    # handle might not implement the IO-interface
    handle: IO[AnyStr]
    compression: CompressionDict
    created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list)
    is_wrapped: bool = False

    def close(self) -> None:
        """
        Close all created buffers.

        Note: If a TextIOWrapper was inserted, it is flushed and detached to
        avoid closing the potentially user-created buffer.
        """
        if self.is_wrapped:
            assert isinstance(self.handle, TextIOWrapper)
            self.handle.flush()
            self.handle.detach()
            self.created_handles.remove(self.handle)
        for handle in self.created_handles:
            handle.close()
        self.created_handles = []
        self.is_wrapped = False

    def __enter__(self) -> IOHandles[AnyStr]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.close()


def is_url(url: object) -> bool:
    """
    Check to see if a URL has a valid protocol.

    Parameters
    ----------
    url : str

    Returns
    -------
    isurl : bool
        If `url` has a valid protocol, return True; otherwise False.
    """
    if not isinstance(url, str):
        return False
    return parse_url(url).scheme in _VALID_URLS

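# Example behavior (illustrative): only schemes registered with urllib count
# as valid protocols.
#
#   >>> is_url("https://pandas.pydata.org/data.csv")
#   True
#   >>> is_url("data.csv")  # no scheme
#   False
#   >>> is_url(123)  # non-strings are rejected
#   False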

@overload
def _expand_user(filepath_or_buffer: str) -> str:
    ...


@overload
def _expand_user(filepath_or_buffer: BaseBufferT) -> BaseBufferT:
    ...


def _expand_user(filepath_or_buffer: str | BaseBufferT) -> str | BaseBufferT:
    """
    Return the argument with an initial component of ~ or ~user
    replaced by that user's home directory.

    Parameters
    ----------
    filepath_or_buffer : object to be converted if possible

    Returns
    -------
    expanded_filepath_or_buffer : an expanded filepath or the
        input if not expandable
    """
    if isinstance(filepath_or_buffer, str):
        return os.path.expanduser(filepath_or_buffer)
    return filepath_or_buffer

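# Example behavior (illustrative; the expanded path depends on the current user):
#
#   >>> _expand_user("~/data.csv")  # doctest: +SKIP
#   '/home/<user>/data.csv'
#   >>> buf = BytesIO(b"data")
#   >>> _expand_user(buf) is buf  # non-strings pass through unchanged
#   True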

def validate_header_arg(header: object) -> None:
    if header is None:
        return
    if is_integer(header):
        header = cast(int, header)
        if header < 0:
            # GH 27779
            raise ValueError(
                "Passing negative integer to header is invalid. "
                "For no header, use header=None instead"
            )
        return
    if is_list_like(header, allow_sets=False):
        header = cast(Sequence, header)
        if not all(map(is_integer, header)):
            raise ValueError("header must be integer or list of integers")
        if any(i < 0 for i in header):
            raise ValueError("cannot specify multi-index header with negative integers")
        return
    if is_bool(header):
        raise TypeError(
            "Passing a bool to header is invalid. Use header=None for no header or "
            "header=int or list-like of ints to specify "
            "the row(s) making up the column names"
        )
    # GH 16338
    raise ValueError("header must be integer or list of integers")

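# Example behavior (illustrative):
#
#   >>> validate_header_arg(0)       # single header row: OK
#   >>> validate_header_arg([0, 1])  # multi-row header: OK
#   >>> validate_header_arg(-1)      # raises ValueError
#   >>> validate_header_arg(True)    # raises TypeError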

@overload
def stringify_path(filepath_or_buffer: FilePath, convert_file_like: bool = ...) -> str:
    ...


@overload
def stringify_path(
    filepath_or_buffer: BaseBufferT, convert_file_like: bool = ...
) -> BaseBufferT:
    ...


def stringify_path(
    filepath_or_buffer: FilePath | BaseBufferT,
    convert_file_like: bool = False,
) -> str | BaseBufferT:
    """
    Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath_or_buffer : object to be converted

    Returns
    -------
    str_filepath_or_buffer : maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced
    according to their __fspath__ method.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if not convert_file_like and is_file_like(filepath_or_buffer):
        # GH 38125: some fsspec objects implement os.PathLike but have already opened a
        # file. This prevents opening the file a second time. infer_compression calls
        # this function with convert_file_like=True to infer the compression.
        return cast(BaseBufferT, filepath_or_buffer)

    if isinstance(filepath_or_buffer, os.PathLike):
        filepath_or_buffer = filepath_or_buffer.__fspath__()
    return _expand_user(filepath_or_buffer)


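# Example behavior (illustrative; the expanded path depends on the current user):
#
#   >>> stringify_path(Path("~/data.csv"))  # doctest: +SKIP
#   '/home/<user>/data.csv'
#   >>> buf = BytesIO(b"x")
#   >>> stringify_path(buf) is buf  # file-like objects pass through
#   True
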
def urlopen(*args, **kwargs):
    """
    Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
    the stdlib.
    """
    import urllib.request

    return urllib.request.urlopen(*args, **kwargs)


def is_fsspec_url(url: FilePath | BaseBuffer) -> bool:
    """
    Returns True if the given URL looks like
    something fsspec can handle.
    """
    return (
        isinstance(url, str)
        and bool(_RFC_3986_PATTERN.match(url))
        and not url.startswith(("http://", "https://"))
    )

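# Example behavior (illustrative; the check is purely syntactic, it does not
# verify that fsspec actually supports the protocol):
#
#   >>> is_fsspec_url("s3://bucket/key.csv")
#   True
#   >>> is_fsspec_url("https://example.com/data.csv")  # HTTP(S) handled separately
#   False
#   >>> is_fsspec_url("relative/path.csv")
#   False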

@doc(
    storage_options=_shared_docs["storage_options"],
    compression_options=_shared_docs["compression_options"] % "filepath_or_buffer",
)
def _get_filepath_or_buffer(
    filepath_or_buffer: FilePath | BaseBuffer,
    encoding: str = "utf-8",
    compression: CompressionOptions | None = None,
    mode: str = "r",
    storage_options: StorageOptions | None = None,
) -> IOArgs:
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
        or buffer
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional

    {storage_options}


    Returns the dataclass IOArgs.
    """
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    # handle compression dict
    compression_method, compression = get_compression_method(compression)
    compression_method = infer_compression(filepath_or_buffer, compression_method)

    # GH21227 internal compression is not used for non-binary handles.
    if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
        warnings.warn(
            "compression has no effect when passing a non-binary object as input.",
            RuntimeWarning,
            stacklevel=find_stack_level(),
        )
        compression_method = None

    compression = dict(compression, method=compression_method)

    # bz2 and xz do not write the byte order mark for utf-16 and utf-32
    # print a warning when writing such files
    if (
        "w" in mode
        and compression_method in ["bz2", "xz"]
        and encoding in ["utf-16", "utf-32"]
    ):
        warnings.warn(
            f"{compression} will not write the byte order mark for {encoding}",
            UnicodeWarning,
            stacklevel=find_stack_level(),
        )

    # Use binary mode when converting path-like objects to file-like objects (fsspec)
    # except when text mode is explicitly requested. The original mode is returned if
    # fsspec is not used.
    fsspec_mode = mode
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # TODO: fsspec can also handle HTTP via requests, but leaving this
        # unchanged. using fsspec appears to break the ability to infer if the
        # server responded with gzipped data
        storage_options = storage_options or {}

        # waiting until now for importing to match intended lazy logic of
        # urlopen function defined elsewhere in this module
        import urllib.request

        # assuming storage_options is to be interpreted as headers
        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
        with urlopen(req_info) as req:
            content_encoding = req.headers.get("Content-Encoding", None)
            if content_encoding == "gzip":
                # Override compression based on Content-Encoding header
                compression = {"method": "gzip"}
            reader = BytesIO(req.read())
        return IOArgs(
            filepath_or_buffer=reader,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )

    if is_fsspec_url(filepath_or_buffer):
        assert isinstance(
            filepath_or_buffer, str
        )  # just to appease mypy for this branch
        # two special-case s3-like protocols; these have special meaning in Hadoop,
        # but are equivalent to just "s3" from fsspec's point of view
        # cc #11071
        if filepath_or_buffer.startswith("s3a://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
        if filepath_or_buffer.startswith("s3n://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
        fsspec = import_optional_dependency("fsspec")

        # If botocore is installed we fall back to reading with anon=True
        # to allow reads from public buckets
        err_types_to_retry_with_anon: list[Any] = []
        try:
            import_optional_dependency("botocore")
            from botocore.exceptions import (
                ClientError,
                NoCredentialsError,
            )

            err_types_to_retry_with_anon = [
                ClientError,
                NoCredentialsError,
                PermissionError,
            ]
        except ImportError:
            pass

        try:
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()
        # GH 34626 Reads from Public Buckets without Credentials needs anon=True
        except tuple(err_types_to_retry_with_anon):
            if storage_options is None:
                storage_options = {"anon": True}
            else:
                # don't mutate user input.
                storage_options = dict(storage_options)
                storage_options["anon"] = True
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()

        return IOArgs(
            filepath_or_buffer=file_obj,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )
    elif storage_options:
        raise ValueError(
            "storage_options passed with file object or non-fsspec file path"
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        return IOArgs(
            filepath_or_buffer=_expand_user(filepath_or_buffer),
            encoding=encoding,
            compression=compression,
            should_close=False,
            mode=mode,
        )

    # is_file_like requires (read | write) & __iter__ but __iter__ is only
    # needed for read_csv(engine=python)
    if not (
        hasattr(filepath_or_buffer, "read") or hasattr(filepath_or_buffer, "write")
    ):
        msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        raise ValueError(msg)

    return IOArgs(
        filepath_or_buffer=filepath_or_buffer,
        encoding=encoding,
        compression=compression,
        should_close=False,
        mode=mode,
    )

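# Illustrative result for a plain local path (no URL, no fsspec):
#
#   >>> _get_filepath_or_buffer("data.csv.gz", compression="infer")  # doctest: +SKIP
#   IOArgs(filepath_or_buffer='data.csv.gz', encoding='utf-8', mode='r',
#          compression={'method': 'gzip'}, should_close=False)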

def file_path_to_url(path: str) -> str:
    """
    Converts an absolute native path to a FILE URL.

    Parameters
    ----------
    path : a path in native format

    Returns
    -------
    a valid FILE URL
    """
    # lazify expensive import (~30ms)
    from urllib.request import pathname2url

    return urljoin("file:", pathname2url(path))

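# Example behavior (illustrative; POSIX output shown, Windows differs):
#
#   >>> file_path_to_url("/tmp/data.csv")  # doctest: +SKIP
#   'file:///tmp/data.csv'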

extension_to_compression = {
    ".tar": "tar",
    ".tar.gz": "tar",
    ".tar.bz2": "tar",
    ".tar.xz": "tar",
    ".gz": "gzip",
    ".bz2": "bz2",
    ".zip": "zip",
    ".xz": "xz",
    ".zst": "zstd",
}
_supported_compressions = set(extension_to_compression.values())


def get_compression_method(
    compression: CompressionOptions,
) -> tuple[str | None, CompressionDict]:
    """
    Simplifies a compression argument to a compression method string and
    a mapping containing additional arguments.

    Parameters
    ----------
    compression : str or mapping
        If string, specifies the compression method. If mapping, value at key
        'method' specifies compression method.

    Returns
    -------
    tuple of (str or None, dict)
        The compression method and a dict of additional compression arguments.

    Raises
    ------
    ValueError on mapping missing 'method' key
    """
    compression_method: str | None
    if isinstance(compression, Mapping):
        compression_args = dict(compression)
        try:
            compression_method = compression_args.pop("method")
        except KeyError as err:
            raise ValueError("If mapping, compression must have key 'method'") from err
    else:
        compression_args = {}
        compression_method = compression
    return compression_method, compression_args

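# Example behavior (illustrative):
#
#   >>> get_compression_method("gzip")
#   ('gzip', {})
#   >>> get_compression_method({"method": "zip", "archive_name": "out.csv"})
#   ('zip', {'archive_name': 'out.csv'})
#   >>> get_compression_method({"archive_name": "out.csv"})  # raises ValueError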

@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def infer_compression(
    filepath_or_buffer: FilePath | BaseBuffer, compression: str | None
) -> str | None:
    """
    Get the compression method for filepath_or_buffer. If compression='infer',
    the inferred compression method is returned. Otherwise, the input
    compression method is returned unchanged, unless it's invalid, in which
    case an error is raised.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    if compression is None:
        return None

    # Infer compression
    if compression == "infer":
        # Convert all path types (e.g. pathlib.Path) to strings
        filepath_or_buffer = stringify_path(filepath_or_buffer, convert_file_like=True)
        if not isinstance(filepath_or_buffer, str):
            # Cannot infer compression of a buffer, assume no compression
            return None

        # Infer compression from the filename/URL extension
        for extension, compression in extension_to_compression.items():
            if filepath_or_buffer.lower().endswith(extension):
                return compression
        return None

    # Compression has been specified. Check that it's valid
    if compression in _supported_compressions:
        return compression

    valid = ["infer", None] + sorted(_supported_compressions)
    msg = (
        f"Unrecognized compression type: {compression}\n"
        f"Valid compression types are {valid}"
    )
    raise ValueError(msg)
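# Example behavior (illustrative):
#
#   >>> infer_compression("data.csv.gz", "infer")
#   'gzip'
#   >>> infer_compression("archive.tar.gz", "infer")  # compound extension wins
#   'tar'
#   >>> infer_compression("data.csv", "infer") is None
#   True
#   >>> infer_compression("data.csv", "zip")  # explicit method passes through
#   'zip'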


def check_parent_directory(path: Path | str) -> None:
    """
    Check if the parent directory of a file exists; raise OSError if it does not.

    Parameters
    ----------
    path : Path or str
        Path to check parent directory of
    """
    parent = Path(path).parent
    if not parent.is_dir():
        raise OSError(rf"Cannot save file into a non-existent directory: '{parent}'")


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[False],
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[bytes]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[True] = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: bool = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str] | IOHandles[bytes]:
    ...


@doc(compression_options=_shared_docs["compression_options"] % "path_or_buf")
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = None,
    compression: CompressionOptions | None = None,
    memory_map: bool = False,
    is_text: bool = True,
    errors: str | None = None,
    storage_options: StorageOptions | None = None,
) -> IOHandles[str] | IOHandles[bytes]:
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    {compression_options}

        May be a dict with key 'method' as compression mode
        and other keys as compression options if compression
        mode is 'zip'.

        Passing compression options as keys in dict is
        supported for compression modes 'gzip', 'bz2', 'zstd' and 'zip'.

        .. versionchanged:: 1.4.0 Zstandard support.

    memory_map : bool, default False
        See parsers._parser_params for more information. Only used by read_csv.
    is_text : bool, default True
        Whether the type of the content passed to the file/buffer is string or
        bytes. This is not the same as `"b" not in mode`. If string content is
        passed to a binary file/buffer, a wrapper is inserted.
    errors : str, default 'strict'
        Specifies how encoding and decoding errors are to be handled.
        See the errors argument for :func:`open` for a full list
        of options.
    storage_options : StorageOptions, optional
        Passed to _get_filepath_or_buffer

    Returns the dataclass IOHandles
    """
    # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
    encoding = encoding or "utf-8"

    errors = errors or "strict"

    # read_csv does not know whether the buffer is opened in binary/text mode
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
        mode += "b"

    # validate encoding and errors
    codecs.lookup(encoding)
    if isinstance(errors, str):
        codecs.lookup_error(errors)

    # open URLs
    ioargs = _get_filepath_or_buffer(
        path_or_buf,
        encoding=encoding,
        compression=compression,
        mode=mode,
        storage_options=storage_options,
    )

    handle = ioargs.filepath_or_buffer
    handles: list[BaseBuffer]

    # memory mapping needs to be the first step
    # only used for read_csv
    handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

    is_path = isinstance(handle, str)
    compression_args = dict(ioargs.compression)
    compression = compression_args.pop("method")

    # Only for write methods
    if "r" not in mode and is_path:
        check_parent_directory(str(handle))

    if compression:
        if compression != "zstd":
            # compression libraries do not like an explicit text-mode
            ioargs.mode = ioargs.mode.replace("t", "")
        elif compression == "zstd" and "b" not in ioargs.mode:
            # python-zstandard defaults to text mode, but we always expect
            # compression libraries to use binary mode.
            ioargs.mode += "b"

        # GZ Compression
        if compression == "gzip":
            if isinstance(handle, str):
                # error: Incompatible types in assignment (expression has type
                # "GzipFile", variable has type "Union[str, BaseBuffer]")
                handle = gzip.GzipFile(  # type: ignore[assignment]
                    filename=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )
            else:
                handle = gzip.GzipFile(
                    # No overload variant of "GzipFile" matches argument types
                    # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
                    fileobj=handle,  # type: ignore[call-overload]
                    mode=ioargs.mode,
                    **compression_args,
                )

        # BZ Compression
        elif compression == "bz2":
            # Overload of "BZ2File" to handle pickle protocol 5
            # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
            handle = get_bz2_file()(  # type: ignore[call-overload]
                handle,
                mode=ioargs.mode,
                **compression_args,
            )

        # ZIP Compression
        elif compression == "zip":
            # error: Argument 1 to "_BytesZipFile" has incompatible type
            # "Union[str, BaseBuffer]"; expected "Union[Union[str, PathLike[str]],
            # ReadBuffer[bytes], WriteBuffer[bytes]]"
            handle = _BytesZipFile(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )
            if handle.buffer.mode == "r":
                handles.append(handle)
                zip_names = handle.buffer.namelist()
                if len(zip_names) == 1:
                    handle = handle.buffer.open(zip_names.pop())
                elif not zip_names:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file. "
                        f"Only one file per ZIP: {zip_names}"
                    )

        # TAR Encoding
        elif compression == "tar":
            compression_args.setdefault("mode", ioargs.mode)
            if isinstance(handle, str):
                handle = _BytesTarFile(name=handle, **compression_args)
            else:
                # error: Argument "fileobj" to "_BytesTarFile" has incompatible
                # type "BaseBuffer"; expected "Union[ReadBuffer[bytes],
                # WriteBuffer[bytes], None]"
                handle = _BytesTarFile(
                    fileobj=handle, **compression_args  # type: ignore[arg-type]
                )
            assert isinstance(handle, _BytesTarFile)
            if "r" in handle.buffer.mode:
                handles.append(handle)
                files = handle.buffer.getnames()
                if len(files) == 1:
                    file = handle.buffer.extractfile(files[0])
                    assert file is not None
                    handle = file
                elif not files:
                    raise ValueError(f"Zero files found in TAR archive {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in TAR archive. "
                        f"Only one file per TAR archive: {files}"
                    )

        # XZ Compression
        elif compression == "xz":
            # error: Argument 1 to "LZMAFile" has incompatible type "Union[str,
            # BaseBuffer]"; expected "Optional[Union[Union[str, bytes, PathLike[str],
            # PathLike[bytes]], IO[bytes]], None]"
            handle = get_lzma_file()(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )

        # Zstd Compression
        elif compression == "zstd":
            zstd = import_optional_dependency("zstandard")
            if "r" in ioargs.mode:
                open_args = {"dctx": zstd.ZstdDecompressor(**compression_args)}
            else:
                open_args = {"cctx": zstd.ZstdCompressor(**compression_args)}
            handle = zstd.open(
                handle,
                mode=ioargs.mode,
                **open_args,
            )

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        assert not isinstance(handle, str)
        handles.append(handle)

    elif isinstance(handle, str):
        # Check whether the filename is to be opened in binary mode.
        # Binary mode does not support 'encoding' and 'newline'.
        if ioargs.encoding and "b" not in ioargs.mode:
            # Encoding
            handle = open(
                handle,
                ioargs.mode,
                encoding=ioargs.encoding,
                errors=errors,
                newline="",
            )
        else:
            # Binary mode
            handle = open(handle, ioargs.mode)
        handles.append(handle)

    # Convert BytesIO or file objects passed with an encoding
    is_wrapped = False
    if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
        # not added to handles as it does not open/buffer resources
        handle = _BytesIOWrapper(
            handle,
            encoding=ioargs.encoding,
        )
    elif is_text and (
        compression or memory_map or _is_binary_mode(handle, ioargs.mode)
    ):
        if (
            not hasattr(handle, "readable")
            or not hasattr(handle, "writable")
            or not hasattr(handle, "seekable")
        ):
            handle = _IOWrapper(handle)
        # error: Argument 1 to "TextIOWrapper" has incompatible type
        # "_IOWrapper"; expected "IO[bytes]"
        handle = TextIOWrapper(
            handle,  # type: ignore[arg-type]
            encoding=ioargs.encoding,
            errors=errors,
            newline="",
        )
        handles.append(handle)
        # only marked as wrapped when the caller provided a handle
        is_wrapped = not (
            isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
        )

    if "r" in ioargs.mode and not hasattr(handle, "read"):
        raise TypeError(
            "Expected file path name or file-like object, "
            f"got {type(ioargs.filepath_or_buffer)} type"
        )

    handles.reverse()  # close the most recently added buffer first
    if ioargs.should_close:
        assert not isinstance(ioargs.filepath_or_buffer, str)
        handles.append(ioargs.filepath_or_buffer)

    return IOHandles(
        # error: Argument "handle" to "IOHandles" has incompatible type
        # "Union[TextIOWrapper, GzipFile, BaseBuffer, typing.IO[bytes],
        # typing.IO[Any]]"; expected "pandas._typing.IO[Any]"
        handle=handle,  # type: ignore[arg-type]
        # error: Argument "created_handles" to "IOHandles" has incompatible type
        # "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]"
        created_handles=handles,  # type: ignore[arg-type]
        is_wrapped=is_wrapped,
        compression=ioargs.compression,
    )

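# Typical usage (illustrative): IOHandles is a context manager, so created
# buffers are closed and any inserted TextIOWrapper is detached on exit.
#
#   with get_handle("data.csv.gz", "r", compression="infer") as handles:
#       text = handles.handle.read()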

# error: Definition of "__enter__" in base class "IOBase" is incompatible
# with definition in base class "BinaryIO"
class _BufferedWriter(BytesIO, ABC):  # type: ignore[misc]
    """
    Some objects do not support multiple .write() calls (TarFile and ZipFile).
    This wrapper writes to the underlying buffer on close.
    """

    buffer = BytesIO()

    @abstractmethod
    def write_to_buffer(self) -> None:
        ...

    def close(self) -> None:
        if self.closed:
            # already closed
            return
        if self.getbuffer().nbytes:
            # write to buffer
            self.seek(0)
            with self.buffer:
                self.write_to_buffer()
        else:
            self.buffer.close()
        super().close()


class _BytesTarFile(_BufferedWriter):
    def __init__(
        self,
        name: str | None = None,
        mode: Literal["r", "a", "w", "x"] = "r",
        fileobj: ReadBuffer[bytes] | WriteBuffer[bytes] | None = None,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        self.archive_name = archive_name
        self.name = name
        # error: Incompatible types in assignment (expression has type "TarFile",
        # base class "_BufferedWriter" defined the type as "BytesIO")
        self.buffer: tarfile.TarFile = tarfile.TarFile.open(  # type: ignore[assignment]
            name=name,
            mode=self.extend_mode(mode),
            fileobj=fileobj,
            **kwargs,
        )

    def extend_mode(self, mode: str) -> str:
        mode = mode.replace("b", "")
        if mode != "w":
            return mode
        if self.name is not None:
            suffix = Path(self.name).suffix
            if suffix in (".gz", ".xz", ".bz2"):
                mode = f"{mode}:{suffix[1:]}"
        return mode

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the
        tar archive not to be named something.tar, because that causes confusion
        (GH39465).
        """
        if self.name is None:
            return None

        filename = Path(self.name)
        # Path.suffix only holds the last extension, so check the compound
        # ".tar.*" endings against the full name before the bare ".tar" case.
        if filename.name.endswith((".tar.gz", ".tar.bz2", ".tar.xz")):
            return filename.with_suffix("").with_suffix("").name
        if filename.suffix == ".tar":
            return filename.with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # TarFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "tar"
        tarinfo = tarfile.TarInfo(name=archive_name)
        tarinfo.size = len(self.getvalue())
        self.buffer.addfile(tarinfo, self)


class _BytesZipFile(_BufferedWriter):
    def __init__(
        self,
        file: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
        mode: str,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        mode = mode.replace("b", "")
        self.archive_name = archive_name

        kwargs.setdefault("compression", zipfile.ZIP_DEFLATED)
        # error: Incompatible types in assignment (expression has type "ZipFile",
        # base class "_BufferedWriter" defined the type as "BytesIO")
        self.buffer: zipfile.ZipFile = zipfile.ZipFile(  # type: ignore[assignment]
            file, mode, **kwargs
        )

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.zip, because that causes confusion (GH39465).
        """
        if isinstance(self.buffer.filename, (os.PathLike, str)):
            filename = Path(self.buffer.filename)
            if filename.suffix == ".zip":
                return filename.with_suffix("").name
            return filename.name
        return None

    def write_to_buffer(self) -> None:
        # ZipFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "zip"
        self.buffer.writestr(archive_name, self.getvalue())


class _IOWrapper:
    # TextIOWrapper is overly strict: it requires that the buffer has seekable,
    # readable, and writable methods. If we have a read-only buffer, we shouldn't
    # need writable, and vice versa. Some buffers are seekable/readable/writable
    # but do not expose the corresponding "-able" methods, e.g.,
    # tempfile.SpooledTemporaryFile. If a buffer does not have one of these
    # "-able" methods, we simply assume it does.
    def __init__(self, buffer: BaseBuffer) -> None:
        self.buffer = buffer

    def __getattr__(self, name: str):
        return getattr(self.buffer, name)

    def readable(self) -> bool:
        if hasattr(self.buffer, "readable"):
            return self.buffer.readable()
        return True

    def seekable(self) -> bool:
        if hasattr(self.buffer, "seekable"):
            return self.buffer.seekable()
        return True

    def writable(self) -> bool:
        if hasattr(self.buffer, "writable"):
            return self.buffer.writable()
        return True


class _BytesIOWrapper:
    # Wrapper that wraps a StringIO buffer and reads bytes from it
    # Created for compat with pyarrow read_csv
    def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8") -> None:
        self.buffer = buffer
        self.encoding = encoding
        # Because a character can be represented by more than 1 byte,
        # it is possible that reading will produce more bytes than n
        # We store the extra bytes in this overflow variable, and append the
        # overflow to the front of the bytestring the next time reading is performed
        self.overflow = b""

    def __getattr__(self, attr: str):
        return getattr(self.buffer, attr)

    def read(self, n: int | None = -1) -> bytes:
        assert self.buffer is not None
        bytestring = self.buffer.read(n).encode(self.encoding)
        # When n=-1/n greater than remaining bytes: Read entire file/rest of file
        combined_bytestring = self.overflow + bytestring
        if n is None or n < 0 or n >= len(combined_bytestring):
            self.overflow = b""
            return combined_bytestring
        else:
            to_return = combined_bytestring[:n]
            self.overflow = combined_bytestring[n:]
            return to_return

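# Example of the overflow handling (illustrative): a multi-byte character read
# with a small n is split across calls, with the surplus bytes buffered.
#
#   >>> w = _BytesIOWrapper(StringIO("é"), encoding="utf-8")
#   >>> w.read(1)  # 'é' encodes to two bytes; one is returned ...
#   b'\xc3'
#   >>> w.read(1)  # ... and the buffered surplus byte comes next
#   b'\xa9'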

def _maybe_memory_map(
    handle: str | BaseBuffer, memory_map: bool
) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
    """Try to memory map file/buffer."""
    handles: list[BaseBuffer] = []
    memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
    if not memory_map:
        return handle, memory_map, handles

    # mmap is used only by read_csv
    handle = cast(ReadCsvBuffer, handle)

    # need to open the file first
    if isinstance(handle, str):
        handle = open(handle, "rb")
        handles.append(handle)

    try:
        # open mmap and add the *-able methods
        # error: Argument 1 to "_IOWrapper" has incompatible type "mmap";
        # expected "BaseBuffer"
        wrapped = _IOWrapper(
            mmap.mmap(
                handle.fileno(), 0, access=mmap.ACCESS_READ  # type: ignore[arg-type]
            )
        )
    finally:
        for handle in reversed(handles):
            # error: "BaseBuffer" has no attribute "close"
            handle.close()  # type: ignore[attr-defined]

    return wrapped, memory_map, [wrapped]


def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool:
    """Test whether file exists."""
    exists = False
    filepath_or_buffer = stringify_path(filepath_or_buffer)
    if not isinstance(filepath_or_buffer, str):
        return exists
    try:
        exists = os.path.exists(filepath_or_buffer)
        # gh-5874: if the filepath is too long, this will raise
    except (TypeError, ValueError):
        pass
    return exists


def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool:
    """Whether the handle is opened in binary mode"""
    # specified by user
    if "t" in mode or "b" in mode:
        return "b" in mode

    # exceptions
    text_classes = (
        # classes that expect string but have 'b' in mode
        codecs.StreamWriter,
        codecs.StreamReader,
        codecs.StreamReaderWriter,
    )
    if issubclass(type(handle), text_classes):
        return False

    return isinstance(handle, _get_binary_io_classes()) or "b" in getattr(
        handle, "mode", mode
    )

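# Example behavior (illustrative):
#
#   >>> _is_binary_mode("data.csv", "rb")  # explicit binary mode
#   True
#   >>> _is_binary_mode(BytesIO(), "r")    # inferred from the buffer type
#   True
#   >>> _is_binary_mode(StringIO(), "r")   # text buffer without a 'mode' attribute
#   False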

@functools.lru_cache
def _get_binary_io_classes() -> tuple[type, ...]:
    """IO classes that expect bytes"""
    binary_classes: tuple[type, ...] = (BufferedIOBase, RawIOBase)

    # python-zstandard doesn't use any of the builtin base classes; instead we
    # have to use the `zstd.ZstdDecompressionReader` class for isinstance checks.
    # Unfortunately `zstd.ZstdDecompressionReader` isn't exposed by python-zstandard
    # so we have to get it from a `zstd.ZstdDecompressor` instance.
    # See also https://github.com/indygreg/python-zstandard/pull/165.
    zstd = import_optional_dependency("zstandard", errors="ignore")
    if zstd is not None:
        with zstd.ZstdDecompressor().stream_reader(b"") as reader:
            binary_classes += (type(reader),)

    return binary_classes


def is_potential_multi_index(
    columns: Sequence[Hashable] | MultiIndex,
    index_col: bool | Sequence[int] | None = None,
) -> bool:
    """
    Check whether or not the `columns` parameter
    could be converted into a MultiIndex.

    Parameters
    ----------
    columns : array-like
        Object which may or may not be convertible into a MultiIndex
    index_col : None, bool or list, optional
        Column or columns to use as the (possibly hierarchical) index

    Returns
    -------
    bool : Whether or not columns could become a MultiIndex
    """
    if index_col is None or isinstance(index_col, bool):
        index_col = []

    return bool(
        len(columns)
        and not isinstance(columns, ABCMultiIndex)
        and all(isinstance(c, tuple) for c in columns if c not in list(index_col))
    )
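# Example behavior (illustrative):
#
#   >>> is_potential_multi_index([("a", "x"), ("a", "y")])
#   True
#   >>> is_potential_multi_index(["a", ("a", "y")])  # mixed: not convertible
#   False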


def dedup_names(
    names: Sequence[Hashable], is_potential_multiindex: bool
) -> Sequence[Hashable]:
    """
    Rename column names if duplicates exist.

    Currently the renaming is done by appending a period and an incrementing
    number; a custom pattern may be supported in the future.

    Examples
    --------
    >>> dedup_names(["x", "y", "x", "x"], is_potential_multiindex=False)
    ['x', 'y', 'x.1', 'x.2']
    """
    names = list(names)  # so we can index
    counts: DefaultDict[Hashable, int] = defaultdict(int)

    for i, col in enumerate(names):
        cur_count = counts[col]

        while cur_count > 0:
            counts[col] = cur_count + 1

            if is_potential_multiindex:
                # for mypy
                assert isinstance(col, tuple)
                col = col[:-1] + (f"{col[-1]}.{cur_count}",)
            else:
                col = f"{col}.{cur_count}"
            cur_count = counts[col]

        names[i] = col
        counts[col] = cur_count + 1

    return names
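# Example with is_potential_multiindex=True (illustrative): only the last
# level of a duplicated tuple is renamed.
#
#   >>> dedup_names([("a", "b"), ("a", "b")], is_potential_multiindex=True)
#   [('a', 'b'), ('a', 'b.1')]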