Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/io/common.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Common IO api utilities"""
2from __future__ import annotations
4from abc import (
5 ABC,
6 abstractmethod,
7)
8import codecs
9from collections import defaultdict
10import dataclasses
11import functools
12import gzip
13from io import (
14 BufferedIOBase,
15 BytesIO,
16 RawIOBase,
17 StringIO,
18 TextIOBase,
19 TextIOWrapper,
20)
21import mmap
22import os
23from pathlib import Path
24import re
25import tarfile
26from typing import (
27 IO,
28 Any,
29 AnyStr,
30 DefaultDict,
31 Generic,
32 Hashable,
33 Literal,
34 Mapping,
35 Sequence,
36 TypeVar,
37 cast,
38 overload,
39)
40from urllib.parse import (
41 urljoin,
42 urlparse as parse_url,
43 uses_netloc,
44 uses_params,
45 uses_relative,
46)
47import warnings
48import zipfile
50from pandas._typing import (
51 BaseBuffer,
52 CompressionDict,
53 CompressionOptions,
54 FilePath,
55 ReadBuffer,
56 ReadCsvBuffer,
57 StorageOptions,
58 WriteBuffer,
59)
60from pandas.compat import get_lzma_file
61from pandas.compat._optional import import_optional_dependency
62from pandas.compat.compressors import BZ2File as _BZ2File
63from pandas.util._decorators import doc
64from pandas.util._exceptions import find_stack_level
66from pandas.core.dtypes.common import (
67 is_bool,
68 is_file_like,
69 is_integer,
70 is_list_like,
71)
73from pandas.core.indexes.api import MultiIndex
74from pandas.core.shared_docs import _shared_docs
# Scheme names that urllib's URL-handling machinery recognizes (e.g. "http",
# "ftp"); is_url treats anything with one of these schemes as a real URL.
_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
_VALID_URLS.discard("")
# "scheme://" prefix per RFC 3986; used by is_fsspec_url to spot protocol URLs.
_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-+.]*://")

# TypeVar so helpers below can return the same concrete buffer type they took.
BaseBufferT = TypeVar("BaseBufferT", bound=BaseBuffer)
@dataclasses.dataclass
class IOArgs:
    """
    Return value of io/common.py:_get_filepath_or_buffer.
    """

    # Either the resolved path string or an already-open/in-memory buffer.
    filepath_or_buffer: str | BaseBuffer
    encoding: str
    mode: str
    compression: CompressionDict
    # True when _get_filepath_or_buffer opened the buffer itself (URL or
    # fsspec branches), in which case get_handle must close it.
    should_close: bool = False
@dataclasses.dataclass
class IOHandles(Generic[AnyStr]):
    """
    Return value of io/common.py:get_handle

    Can be used as a context manager.

    This is used to easily close created buffers and to handle corner cases when
    TextIOWrapper is inserted.

    handle: The file handle to be used.
    created_handles: All file handles that are created by get_handle
    is_wrapped: Whether a TextIOWrapper needs to be detached.
    """

    # handle might not implement the IO-interface
    handle: IO[AnyStr]
    compression: CompressionDict
    created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list)
    is_wrapped: bool = False

    def close(self) -> None:
        """
        Close every buffer that get_handle created.

        A TextIOWrapper that was wrapped around a caller-provided buffer is
        flushed and detached rather than closed, so the caller's buffer
        remains usable afterwards.
        """
        if self.is_wrapped:
            assert isinstance(self.handle, TextIOWrapper)
            self.handle.flush()
            self.handle.detach()
            self.created_handles.remove(self.handle)
        for created in self.created_handles:
            created.close()
        self.created_handles = []
        self.is_wrapped = False

    def __enter__(self) -> IOHandles[AnyStr]:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()
def is_url(url: object) -> bool:
    """
    Check to see if a URL has a valid protocol.

    Parameters
    ----------
    url : str or unicode

    Returns
    -------
    isurl : bool
        If `url` has a valid protocol return True otherwise False.
    """
    # Non-strings can never be URLs; short-circuit before parsing.
    return isinstance(url, str) and parse_url(url).scheme in _VALID_URLS
159@overload
160def _expand_user(filepath_or_buffer: str) -> str:
161 ...
164@overload
165def _expand_user(filepath_or_buffer: BaseBufferT) -> BaseBufferT:
166 ...
169def _expand_user(filepath_or_buffer: str | BaseBufferT) -> str | BaseBufferT:
170 """
171 Return the argument with an initial component of ~ or ~user
172 replaced by that user's home directory.
174 Parameters
175 ----------
176 filepath_or_buffer : object to be converted if possible
178 Returns
179 -------
180 expanded_filepath_or_buffer : an expanded filepath or the
181 input if not expandable
182 """
183 if isinstance(filepath_or_buffer, str):
184 return os.path.expanduser(filepath_or_buffer)
185 return filepath_or_buffer
def validate_header_arg(header: object) -> None:
    """
    Validate the ``header`` argument of the read_* IO functions.

    Accepts ``None``, a non-negative integer, or a list-like of non-negative
    integers; raises ``ValueError``/``TypeError`` for anything else.
    """
    if header is None:
        return
    if is_integer(header):
        header = cast(int, header)
        if header < 0:
            # GH 27779
            raise ValueError(
                "Passing negative integer to header is invalid. "
                "For no header, use header=None instead"
            )
        return
    if is_list_like(header, allow_sets=False):
        header = cast(Sequence, header)
        if not all(is_integer(item) for item in header):
            raise ValueError("header must be integer or list of integers")
        if any(item < 0 for item in header):
            raise ValueError("cannot specify multi-index header with negative integers")
        return
    # bools are rejected explicitly with a dedicated message
    if is_bool(header):
        raise TypeError(
            "Passing a bool to header is invalid. Use header=None for no header or "
            "header=int or list-like of ints to specify "
            "the row(s) making up the column names"
        )
    # GH 16338
    raise ValueError("header must be integer or list of integers")
217@overload
218def stringify_path(filepath_or_buffer: FilePath, convert_file_like: bool = ...) -> str:
219 ...
222@overload
223def stringify_path(
224 filepath_or_buffer: BaseBufferT, convert_file_like: bool = ...
225) -> BaseBufferT:
226 ...
229def stringify_path(
230 filepath_or_buffer: FilePath | BaseBufferT,
231 convert_file_like: bool = False,
232) -> str | BaseBufferT:
233 """
234 Attempt to convert a path-like object to a string.
236 Parameters
237 ----------
238 filepath_or_buffer : object to be converted
240 Returns
241 -------
242 str_filepath_or_buffer : maybe a string version of the object
244 Notes
245 -----
246 Objects supporting the fspath protocol (python 3.6+) are coerced
247 according to its __fspath__ method.
249 Any other object is passed through unchanged, which includes bytes,
250 strings, buffers, or anything else that's not even path-like.
251 """
252 if not convert_file_like and is_file_like(filepath_or_buffer):
253 # GH 38125: some fsspec objects implement os.PathLike but have already opened a
254 # file. This prevents opening the file a second time. infer_compression calls
255 # this function with convert_file_like=True to infer the compression.
256 return cast(BaseBufferT, filepath_or_buffer)
258 if isinstance(filepath_or_buffer, os.PathLike):
259 filepath_or_buffer = filepath_or_buffer.__fspath__()
260 return _expand_user(filepath_or_buffer)
def urlopen(*args, **kwargs):
    """
    Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
    the stdlib.
    """
    # Imported inside the function on purpose — see docstring.
    from urllib.request import urlopen as _urlopen

    return _urlopen(*args, **kwargs)
def is_fsspec_url(url: FilePath | BaseBuffer) -> bool:
    """
    Returns true if the given URL looks like
    something fsspec can handle
    """
    if not isinstance(url, str):
        return False
    # plain HTTP(S) is handled by urllib, not fsspec
    if url.startswith(("http://", "https://")):
        return False
    return _RFC_3986_PATTERN.match(url) is not None
@doc(
    storage_options=_shared_docs["storage_options"],
    compression_options=_shared_docs["compression_options"] % "filepath_or_buffer",
)
def _get_filepath_or_buffer(
    filepath_or_buffer: FilePath | BaseBuffer,
    encoding: str = "utf-8",
    compression: CompressionOptions = None,
    mode: str = "r",
    storage_options: StorageOptions = None,
) -> IOArgs:
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
        or buffer
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional

    {storage_options}

        .. versionadded:: 1.2.0

    .. versionchanged:: 1.2.0

        Returns the dataclass IOArgs.
    """
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    # handle compression dict
    compression_method, compression = get_compression_method(compression)
    compression_method = infer_compression(filepath_or_buffer, compression_method)

    # GH21227 internal compression is not used for non-binary handles.
    if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
        warnings.warn(
            "compression has no effect when passing a non-binary object as input.",
            RuntimeWarning,
            stacklevel=find_stack_level(),
        )
        compression_method = None

    compression = dict(compression, method=compression_method)

    # bz2 and xz do not write the byte order mark for utf-16 and utf-32
    # print a warning when writing such files
    if (
        "w" in mode
        and compression_method in ["bz2", "xz"]
        and encoding in ["utf-16", "utf-32"]
    ):
        warnings.warn(
            f"{compression} will not write the byte order mark for {encoding}",
            UnicodeWarning,
            stacklevel=find_stack_level(),
        )

    # Use binary mode when converting path-like objects to file-like objects (fsspec)
    # except when text mode is explicitly requested. The original mode is returned if
    # fsspec is not used.
    fsspec_mode = mode
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # TODO: fsspec can also handle HTTP via requests, but leaving this
        # unchanged. using fsspec appears to break the ability to infer if the
        # server responded with gzipped data
        storage_options = storage_options or {}

        # waiting until now for importing to match intended lazy logic of
        # urlopen function defined elsewhere in this module
        import urllib.request

        # assuming storage_options is to be interpreted as headers
        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
        with urlopen(req_info) as req:
            content_encoding = req.headers.get("Content-Encoding", None)
            if content_encoding == "gzip":
                # Override compression based on Content-Encoding header
                compression = {"method": "gzip"}
            # the whole response body is read here, so the connection can be
            # closed and an in-memory buffer is handed to the caller
            reader = BytesIO(req.read())
        return IOArgs(
            filepath_or_buffer=reader,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )

    if is_fsspec_url(filepath_or_buffer):
        assert isinstance(
            filepath_or_buffer, str
        )  # just to appease mypy for this branch
        # two special-case s3-like protocols; these have special meaning in Hadoop,
        # but are equivalent to just "s3" from fsspec's point of view
        # cc #11071
        if filepath_or_buffer.startswith("s3a://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
        if filepath_or_buffer.startswith("s3n://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
        fsspec = import_optional_dependency("fsspec")

        # If botocore is installed we fallback to reading with anon=True
        # to allow reads from public buckets
        err_types_to_retry_with_anon: list[Any] = []
        try:
            import_optional_dependency("botocore")
            from botocore.exceptions import (
                ClientError,
                NoCredentialsError,
            )

            err_types_to_retry_with_anon = [
                ClientError,
                NoCredentialsError,
                PermissionError,
            ]
        except ImportError:
            pass

        try:
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()
        # GH 34626 Reads from Public Buckets without Credentials needs anon=True
        except tuple(err_types_to_retry_with_anon):
            if storage_options is None:
                storage_options = {"anon": True}
            else:
                # don't mutate user input.
                storage_options = dict(storage_options)
                storage_options["anon"] = True
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()

        return IOArgs(
            filepath_or_buffer=file_obj,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )
    elif storage_options:
        # storage_options only make sense for URLs / fsspec paths handled above
        raise ValueError(
            "storage_options passed with file object or non-fsspec file path"
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        return IOArgs(
            filepath_or_buffer=_expand_user(filepath_or_buffer),
            encoding=encoding,
            compression=compression,
            should_close=False,
            mode=mode,
        )

    # is_file_like requires (read | write) & __iter__ but __iter__ is only
    # needed for read_csv(engine=python)
    if not (
        hasattr(filepath_or_buffer, "read") or hasattr(filepath_or_buffer, "write")
    ):
        msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        raise ValueError(msg)

    return IOArgs(
        filepath_or_buffer=filepath_or_buffer,
        encoding=encoding,
        compression=compression,
        should_close=False,
        mode=mode,
    )
def file_path_to_url(path: str) -> str:
    """
    converts an absolute native path to a FILE URL.

    Parameters
    ----------
    path : a path in native format

    Returns
    -------
    a valid FILE URL
    """
    # lazify expensive import (~30ms)
    from urllib.request import pathname2url

    url_path = pathname2url(path)
    return urljoin("file:", url_path)
# Filename-extension → compression-method table consumed by infer_compression.
# Iteration order matters: the ".tar.*" keys come before ".gz"/".bz2"/".xz" so
# that e.g. "data.tar.gz" matches "tar" (endswith check) before "gzip".
extension_to_compression = {
    ".tar": "tar",
    ".tar.gz": "tar",
    ".tar.bz2": "tar",
    ".tar.xz": "tar",
    ".gz": "gzip",
    ".bz2": "bz2",
    ".zip": "zip",
    ".xz": "xz",
    ".zst": "zstd",
}
# Set of method names accepted when compression is specified explicitly.
_supported_compressions = set(extension_to_compression.values())
def get_compression_method(
    compression: CompressionOptions,
) -> tuple[str | None, CompressionDict]:
    """
    Simplifies a compression argument to a compression method string and
    a mapping containing additional arguments.

    Parameters
    ----------
    compression : str or mapping
        If string, specifies the compression method. If mapping, value at key
        'method' specifies compression method.

    Returns
    -------
    tuple of ({compression method}, Optional[str]
              {compression arguments}, Dict[str, Any])

    Raises
    ------
    ValueError on mapping missing 'method' key
    """
    # a plain string (or None) carries no extra arguments
    if not isinstance(compression, Mapping):
        return compression, {}

    compression_args = dict(compression)
    try:
        compression_method = compression_args.pop("method")
    except KeyError as err:
        raise ValueError("If mapping, compression must have key 'method'") from err
    return compression_method, compression_args
@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def infer_compression(
    filepath_or_buffer: FilePath | BaseBuffer, compression: str | None
) -> str | None:
    """
    Get the compression method for filepath_or_buffer. If compression='infer',
    the inferred compression method is returned. Otherwise, the input
    compression method is returned unchanged, unless it's invalid, in which
    case an error is raised.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    if compression is None:
        return None

    # Infer compression
    if compression == "infer":
        # Convert all path types (e.g. pathlib.Path) to strings
        filepath_or_buffer = stringify_path(filepath_or_buffer, convert_file_like=True)
        if not isinstance(filepath_or_buffer, str):
            # Cannot infer compression of a buffer, assume no compression
            return None

        # Infer compression from the filename/URL extension
        lowered = filepath_or_buffer.lower()
        for extension, method in extension_to_compression.items():
            if lowered.endswith(extension):
                return method
        return None

    # Compression has been specified. Check that it's valid
    if compression in _supported_compressions:
        return compression

    valid = ["infer", None] + sorted(_supported_compressions)
    msg = (
        f"Unrecognized compression type: {compression}\n"
        f"Valid compression types are {valid}"
    )
    raise ValueError(msg)
def check_parent_directory(path: Path | str) -> None:
    """
    Check if parent directory of a file exists, raise OSError if it does not

    Parameters
    ----------
    path: Path or str
        Path to check parent directory of
    """
    parent_dir = Path(path).parent
    if parent_dir.is_dir():
        return
    raise OSError(rf"Cannot save file into a non-existent directory: '{parent_dir}'")
@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[False],
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[bytes]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[True] = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: bool = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str] | IOHandles[bytes]:
    ...


@doc(compression_options=_shared_docs["compression_options"] % "path_or_buf")
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = None,
    compression: CompressionOptions = None,
    memory_map: bool = False,
    is_text: bool = True,
    errors: str | None = None,
    storage_options: StorageOptions = None,
) -> IOHandles[str] | IOHandles[bytes]:
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    {compression_options}

        .. versionchanged:: 1.0.0
           May now be a dict with key 'method' as compression mode
           and other keys as compression options if compression
           mode is 'zip'.

        .. versionchanged:: 1.1.0
           Passing compression options as keys in dict is now
           supported for compression modes 'gzip', 'bz2', 'zstd' and 'zip'.

        .. versionchanged:: 1.4.0 Zstandard support.

    memory_map : bool, default False
        See parsers._parser_params for more information. Only used by read_csv.
    is_text : bool, default True
        Whether the type of the content passed to the file/buffer is string or
        bytes. This is not the same as `"b" not in mode`. If a string content is
        passed to a binary file/buffer, a wrapper is inserted.
    errors : str, default 'strict'
        Specifies how encoding and decoding errors are to be handled.
        See the errors argument for :func:`open` for a full list
        of options.
    storage_options: StorageOptions = None
        Passed to _get_filepath_or_buffer

    .. versionchanged:: 1.2.0

    Returns the dataclass IOHandles
    """
    # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
    encoding = encoding or "utf-8"

    errors = errors or "strict"

    # read_csv does not know whether the buffer is opened in binary/text mode
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
        mode += "b"

    # validate encoding and errors (raises LookupError for unknown names)
    codecs.lookup(encoding)
    if isinstance(errors, str):
        codecs.lookup_error(errors)

    # open URLs
    ioargs = _get_filepath_or_buffer(
        path_or_buf,
        encoding=encoding,
        compression=compression,
        mode=mode,
        storage_options=storage_options,
    )

    handle = ioargs.filepath_or_buffer
    handles: list[BaseBuffer]

    # memory mapping needs to be the first step
    # only used for read_csv
    handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

    is_path = isinstance(handle, str)
    compression_args = dict(ioargs.compression)
    # "method" is consumed here; remaining keys are forwarded to the compressor
    compression = compression_args.pop("method")

    # Only for write methods
    if "r" not in mode and is_path:
        check_parent_directory(str(handle))

    if compression:
        if compression != "zstd":
            # compression libraries do not like an explicit text-mode
            ioargs.mode = ioargs.mode.replace("t", "")
        elif compression == "zstd" and "b" not in ioargs.mode:
            # python-zstandard defaults to text mode, but we always expect
            # compression libraries to use binary mode.
            ioargs.mode += "b"

        # GZ Compression
        if compression == "gzip":
            if isinstance(handle, str):
                # error: Incompatible types in assignment (expression has type
                # "GzipFile", variable has type "Union[str, BaseBuffer]")
                handle = gzip.GzipFile(  # type: ignore[assignment]
                    filename=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )
            else:
                handle = gzip.GzipFile(
                    # No overload variant of "GzipFile" matches argument types
                    # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
                    fileobj=handle,  # type: ignore[call-overload]
                    mode=ioargs.mode,
                    **compression_args,
                )

        # BZ Compression
        elif compression == "bz2":
            # Overload of "BZ2File" to handle pickle protocol 5
            # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
            handle = _BZ2File(  # type: ignore[call-overload]
                handle,
                mode=ioargs.mode,
                **compression_args,
            )

        # ZIP Compression
        elif compression == "zip":
            # error: Argument 1 to "_BytesZipFile" has incompatible type
            # "Union[str, BaseBuffer]"; expected "Union[Union[str, PathLike[str]],
            # ReadBuffer[bytes], WriteBuffer[bytes]]"
            handle = _BytesZipFile(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )
            # when reading, a ZIP must contain exactly one member, which
            # becomes the returned handle
            if handle.buffer.mode == "r":
                handles.append(handle)
                zip_names = handle.buffer.namelist()
                if len(zip_names) == 1:
                    handle = handle.buffer.open(zip_names.pop())
                elif not zip_names:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file. "
                        f"Only one file per ZIP: {zip_names}"
                    )

        # TAR Encoding
        elif compression == "tar":
            compression_args.setdefault("mode", ioargs.mode)
            if isinstance(handle, str):
                handle = _BytesTarFile(name=handle, **compression_args)
            else:
                # error: Argument "fileobj" to "_BytesTarFile" has incompatible
                # type "BaseBuffer"; expected "Union[ReadBuffer[bytes],
                # WriteBuffer[bytes], None]"
                handle = _BytesTarFile(
                    fileobj=handle, **compression_args  # type: ignore[arg-type]
                )
            assert isinstance(handle, _BytesTarFile)
            # same single-member rule as for ZIP archives above
            if "r" in handle.buffer.mode:
                handles.append(handle)
                files = handle.buffer.getnames()
                if len(files) == 1:
                    file = handle.buffer.extractfile(files[0])
                    assert file is not None
                    handle = file
                elif not files:
                    raise ValueError(f"Zero files found in TAR archive {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in TAR archive. "
                        f"Only one file per TAR archive: {files}"
                    )

        # XZ Compression
        elif compression == "xz":
            # error: Argument 1 to "LZMAFile" has incompatible type "Union[str,
            # BaseBuffer]"; expected "Optional[Union[Union[str, bytes, PathLike[str],
            # PathLike[bytes]], IO[bytes]]]"
            handle = get_lzma_file()(handle, ioargs.mode)  # type: ignore[arg-type]

        # Zstd Compression
        elif compression == "zstd":
            zstd = import_optional_dependency("zstandard")
            if "r" in ioargs.mode:
                open_args = {"dctx": zstd.ZstdDecompressor(**compression_args)}
            else:
                open_args = {"cctx": zstd.ZstdCompressor(**compression_args)}
            handle = zstd.open(
                handle,
                mode=ioargs.mode,
                **open_args,
            )

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        assert not isinstance(handle, str)
        handles.append(handle)

    elif isinstance(handle, str):
        # Check whether the filename is to be opened in binary mode.
        # Binary mode does not support 'encoding' and 'newline'.
        if ioargs.encoding and "b" not in ioargs.mode:
            # Encoding
            handle = open(
                handle,
                ioargs.mode,
                encoding=ioargs.encoding,
                errors=errors,
                newline="",
            )
        else:
            # Binary mode
            handle = open(handle, ioargs.mode)
        handles.append(handle)

    # Convert BytesIO or file objects passed with an encoding
    is_wrapped = False
    if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
        # not added to handles as it does not open/buffer resources
        handle = _BytesIOWrapper(
            handle,
            encoding=ioargs.encoding,
        )
    elif is_text and (
        compression or memory_map or _is_binary_mode(handle, ioargs.mode)
    ):
        if (
            not hasattr(handle, "readable")
            or not hasattr(handle, "writable")
            or not hasattr(handle, "seekable")
        ):
            handle = _IOWrapper(handle)
        # error: Argument 1 to "TextIOWrapper" has incompatible type
        # "_IOWrapper"; expected "IO[bytes]"
        handle = TextIOWrapper(
            handle,  # type: ignore[arg-type]
            encoding=ioargs.encoding,
            errors=errors,
            newline="",
        )
        handles.append(handle)
        # only marked as wrapped when the caller provided a handle
        is_wrapped = not (
            isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
        )

    if "r" in ioargs.mode and not hasattr(handle, "read"):
        raise TypeError(
            "Expected file path name or file-like object, "
            f"got {type(ioargs.filepath_or_buffer)} type"
        )

    handles.reverse()  # close the most recently added buffer first
    if ioargs.should_close:
        assert not isinstance(ioargs.filepath_or_buffer, str)
        handles.append(ioargs.filepath_or_buffer)

    return IOHandles(
        # error: Argument "handle" to "IOHandles" has incompatible type
        # "Union[TextIOWrapper, GzipFile, BaseBuffer, typing.IO[bytes],
        # typing.IO[Any]]"; expected "pandas._typing.IO[Any]"
        handle=handle,  # type: ignore[arg-type]
        # error: Argument "created_handles" to "IOHandles" has incompatible type
        # "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]"
        created_handles=handles,  # type: ignore[arg-type]
        is_wrapped=is_wrapped,
        compression=ioargs.compression,
    )
# error: Definition of "__enter__" in base class "IOBase" is incompatible
# with definition in base class "BinaryIO"
class _BufferedWriter(BytesIO, ABC):  # type: ignore[misc]
    """
    Some objects do not support multiple .write() calls (TarFile and ZipFile).
    This wrapper writes to the underlying buffer on close.
    """

    @abstractmethod
    def write_to_buffer(self) -> None:
        # subclasses flush self.getvalue() into self.buffer here
        ...

    def close(self) -> None:
        if self.closed:
            # already closed
            return
        if self.getvalue():
            # write to buffer
            self.seek(0)
            # error: "_BufferedWriter" has no attribute "buffer"
            with self.buffer:  # type: ignore[attr-defined]
                self.write_to_buffer()
        else:
            # nothing was written; just close the underlying archive
            # error: "_BufferedWriter" has no attribute "buffer"
            self.buffer.close()  # type: ignore[attr-defined]
        super().close()
class _BytesTarFile(_BufferedWriter):
    # Buffers all written bytes in memory (BytesIO base class) and adds them
    # as a single archive member to self.buffer (a TarFile) on close.
    def __init__(
        self,
        name: str | None = None,
        mode: Literal["r", "a", "w", "x"] = "r",
        fileobj: ReadBuffer[bytes] | WriteBuffer[bytes] | None = None,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        self.archive_name = archive_name
        self.name = name
        # error: Argument "fileobj" to "open" of "TarFile" has incompatible
        # type "Union[ReadBuffer[bytes], WriteBuffer[bytes], None]"; expected
        # "Optional[IO[bytes]]"
        self.buffer = tarfile.TarFile.open(
            name=name,
            mode=self.extend_mode(mode),
            fileobj=fileobj,  # type: ignore[arg-type]
            **kwargs,
        )

    def extend_mode(self, mode: str) -> str:
        """
        Append a tarfile compression suffix (e.g. "w" -> "w:gz") inferred from
        the file name's extension when writing; reading is left untouched.
        """
        mode = mode.replace("b", "")
        if mode != "w":
            return mode
        if self.name is not None:
            suffix = Path(self.name).suffix
            if suffix in (".gz", ".xz", ".bz2"):
                mode = f"{mode}:{suffix[1:]}"
        return mode

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.tar, because that causes confusion (GH39465).
        """
        if self.name is None:
            return None

        filename = Path(self.name)
        if filename.suffix == ".tar":
            return filename.with_suffix("").name
        elif filename.suffix in (".tar.gz", ".tar.bz2", ".tar.xz"):
            return filename.with_suffix("").with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # TarFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "tar"
        tarinfo = tarfile.TarInfo(name=archive_name)
        tarinfo.size = len(self.getvalue())
        self.buffer.addfile(tarinfo, self)
class _BytesZipFile(_BufferedWriter):
    # Buffers all written bytes in memory (BytesIO base class) and writes them
    # as a single member of self.buffer (a ZipFile) on close.
    def __init__(
        self,
        file: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
        mode: str,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        # ZipFile does not accept the "b" flag
        mode = mode.replace("b", "")
        self.archive_name = archive_name

        kwargs.setdefault("compression", zipfile.ZIP_DEFLATED)
        # error: Argument 1 to "ZipFile" has incompatible type "Union[
        # Union[str, PathLike[str]], ReadBuffer[bytes], WriteBuffer[bytes]]";
        # expected "Union[Union[str, PathLike[str]], IO[bytes]]"
        self.buffer = zipfile.ZipFile(file, mode, **kwargs)  # type: ignore[arg-type]

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.zip, because that causes confusion (GH39465).
        """
        if isinstance(self.buffer.filename, (os.PathLike, str)):
            filename = Path(self.buffer.filename)
            if filename.suffix == ".zip":
                return filename.with_suffix("").name
            return filename.name
        return None

    def write_to_buffer(self) -> None:
        # ZipFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "zip"
        self.buffer.writestr(archive_name, self.getvalue())
1045class _IOWrapper:
1046 # TextIOWrapper is overly strict: it request that the buffer has seekable, readable,
1047 # and writable. If we have a read-only buffer, we shouldn't need writable and vice
1048 # versa. Some buffers, are seek/read/writ-able but they do not have the "-able"
1049 # methods, e.g., tempfile.SpooledTemporaryFile.
1050 # If a buffer does not have the above "-able" methods, we simple assume they are
1051 # seek/read/writ-able.
1052 def __init__(self, buffer: BaseBuffer) -> None:
1053 self.buffer = buffer
1055 def __getattr__(self, name: str):
1056 return getattr(self.buffer, name)
1058 def readable(self) -> bool:
1059 if hasattr(self.buffer, "readable"):
1060 return self.buffer.readable()
1061 return True
1063 def seekable(self) -> bool:
1064 if hasattr(self.buffer, "seekable"):
1065 return self.buffer.seekable()
1066 return True
1068 def writable(self) -> bool:
1069 if hasattr(self.buffer, "writable"):
1070 return self.buffer.writable()
1071 return True
1074class _BytesIOWrapper:
1075 # Wrapper that wraps a StringIO buffer and reads bytes from it
1076 # Created for compat with pyarrow read_csv
1077 def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8") -> None:
1078 self.buffer = buffer
1079 self.encoding = encoding
1080 # Because a character can be represented by more than 1 byte,
1081 # it is possible that reading will produce more bytes than n
1082 # We store the extra bytes in this overflow variable, and append the
1083 # overflow to the front of the bytestring the next time reading is performed
1084 self.overflow = b""
1086 def __getattr__(self, attr: str):
1087 return getattr(self.buffer, attr)
1089 def read(self, n: int | None = -1) -> bytes:
1090 assert self.buffer is not None
1091 bytestring = self.buffer.read(n).encode(self.encoding)
1092 # When n=-1/n greater than remaining bytes: Read entire file/rest of file
1093 combined_bytestring = self.overflow + bytestring
1094 if n is None or n < 0 or n >= len(combined_bytestring):
1095 self.overflow = b""
1096 return combined_bytestring
1097 else:
1098 to_return = combined_bytestring[:n]
1099 self.overflow = combined_bytestring[n:]
1100 return to_return
1103def _maybe_memory_map(
1104 handle: str | BaseBuffer, memory_map: bool
1105) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
1106 """Try to memory map file/buffer."""
1107 handles: list[BaseBuffer] = []
1108 memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
1109 if not memory_map:
1110 return handle, memory_map, handles
1112 # mmap used by only read_csv
1113 handle = cast(ReadCsvBuffer, handle)
1115 # need to open the file first
1116 if isinstance(handle, str):
1117 handle = open(handle, "rb")
1118 handles.append(handle)
1120 try:
1121 # open mmap and adds *-able
1122 # error: Argument 1 to "_IOWrapper" has incompatible type "mmap";
1123 # expected "BaseBuffer"
1124 wrapped = _IOWrapper(
1125 mmap.mmap(
1126 handle.fileno(), 0, access=mmap.ACCESS_READ # type: ignore[arg-type]
1127 )
1128 )
1129 finally:
1130 for handle in reversed(handles):
1131 # error: "BaseBuffer" has no attribute "close"
1132 handle.close() # type: ignore[attr-defined]
1134 return wrapped, memory_map, [wrapped]
def file_exists(filepath_or_buffer: "FilePath | BaseBuffer") -> bool:
    """Test whether file exists."""
    path = stringify_path(filepath_or_buffer)
    # buffers and other non-path objects cannot "exist" on disk
    if not isinstance(path, str):
        return False
    try:
        return os.path.exists(path)
    except (TypeError, ValueError):
        # gh-5874: an overly long filepath may raise instead of returning False
        return False
1151def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool:
1152 """Whether the handle is opened in binary mode"""
1153 # specified by user
1154 if "t" in mode or "b" in mode:
1155 return "b" in mode
1157 # exceptions
1158 text_classes = (
1159 # classes that expect string but have 'b' in mode
1160 codecs.StreamWriter,
1161 codecs.StreamReader,
1162 codecs.StreamReaderWriter,
1163 )
1164 if issubclass(type(handle), text_classes):
1165 return False
1167 return isinstance(handle, _get_binary_io_classes()) or "b" in getattr(
1168 handle, "mode", mode
1169 )
@functools.lru_cache
def _get_binary_io_classes() -> "tuple[type, ...]":
    """IO classes that expect bytes"""
    classes: "tuple[type, ...]" = (BufferedIOBase, RawIOBase)

    # python-zstandard derives from none of the builtin IO base classes, and
    # `zstd.ZstdDecompressionReader` is not exported, so the only way to get
    # the class for isinstance checks is off a throwaway stream_reader built
    # from a `zstd.ZstdDecompressor` instance.
    # See also https://github.com/indygreg/python-zstandard/pull/165.
    zstd = import_optional_dependency("zstandard", errors="ignore")
    if zstd is not None:
        with zstd.ZstdDecompressor().stream_reader(b"") as reader:
            classes += (type(reader),)

    return classes
def is_potential_multi_index(
    columns: "Sequence[Hashable] | MultiIndex",
    index_col: "bool | Sequence[int] | None" = None,
) -> bool:
    """
    Check whether or not the `columns` parameter
    could be converted into a MultiIndex.

    Parameters
    ----------
    columns : array-like
        Object which may or may not be convertible into a MultiIndex
    index_col : None, bool or list, optional
        Column or columns to use as the (possibly hierarchical) index

    Returns
    -------
    bool : Whether or not columns could become a MultiIndex
    """
    # None / bool index_col excludes nothing from the tuple check
    excluded = [] if index_col is None or isinstance(index_col, bool) else index_col

    if not len(columns) or isinstance(columns, MultiIndex):
        return False
    # every non-index column must already be a tuple to form a MultiIndex
    return all(isinstance(c, tuple) for c in columns if c not in list(excluded))
1219def dedup_names(
1220 names: Sequence[Hashable], is_potential_multiindex: bool
1221) -> Sequence[Hashable]:
1222 """
1223 Rename column names if duplicates exist.
1225 Currently the renaming is done by appending a period and an autonumeric,
1226 but a custom pattern may be supported in the future.
1228 Examples
1229 --------
1230 >>> dedup_names(["x", "y", "x", "x"], is_potential_multiindex=False)
1231 ['x', 'y', 'x.1', 'x.2']
1232 """
1233 names = list(names) # so we can index
1234 counts: DefaultDict[Hashable, int] = defaultdict(int)
1236 for i, col in enumerate(names):
1237 cur_count = counts[col]
1239 while cur_count > 0:
1240 counts[col] = cur_count + 1
1242 if is_potential_multiindex:
1243 # for mypy
1244 assert isinstance(col, tuple)
1245 col = col[:-1] + (f"{col[-1]}.{cur_count}",)
1246 else:
1247 col = f"{col}.{cur_count}"
1248 cur_count = counts[col]
1250 names[i] = col
1251 counts[col] = cur_count + 1
1253 return names