Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/io/common.py: 24%

444 statements  

1"""Common IO api utilities""" 

2from __future__ import annotations 

3 

4from abc import ( 

5 ABC, 

6 abstractmethod, 

7) 

8import codecs 

9from collections import defaultdict 

10import dataclasses 

11import functools 

12import gzip 

13from io import ( 

14 BufferedIOBase, 

15 BytesIO, 

16 RawIOBase, 

17 StringIO, 

18 TextIOBase, 

19 TextIOWrapper, 

20) 

21import mmap 

22import os 

23from pathlib import Path 

24import re 

25import tarfile 

26from typing import ( 

27 IO, 

28 Any, 

29 AnyStr, 

30 DefaultDict, 

31 Generic, 

32 Hashable, 

33 Literal, 

34 Mapping, 

35 Sequence, 

36 TypeVar, 

37 cast, 

38 overload, 

39) 

40from urllib.parse import ( 

41 urljoin, 

42 urlparse as parse_url, 

43 uses_netloc, 

44 uses_params, 

45 uses_relative, 

46) 

47import warnings 

48import zipfile 

49 

50from pandas._typing import ( 

51 BaseBuffer, 

52 CompressionDict, 

53 CompressionOptions, 

54 FilePath, 

55 ReadBuffer, 

56 ReadCsvBuffer, 

57 StorageOptions, 

58 WriteBuffer, 

59) 

60from pandas.compat import get_lzma_file 

61from pandas.compat._optional import import_optional_dependency 

62from pandas.compat.compressors import BZ2File as _BZ2File 

63from pandas.util._decorators import doc 

64from pandas.util._exceptions import find_stack_level 

65 

66from pandas.core.dtypes.common import ( 

67 is_bool, 

68 is_file_like, 

69 is_integer, 

70 is_list_like, 

71) 

72 

73from pandas.core.indexes.api import MultiIndex 

74from pandas.core.shared_docs import _shared_docs 

75 

_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
_VALID_URLS.discard("")
_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-.]*://")

BaseBufferT = TypeVar("BaseBufferT", bound=BaseBuffer)


@dataclasses.dataclass
class IOArgs:
    """
    Return value of io/common.py:_get_filepath_or_buffer.
    """

    filepath_or_buffer: str | BaseBuffer
    encoding: str
    mode: str
    compression: CompressionDict
    should_close: bool = False


@dataclasses.dataclass
class IOHandles(Generic[AnyStr]):
    """
    Return value of io/common.py:get_handle

    Can be used as a context manager.

    This is used to easily close created buffers and to handle corner cases when
    TextIOWrapper is inserted.

    handle: The file handle to be used.
    created_handles: All file handles that are created by get_handle
    is_wrapped: Whether a TextIOWrapper needs to be detached.
    """

    # handle might not implement the IO-interface
    handle: IO[AnyStr]
    compression: CompressionDict
    created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list)
    is_wrapped: bool = False

    def close(self) -> None:
        """
        Close all created buffers.

        Note: If a TextIOWrapper was inserted, it is flushed and detached to
        avoid closing the potentially user-created buffer.
        """
        if self.is_wrapped:
            assert isinstance(self.handle, TextIOWrapper)
            self.handle.flush()
            self.handle.detach()
            self.created_handles.remove(self.handle)
        for handle in self.created_handles:
            handle.close()
        self.created_handles = []
        self.is_wrapped = False

    def __enter__(self) -> IOHandles[AnyStr]:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

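
# Editor's sketch (hypothetical helper, not part of pandas): shows IOHandles
# used as a context manager. Everything here other than the standard library
# and this module's own get_handle is an assumption for illustration.
def _example_iohandles_usage() -> None:
    # get_handle (defined later in this module) returns IOHandles; leaving
    # the with-block closes the TextIOWrapper and GzipFile it created, but
    # only detaches from the caller-supplied BytesIO.
    buf = BytesIO(gzip.compress(b"a,b\n1,2\n"))
    with get_handle(buf, "r", compression="gzip") as handles:
        assert handles.handle.read() == "a,b\n1,2\n"
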

def is_url(url: object) -> bool:
    """
    Check to see if a URL has a valid protocol.

    Parameters
    ----------
    url : str

    Returns
    -------
    isurl : bool
        If `url` has a valid protocol, return True; otherwise False.
    """
    if not isinstance(url, str):
        return False
    return parse_url(url).scheme in _VALID_URLS

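
# Editor's sketch (hypothetical helper, not part of pandas): a scheme known
# to urllib passes, anything else is rejected.
def _example_is_url() -> None:
    assert is_url("https://example.com/data.csv")
    assert not is_url("relative/path.csv")  # no scheme
    assert not is_url(Path("data.csv"))  # non-str input is rejected outright
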

@overload
def _expand_user(filepath_or_buffer: str) -> str:
    ...


@overload
def _expand_user(filepath_or_buffer: BaseBufferT) -> BaseBufferT:
    ...


def _expand_user(filepath_or_buffer: str | BaseBufferT) -> str | BaseBufferT:
    """
    Return the argument with an initial component of ~ or ~user
    replaced by that user's home directory.

    Parameters
    ----------
    filepath_or_buffer : object to be converted if possible

    Returns
    -------
    expanded_filepath_or_buffer : an expanded filepath or the
        input if not expandable
    """
    if isinstance(filepath_or_buffer, str):
        return os.path.expanduser(filepath_or_buffer)
    return filepath_or_buffer


def validate_header_arg(header: object) -> None:
    if header is None:
        return
    if is_integer(header):
        header = cast(int, header)
        if header < 0:
            # GH 27779
            raise ValueError(
                "Passing negative integer to header is invalid. "
                "For no header, use header=None instead"
            )
        return
    if is_list_like(header, allow_sets=False):
        header = cast(Sequence, header)
        if not all(map(is_integer, header)):
            raise ValueError("header must be integer or list of integers")
        if any(i < 0 for i in header):
            raise ValueError("cannot specify multi-index header with negative integers")
        return
    if is_bool(header):
        raise TypeError(
            "Passing a bool to header is invalid. Use header=None for no header or "
            "header=int or list-like of ints to specify "
            "the row(s) making up the column names"
        )
    # GH 16338
    raise ValueError("header must be integer or list of integers")

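
# Editor's sketch (hypothetical helper, not part of pandas): the accepted and
# rejected header arguments, matching the branches above.
def _example_validate_header_arg() -> None:
    validate_header_arg(None)  # no header
    validate_header_arg(0)  # single header row
    validate_header_arg([0, 1])  # multi-index header rows
    for bad, exc in ((-1, ValueError), (True, TypeError)):
        try:
            validate_header_arg(bad)
        except exc:
            pass  # negative rows and bools are rejected
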

@overload
def stringify_path(filepath_or_buffer: FilePath, convert_file_like: bool = ...) -> str:
    ...


@overload
def stringify_path(
    filepath_or_buffer: BaseBufferT, convert_file_like: bool = ...
) -> BaseBufferT:
    ...


def stringify_path(
    filepath_or_buffer: FilePath | BaseBufferT,
    convert_file_like: bool = False,
) -> str | BaseBufferT:
    """
    Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath_or_buffer : object to be converted

    Returns
    -------
    str_filepath_or_buffer : maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced
    according to their __fspath__ method.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if not convert_file_like and is_file_like(filepath_or_buffer):
        # GH 38125: some fsspec objects implement os.PathLike but have already opened a
        # file. This prevents opening the file a second time. infer_compression calls
        # this function with convert_file_like=True to infer the compression.
        return cast(BaseBufferT, filepath_or_buffer)

    if isinstance(filepath_or_buffer, os.PathLike):
        filepath_or_buffer = filepath_or_buffer.__fspath__()
    return _expand_user(filepath_or_buffer)

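
# Editor's sketch (hypothetical helper, not part of pandas): path-like objects
# are converted, open buffers pass through untouched (GH 38125).
def _example_stringify_path() -> None:
    assert stringify_path(Path("data") / "file.csv") == os.path.join("data", "file.csv")
    buf = BytesIO(b"")
    assert stringify_path(buf) is buf  # file-like: returned as-is, not reopened
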

def urlopen(*args, **kwargs):
    """
    Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
    the stdlib.
    """
    import urllib.request

    return urllib.request.urlopen(*args, **kwargs)


def is_fsspec_url(url: FilePath | BaseBuffer) -> bool:
    """
    Return True if the given URL looks like
    something fsspec can handle.
    """
    return (
        isinstance(url, str)
        and bool(_RFC_3986_PATTERN.match(url))
        and not url.startswith(("http://", "https://"))
    )

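
# Editor's sketch (hypothetical helper, not part of pandas): any RFC 3986-style
# scheme qualifies except plain http(s), which pandas reads through urllib.
def _example_is_fsspec_url() -> None:
    assert is_fsspec_url("s3://bucket/key.csv")
    assert is_fsspec_url("gcs://bucket/key.csv")
    assert not is_fsspec_url("https://example.com/key.csv")
    assert not is_fsspec_url("/local/path.csv")
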

@doc(
    storage_options=_shared_docs["storage_options"],
    compression_options=_shared_docs["compression_options"] % "filepath_or_buffer",
)
def _get_filepath_or_buffer(
    filepath_or_buffer: FilePath | BaseBuffer,
    encoding: str = "utf-8",
    compression: CompressionOptions = None,
    mode: str = "r",
    storage_options: StorageOptions = None,
) -> IOArgs:
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
        or buffer
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional

    {storage_options}

        .. versionadded:: 1.2.0

    .. versionchanged:: 1.2.0

        Returns the dataclass IOArgs.
    """
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    # handle compression dict
    compression_method, compression = get_compression_method(compression)
    compression_method = infer_compression(filepath_or_buffer, compression_method)

    # GH21227 internal compression is not used for non-binary handles.
    if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
        warnings.warn(
            "compression has no effect when passing a non-binary object as input.",
            RuntimeWarning,
            stacklevel=find_stack_level(),
        )
        compression_method = None

    compression = dict(compression, method=compression_method)

    # bz2 and xz do not write the byte order mark for utf-16 and utf-32
    # print a warning when writing such files
    if (
        "w" in mode
        and compression_method in ["bz2", "xz"]
        and encoding in ["utf-16", "utf-32"]
    ):
        warnings.warn(
            f"{compression} will not write the byte order mark for {encoding}",
            UnicodeWarning,
            stacklevel=find_stack_level(),
        )

    # Use binary mode when converting path-like objects to file-like objects (fsspec)
    # except when text mode is explicitly requested. The original mode is returned if
    # fsspec is not used.
    fsspec_mode = mode
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # TODO: fsspec can also handle HTTP via requests, but leaving this
        # unchanged. using fsspec appears to break the ability to infer if the
        # server responded with gzipped data
        storage_options = storage_options or {}

        # waiting until now for importing to match intended lazy logic of
        # urlopen function defined elsewhere in this module
        import urllib.request

        # assuming storage_options is to be interpreted as headers
        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
        with urlopen(req_info) as req:
            content_encoding = req.headers.get("Content-Encoding", None)
            if content_encoding == "gzip":
                # Override compression based on Content-Encoding header
                compression = {"method": "gzip"}
            reader = BytesIO(req.read())
        return IOArgs(
            filepath_or_buffer=reader,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )

    if is_fsspec_url(filepath_or_buffer):
        assert isinstance(
            filepath_or_buffer, str
        )  # just to appease mypy for this branch
        # two special-case s3-like protocols; these have special meaning in Hadoop,
        # but are equivalent to just "s3" from fsspec's point of view
        # cc #11071
        if filepath_or_buffer.startswith("s3a://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
        if filepath_or_buffer.startswith("s3n://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
        fsspec = import_optional_dependency("fsspec")

        # If botocore is installed we fallback to reading with anon=True
        # to allow reads from public buckets
        err_types_to_retry_with_anon: list[Any] = []
        try:
            import_optional_dependency("botocore")
            from botocore.exceptions import (
                ClientError,
                NoCredentialsError,
            )

            err_types_to_retry_with_anon = [
                ClientError,
                NoCredentialsError,
                PermissionError,
            ]
        except ImportError:
            pass

        try:
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()
        # GH 34626 Reads from Public Buckets without Credentials needs anon=True
        except tuple(err_types_to_retry_with_anon):
            if storage_options is None:
                storage_options = {"anon": True}
            else:
                # don't mutate user input.
                storage_options = dict(storage_options)
                storage_options["anon"] = True
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()

        return IOArgs(
            filepath_or_buffer=file_obj,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )
    elif storage_options:
        raise ValueError(
            "storage_options passed with file object or non-fsspec file path"
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        return IOArgs(
            filepath_or_buffer=_expand_user(filepath_or_buffer),
            encoding=encoding,
            compression=compression,
            should_close=False,
            mode=mode,
        )

    # is_file_like requires (read | write) & __iter__ but __iter__ is only
    # needed for read_csv(engine=python)
    if not (
        hasattr(filepath_or_buffer, "read") or hasattr(filepath_or_buffer, "write")
    ):
        msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        raise ValueError(msg)

    return IOArgs(
        filepath_or_buffer=filepath_or_buffer,
        encoding=encoding,
        compression=compression,
        should_close=False,
        mode=mode,
    )

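
# Editor's sketch (hypothetical helper, not part of pandas): a plain local path
# takes the passthrough branch; compression is inferred from the extension and
# should_close stays False because no handle was opened here.
def _example_get_filepath_or_buffer() -> None:
    ioargs = _get_filepath_or_buffer("data.csv.gz", compression="infer", mode="r")
    assert ioargs.compression == {"method": "gzip"}
    assert ioargs.should_close is False
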

def file_path_to_url(path: str) -> str:
    """
    Convert an absolute native path to a FILE URL.

    Parameters
    ----------
    path : a path in native format

    Returns
    -------
    a valid FILE URL
    """
    # lazify expensive import (~30ms)
    from urllib.request import pathname2url

    return urljoin("file:", pathname2url(path))

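
# Editor's sketch (hypothetical helper, not part of pandas): pathname2url
# percent-escapes unsafe characters; shown for POSIX paths only.
def _example_file_path_to_url() -> None:
    if os.name == "posix":
        assert file_path_to_url("/tmp/my data.csv") == "file:///tmp/my%20data.csv"
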

extension_to_compression = {
    ".tar": "tar",
    ".tar.gz": "tar",
    ".tar.bz2": "tar",
    ".tar.xz": "tar",
    ".gz": "gzip",
    ".bz2": "bz2",
    ".zip": "zip",
    ".xz": "xz",
    ".zst": "zstd",
}
_supported_compressions = set(extension_to_compression.values())


def get_compression_method(
    compression: CompressionOptions,
) -> tuple[str | None, CompressionDict]:
    """
    Simplifies a compression argument to a compression method string and
    a mapping containing additional arguments.

    Parameters
    ----------
    compression : str or mapping
        If string, specifies the compression method. If mapping, value at key
        'method' specifies compression method.

    Returns
    -------
    tuple of (compression method : Optional[str],
              compression arguments : Dict[str, Any])

    Raises
    ------
    ValueError on mapping missing 'method' key
    """
    compression_method: str | None
    if isinstance(compression, Mapping):
        compression_args = dict(compression)
        try:
            compression_method = compression_args.pop("method")
        except KeyError as err:
            raise ValueError("If mapping, compression must have key 'method'") from err
    else:
        compression_args = {}
        compression_method = compression
    return compression_method, compression_args

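
# Editor's sketch (hypothetical helper, not part of pandas): a bare string has
# no extra arguments; a mapping is split into method plus remaining kwargs.
def _example_get_compression_method() -> None:
    assert get_compression_method("gzip") == ("gzip", {})
    assert get_compression_method({"method": "zip", "archive_name": "out.csv"}) == (
        "zip",
        {"archive_name": "out.csv"},
    )
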

@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def infer_compression(
    filepath_or_buffer: FilePath | BaseBuffer, compression: str | None
) -> str | None:
    """
    Get the compression method for filepath_or_buffer. If compression='infer',
    the inferred compression method is returned. Otherwise, the input
    compression method is returned unchanged, unless it's invalid, in which
    case an error is raised.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    if compression is None:
        return None

    # Infer compression
    if compression == "infer":
        # Convert all path types (e.g. pathlib.Path) to strings
        filepath_or_buffer = stringify_path(filepath_or_buffer, convert_file_like=True)
        if not isinstance(filepath_or_buffer, str):
            # Cannot infer compression of a buffer, assume no compression
            return None

        # Infer compression from the filename/URL extension
        for extension, compression in extension_to_compression.items():
            if filepath_or_buffer.lower().endswith(extension):
                return compression
        return None

    # Compression has been specified. Check that it's valid
    if compression in _supported_compressions:
        return compression

    valid = ["infer", None] + sorted(_supported_compressions)
    msg = (
        f"Unrecognized compression type: {compression}\n"
        f"Valid compression types are {valid}"
    )
    raise ValueError(msg)

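
# Editor's sketch (hypothetical helper, not part of pandas): "infer" consults
# the extension table above; explicit methods are validated and passed through.
def _example_infer_compression() -> None:
    assert infer_compression("data.csv.gz", "infer") == "gzip"
    assert infer_compression("archive.tar.xz", "infer") == "tar"  # compound wins
    assert infer_compression("data.csv", "infer") is None
    assert infer_compression("data.csv", None) is None  # disabled entirely
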

def check_parent_directory(path: Path | str) -> None:
    """
    Check if parent directory of a file exists, raise OSError if it does not

    Parameters
    ----------
    path: Path or str
        Path to check parent directory of
    """
    parent = Path(path).parent
    if not parent.is_dir():
        raise OSError(rf"Cannot save file into a non-existent directory: '{parent}'")


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[False],
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[bytes]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[True] = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: bool = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str] | IOHandles[bytes]:
    ...


@doc(compression_options=_shared_docs["compression_options"] % "path_or_buf")
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = None,
    compression: CompressionOptions = None,
    memory_map: bool = False,
    is_text: bool = True,
    errors: str | None = None,
    storage_options: StorageOptions = None,
) -> IOHandles[str] | IOHandles[bytes]:
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    {compression_options}

        .. versionchanged:: 1.0.0
           May now be a dict with key 'method' as compression mode
           and other keys as compression options if compression
           mode is 'zip'.

        .. versionchanged:: 1.1.0
           Passing compression options as keys in dict is now
           supported for compression modes 'gzip', 'bz2', 'zstd' and 'zip'.

        .. versionchanged:: 1.4.0 Zstandard support.

    memory_map : bool, default False
        See parsers._parser_params for more information. Only used by read_csv.
    is_text : bool, default True
        Whether the type of the content passed to the file/buffer is string or
        bytes. This is not the same as `"b" not in mode`. If a string content is
        passed to a binary file/buffer, a wrapper is inserted.
    errors : str, default 'strict'
        Specifies how encoding and decoding errors are to be handled.
        See the errors argument for :func:`open` for a full list
        of options.
    storage_options : StorageOptions = None
        Passed to _get_filepath_or_buffer

    .. versionchanged:: 1.2.0

        Returns the dataclass IOHandles
    """
    # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
    encoding = encoding or "utf-8"

    errors = errors or "strict"

    # read_csv does not know whether the buffer is opened in binary/text mode
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
        mode += "b"

    # validate encoding and errors
    codecs.lookup(encoding)
    if isinstance(errors, str):
        codecs.lookup_error(errors)

    # open URLs
    ioargs = _get_filepath_or_buffer(
        path_or_buf,
        encoding=encoding,
        compression=compression,
        mode=mode,
        storage_options=storage_options,
    )

    handle = ioargs.filepath_or_buffer
    handles: list[BaseBuffer]

    # memory mapping needs to be the first step
    # only used for read_csv
    handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

    is_path = isinstance(handle, str)
    compression_args = dict(ioargs.compression)
    compression = compression_args.pop("method")

    # Only for write methods
    if "r" not in mode and is_path:
        check_parent_directory(str(handle))

    if compression:
        if compression != "zstd":
            # compression libraries do not like an explicit text-mode
            ioargs.mode = ioargs.mode.replace("t", "")
        elif compression == "zstd" and "b" not in ioargs.mode:
            # python-zstandard defaults to text mode, but we always expect
            # compression libraries to use binary mode.
            ioargs.mode += "b"

        # GZ Compression
        if compression == "gzip":
            if isinstance(handle, str):
                # error: Incompatible types in assignment (expression has type
                # "GzipFile", variable has type "Union[str, BaseBuffer]")
                handle = gzip.GzipFile(  # type: ignore[assignment]
                    filename=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )
            else:
                handle = gzip.GzipFile(
                    # No overload variant of "GzipFile" matches argument types
                    # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
                    fileobj=handle,  # type: ignore[call-overload]
                    mode=ioargs.mode,
                    **compression_args,
                )

        # BZ Compression
        elif compression == "bz2":
            # Overload of "BZ2File" to handle pickle protocol 5
            # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
            handle = _BZ2File(  # type: ignore[call-overload]
                handle,
                mode=ioargs.mode,
                **compression_args,
            )

        # ZIP Compression
        elif compression == "zip":
            # error: Argument 1 to "_BytesZipFile" has incompatible type
            # "Union[str, BaseBuffer]"; expected "Union[Union[str, PathLike[str]],
            # ReadBuffer[bytes], WriteBuffer[bytes]]"
            handle = _BytesZipFile(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )
            if handle.buffer.mode == "r":
                handles.append(handle)
                zip_names = handle.buffer.namelist()
                if len(zip_names) == 1:
                    handle = handle.buffer.open(zip_names.pop())
                elif not zip_names:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file. "
                        f"Only one file per ZIP: {zip_names}"
                    )

        # TAR Compression
        elif compression == "tar":
            compression_args.setdefault("mode", ioargs.mode)
            if isinstance(handle, str):
                handle = _BytesTarFile(name=handle, **compression_args)
            else:
                # error: Argument "fileobj" to "_BytesTarFile" has incompatible
                # type "BaseBuffer"; expected "Union[ReadBuffer[bytes],
                # WriteBuffer[bytes], None]"
                handle = _BytesTarFile(
                    fileobj=handle, **compression_args  # type: ignore[arg-type]
                )
            assert isinstance(handle, _BytesTarFile)
            if "r" in handle.buffer.mode:
                handles.append(handle)
                files = handle.buffer.getnames()
                if len(files) == 1:
                    file = handle.buffer.extractfile(files[0])
                    assert file is not None
                    handle = file
                elif not files:
                    raise ValueError(f"Zero files found in TAR archive {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in TAR archive. "
                        f"Only one file per TAR archive: {files}"
                    )

        # XZ Compression
        elif compression == "xz":
            # error: Argument 1 to "LZMAFile" has incompatible type "Union[str,
            # BaseBuffer]"; expected "Optional[Union[Union[str, bytes, PathLike[str],
            # PathLike[bytes]], IO[bytes]]]"
            handle = get_lzma_file()(handle, ioargs.mode)  # type: ignore[arg-type]

        # Zstd Compression
        elif compression == "zstd":
            zstd = import_optional_dependency("zstandard")
            if "r" in ioargs.mode:
                open_args = {"dctx": zstd.ZstdDecompressor(**compression_args)}
            else:
                open_args = {"cctx": zstd.ZstdCompressor(**compression_args)}
            handle = zstd.open(
                handle,
                mode=ioargs.mode,
                **open_args,
            )

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        assert not isinstance(handle, str)
        handles.append(handle)

    elif isinstance(handle, str):
        # Check whether the filename is to be opened in binary mode.
        # Binary mode does not support 'encoding' and 'newline'.
        if ioargs.encoding and "b" not in ioargs.mode:
            # Encoding
            handle = open(
                handle,
                ioargs.mode,
                encoding=ioargs.encoding,
                errors=errors,
                newline="",
            )
        else:
            # Binary mode
            handle = open(handle, ioargs.mode)
        handles.append(handle)

    # Convert BytesIO or file objects passed with an encoding
    is_wrapped = False
    if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
        # not added to handles as it does not open/buffer resources
        handle = _BytesIOWrapper(
            handle,
            encoding=ioargs.encoding,
        )
    elif is_text and (
        compression or memory_map or _is_binary_mode(handle, ioargs.mode)
    ):
        if (
            not hasattr(handle, "readable")
            or not hasattr(handle, "writable")
            or not hasattr(handle, "seekable")
        ):
            handle = _IOWrapper(handle)
        # error: Argument 1 to "TextIOWrapper" has incompatible type
        # "_IOWrapper"; expected "IO[bytes]"
        handle = TextIOWrapper(
            handle,  # type: ignore[arg-type]
            encoding=ioargs.encoding,
            errors=errors,
            newline="",
        )
        handles.append(handle)
        # only marked as wrapped when the caller provided a handle
        is_wrapped = not (
            isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
        )

    if "r" in ioargs.mode and not hasattr(handle, "read"):
        raise TypeError(
            "Expected file path name or file-like object, "
            f"got {type(ioargs.filepath_or_buffer)} type"
        )

    handles.reverse()  # close the most recently added buffer first
    if ioargs.should_close:
        assert not isinstance(ioargs.filepath_or_buffer, str)
        handles.append(ioargs.filepath_or_buffer)

    return IOHandles(
        # error: Argument "handle" to "IOHandles" has incompatible type
        # "Union[TextIOWrapper, GzipFile, BaseBuffer, typing.IO[bytes],
        # typing.IO[Any]]"; expected "pandas._typing.IO[Any]"
        handle=handle,  # type: ignore[arg-type]
        # error: Argument "created_handles" to "IOHandles" has incompatible type
        # "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]"
        created_handles=handles,  # type: ignore[arg-type]
        is_wrapped=is_wrapped,
        compression=ioargs.compression,
    )

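
# Editor's sketch (hypothetical helper, not part of pandas): writing text into
# an in-memory zip archive. get_handle layers a TextIOWrapper over
# _BytesZipFile; closing the handles flushes the buffered payload into the
# archive inside the caller's BytesIO.
def _example_get_handle_zip_write() -> None:
    buf = BytesIO()
    compression = {"method": "zip", "archive_name": "out.csv"}
    with get_handle(buf, "w", compression=compression) as handles:
        handles.handle.write("a,b\n1,2\n")
    with zipfile.ZipFile(BytesIO(buf.getvalue())) as zf:
        assert zf.read("out.csv") == b"a,b\n1,2\n"
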

# error: Definition of "__enter__" in base class "IOBase" is incompatible
# with definition in base class "BinaryIO"
class _BufferedWriter(BytesIO, ABC):  # type: ignore[misc]
    """
    Some objects do not support multiple .write() calls (TarFile and ZipFile).
    This wrapper writes to the underlying buffer on close.
    """

    @abstractmethod
    def write_to_buffer(self) -> None:
        ...

    def close(self) -> None:
        if self.closed:
            # already closed
            return
        if self.getvalue():
            # write to buffer
            self.seek(0)
            # error: "_BufferedWriter" has no attribute "buffer"
            with self.buffer:  # type: ignore[attr-defined]
                self.write_to_buffer()
        else:
            # error: "_BufferedWriter" has no attribute "buffer"
            self.buffer.close()  # type: ignore[attr-defined]
        super().close()


class _BytesTarFile(_BufferedWriter):
    def __init__(
        self,
        name: str | None = None,
        mode: Literal["r", "a", "w", "x"] = "r",
        fileobj: ReadBuffer[bytes] | WriteBuffer[bytes] | None = None,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        self.archive_name = archive_name
        self.name = name
        # error: Argument "fileobj" to "open" of "TarFile" has incompatible
        # type "Union[ReadBuffer[bytes], WriteBuffer[bytes], None]"; expected
        # "Optional[IO[bytes]]"
        self.buffer = tarfile.TarFile.open(
            name=name,
            mode=self.extend_mode(mode),
            fileobj=fileobj,  # type: ignore[arg-type]
            **kwargs,
        )

    def extend_mode(self, mode: str) -> str:
        mode = mode.replace("b", "")
        if mode != "w":
            return mode
        if self.name is not None:
            suffix = Path(self.name).suffix
            if suffix in (".gz", ".xz", ".bz2"):
                mode = f"{mode}:{suffix[1:]}"
        return mode

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the
        archive not to be named something.tar, because that causes confusion (GH39465).
        """
        if self.name is None:
            return None

        filename = Path(self.name)
        if filename.suffix == ".tar":
            return filename.with_suffix("").name
        # Path.suffix only holds the last component, so compound extensions
        # such as ".tar.gz" have to be rebuilt from Path.suffixes.
        if "".join(filename.suffixes[-2:]) in (".tar.gz", ".tar.bz2", ".tar.xz"):
            return filename.with_suffix("").with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # TarFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "tar"
        tarinfo = tarfile.TarInfo(name=archive_name)
        tarinfo.size = len(self.getvalue())
        self.buffer.addfile(tarinfo, self)


class _BytesZipFile(_BufferedWriter):
    def __init__(
        self,
        file: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
        mode: str,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        mode = mode.replace("b", "")
        self.archive_name = archive_name

        kwargs.setdefault("compression", zipfile.ZIP_DEFLATED)
        # error: Argument 1 to "ZipFile" has incompatible type "Union[
        # Union[str, PathLike[str]], ReadBuffer[bytes], WriteBuffer[bytes]]";
        # expected "Union[Union[str, PathLike[str]], IO[bytes]]"
        self.buffer = zipfile.ZipFile(file, mode, **kwargs)  # type: ignore[arg-type]

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.zip, because that causes confusion (GH39465).
        """
        if isinstance(self.buffer.filename, (os.PathLike, str)):
            filename = Path(self.buffer.filename)
            if filename.suffix == ".zip":
                return filename.with_suffix("").name
            return filename.name
        return None

    def write_to_buffer(self) -> None:
        # ZipFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "zip"
        self.buffer.writestr(archive_name, self.getvalue())


class _IOWrapper:
    # TextIOWrapper is overly strict: it requires the buffer to have seekable,
    # readable, and writable methods. If we have a read-only buffer, we shouldn't
    # need writable, and vice versa. Some buffers are seek/read/writ-able but do
    # not have the "-able" methods, e.g., tempfile.SpooledTemporaryFile.
    # If a buffer does not have the above "-able" methods, we simply assume it
    # is seek/read/writ-able.
    def __init__(self, buffer: BaseBuffer) -> None:
        self.buffer = buffer

    def __getattr__(self, name: str):
        return getattr(self.buffer, name)

    def readable(self) -> bool:
        if hasattr(self.buffer, "readable"):
            return self.buffer.readable()
        return True

    def seekable(self) -> bool:
        if hasattr(self.buffer, "seekable"):
            return self.buffer.seekable()
        return True

    def writable(self) -> bool:
        if hasattr(self.buffer, "writable"):
            return self.buffer.writable()
        return True


class _BytesIOWrapper:
    # Wrapper that wraps a StringIO buffer and reads bytes from it
    # Created for compat with pyarrow read_csv
    def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8") -> None:
        self.buffer = buffer
        self.encoding = encoding
        # Because a character can be represented by more than 1 byte,
        # it is possible that reading will produce more bytes than n
        # We store the extra bytes in this overflow variable, and append the
        # overflow to the front of the bytestring the next time reading is performed
        self.overflow = b""

    def __getattr__(self, attr: str):
        return getattr(self.buffer, attr)

    def read(self, n: int | None = -1) -> bytes:
        assert self.buffer is not None
        bytestring = self.buffer.read(n).encode(self.encoding)
        # When n=-1/n greater than remaining bytes: Read entire file/rest of file
        combined_bytestring = self.overflow + bytestring
        if n is None or n < 0 or n >= len(combined_bytestring):
            self.overflow = b""
            return combined_bytestring
        else:
            to_return = combined_bytestring[:n]
            self.overflow = combined_bytestring[n:]
            return to_return

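
# Editor's sketch (hypothetical helper, not part of pandas): reading 2 bytes of
# "héllo" splits the 2-byte UTF-8 "é"; the spare byte waits in overflow.
def _example_bytesio_wrapper() -> None:
    wrapper = _BytesIOWrapper(StringIO("héllo"))
    assert wrapper.read(2) == b"h\xc3"
    assert wrapper.read(2) == b"\xa9l"
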

def _maybe_memory_map(
    handle: str | BaseBuffer, memory_map: bool
) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
    """Try to memory map file/buffer."""
    handles: list[BaseBuffer] = []
    memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
    if not memory_map:
        return handle, memory_map, handles

    # mmap used by only read_csv
    handle = cast(ReadCsvBuffer, handle)

    # need to open the file first
    if isinstance(handle, str):
        handle = open(handle, "rb")
        handles.append(handle)

    try:
        # open mmap and add the "-able" methods via _IOWrapper
        # error: Argument 1 to "_IOWrapper" has incompatible type "mmap";
        # expected "BaseBuffer"
        wrapped = _IOWrapper(
            mmap.mmap(
                handle.fileno(), 0, access=mmap.ACCESS_READ  # type: ignore[arg-type]
            )
        )
    finally:
        for handle in reversed(handles):
            # error: "BaseBuffer" has no attribute "close"
            handle.close()  # type: ignore[attr-defined]

    return wrapped, memory_map, [wrapped]


def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool:
    """Test whether file exists."""
    exists = False
    filepath_or_buffer = stringify_path(filepath_or_buffer)
    if not isinstance(filepath_or_buffer, str):
        return exists
    try:
        exists = os.path.exists(filepath_or_buffer)
        # gh-5874: os.path.exists raises for excessively long file paths
    except (TypeError, ValueError):
        pass
    return exists


def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool:
    """Whether the handle is opened in binary mode"""
    # specified by user
    if "t" in mode or "b" in mode:
        return "b" in mode

    # exceptions
    text_classes = (
        # classes that expect string but have 'b' in mode
        codecs.StreamWriter,
        codecs.StreamReader,
        codecs.StreamReaderWriter,
    )
    if issubclass(type(handle), text_classes):
        return False

    return isinstance(handle, _get_binary_io_classes()) or "b" in getattr(
        handle, "mode", mode
    )


@functools.lru_cache
def _get_binary_io_classes() -> tuple[type, ...]:
    """IO classes that expect bytes"""
    binary_classes: tuple[type, ...] = (BufferedIOBase, RawIOBase)

    # python-zstandard doesn't use any of the builtin base classes; instead we
    # have to use the `zstd.ZstdDecompressionReader` class for isinstance checks.
    # Unfortunately `zstd.ZstdDecompressionReader` isn't exposed by python-zstandard
    # so we have to get it from a `zstd.ZstdDecompressor` instance.
    # See also https://github.com/indygreg/python-zstandard/pull/165.
    zstd = import_optional_dependency("zstandard", errors="ignore")
    if zstd is not None:
        with zstd.ZstdDecompressor().stream_reader(b"") as reader:
            binary_classes += (type(reader),)

    return binary_classes


def is_potential_multi_index(
    columns: Sequence[Hashable] | MultiIndex,
    index_col: bool | Sequence[int] | None = None,
) -> bool:
    """
    Check whether or not the `columns` parameter
    could be converted into a MultiIndex.

    Parameters
    ----------
    columns : array-like
        Object which may or may not be convertible into a MultiIndex
    index_col : None, bool or list, optional
        Column or columns to use as the (possibly hierarchical) index

    Returns
    -------
    bool : Whether or not columns could become a MultiIndex
    """
    if index_col is None or isinstance(index_col, bool):
        index_col = []

    return bool(
        len(columns)
        and not isinstance(columns, MultiIndex)
        and all(isinstance(c, tuple) for c in columns if c not in list(index_col))
    )

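
# Editor's sketch (hypothetical helper, not part of pandas): only all-tuple,
# not-yet-converted columns can become a MultiIndex.
def _example_is_potential_multi_index() -> None:
    assert is_potential_multi_index([("a", "x"), ("b", "y")])
    assert not is_potential_multi_index(["a", ("b", "y")])  # mixed names
    assert not is_potential_multi_index([])  # nothing to convert
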

def dedup_names(
    names: Sequence[Hashable], is_potential_multiindex: bool
) -> Sequence[Hashable]:
    """
    Rename column names if duplicates exist.

    Currently the renaming is done by appending a period and an autonumeric,
    but a custom pattern may be supported in the future.

    Examples
    --------
    >>> dedup_names(["x", "y", "x", "x"], is_potential_multiindex=False)
    ['x', 'y', 'x.1', 'x.2']
    """
    names = list(names)  # so we can index
    counts: DefaultDict[Hashable, int] = defaultdict(int)

    for i, col in enumerate(names):
        cur_count = counts[col]

        while cur_count > 0:
            counts[col] = cur_count + 1

            if is_potential_multiindex:
                # for mypy
                assert isinstance(col, tuple)
                col = col[:-1] + (f"{col[-1]}.{cur_count}",)
            else:
                col = f"{col}.{cur_count}"
            cur_count = counts[col]

        names[i] = col
        counts[col] = cur_count + 1

    return names
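
# Editor's sketch (hypothetical helper, not part of pandas): with potential
# MultiIndex columns only the last tuple element is suffixed, mirroring the
# flat-name behaviour shown in the docstring above.
def _example_dedup_names_multiindex() -> None:
    result = dedup_names([("x", 1), ("x", 1)], is_potential_multiindex=True)
    assert result == [("x", 1), ("x", "1.1")]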