1"""Common IO api utilities"""
from __future__ import annotations

from abc import (
    ABC,
    abstractmethod,
)
import codecs
from collections import defaultdict
from collections.abc import (
    Hashable,
    Mapping,
    Sequence,
)
import dataclasses
import functools
import gzip
from io import (
    BufferedIOBase,
    BytesIO,
    RawIOBase,
    StringIO,
    TextIOBase,
    TextIOWrapper,
)
import mmap
import os
from pathlib import Path
import re
import tarfile
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    AnyStr,
    DefaultDict,
    Generic,
    Literal,
    TypeVar,
    cast,
    overload,
)
from urllib.parse import (
    urljoin,
    urlparse as parse_url,
    uses_netloc,
    uses_params,
    uses_relative,
)
import warnings
import zipfile

from pandas._typing import (
    BaseBuffer,
    ReadCsvBuffer,
)
from pandas.compat import (
    get_bz2_file,
    get_lzma_file,
)
from pandas.compat._optional import import_optional_dependency
from pandas.util._decorators import doc
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.common import (
    is_bool,
    is_file_like,
    is_integer,
    is_list_like,
)
from pandas.core.dtypes.generic import ABCMultiIndex

from pandas.core.shared_docs import _shared_docs

_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
_VALID_URLS.discard("")
_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-.]*://")

BaseBufferT = TypeVar("BaseBufferT", bound=BaseBuffer)


if TYPE_CHECKING:
    from types import TracebackType

    from pandas._typing import (
        CompressionDict,
        CompressionOptions,
        FilePath,
        ReadBuffer,
        StorageOptions,
        WriteBuffer,
    )

    from pandas import MultiIndex


@dataclasses.dataclass
class IOArgs:
    """
    Return value of io/common.py:_get_filepath_or_buffer.
    """

    filepath_or_buffer: str | BaseBuffer
    encoding: str
    mode: str
    compression: CompressionDict
    should_close: bool = False


@dataclasses.dataclass
class IOHandles(Generic[AnyStr]):
    """
    Return value of io/common.py:get_handle

    Can be used as a context manager.

    This is used to easily close created buffers and to handle corner cases when
    TextIOWrapper is inserted.

    handle: The file handle to be used.
    created_handles: All file handles that are created by get_handle
    is_wrapped: Whether a TextIOWrapper needs to be detached.
    """

    # handle might not implement the IO-interface
    handle: IO[AnyStr]
    compression: CompressionDict
    created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list)
    is_wrapped: bool = False

    def close(self) -> None:
        """
        Close all created buffers.

        Note: If a TextIOWrapper was inserted, it is flushed and detached to
        avoid closing the potentially user-created buffer.
        """
        if self.is_wrapped:
            assert isinstance(self.handle, TextIOWrapper)
            self.handle.flush()
            self.handle.detach()
            self.created_handles.remove(self.handle)
        for handle in self.created_handles:
            handle.close()
        self.created_handles = []
        self.is_wrapped = False

    def __enter__(self) -> IOHandles[AnyStr]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.close()
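
# Usage sketch (illustrative, not part of the module): get_handle (defined
# below) returns IOHandles; leaving the context manager closes every handle
# this module created, while a wrapped user buffer is detached, not closed.
#
#   >>> with get_handle("data.csv", "r") as handles:  # doctest: +SKIP
#   ...     content = handles.handle.read()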


def is_url(url: object) -> bool:
    """
    Check to see if a URL has a valid protocol.

    Parameters
    ----------
    url : str
        The value to check.

    Returns
    -------
    isurl : bool
        If `url` has a valid protocol return True otherwise False.
    """
    if not isinstance(url, str):
        return False
    return parse_url(url).scheme in _VALID_URLS
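
# Example (illustrative): any scheme registered with urllib's URL parsers
# counts as a valid protocol; bare paths and non-strings do not.
#
#   >>> is_url("https://example.com/data.csv")
#   True
#   >>> is_url("example.com/data.csv")  # no scheme
#   False
#   >>> is_url(123)
#   False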


@overload
def _expand_user(filepath_or_buffer: str) -> str:
    ...


@overload
def _expand_user(filepath_or_buffer: BaseBufferT) -> BaseBufferT:
    ...


def _expand_user(filepath_or_buffer: str | BaseBufferT) -> str | BaseBufferT:
    """
    Return the argument with an initial component of ~ or ~user
    replaced by that user's home directory.

    Parameters
    ----------
    filepath_or_buffer : object to be converted if possible

    Returns
    -------
    expanded_filepath_or_buffer : an expanded filepath or the
        input if not expandable
    """
    if isinstance(filepath_or_buffer, str):
        return os.path.expanduser(filepath_or_buffer)
    return filepath_or_buffer


def validate_header_arg(header: object) -> None:
    if header is None:
        return
    if is_integer(header):
        header = cast(int, header)
        if header < 0:
            # GH 27779
            raise ValueError(
                "Passing negative integer to header is invalid. "
                "For no header, use header=None instead"
            )
        return
    if is_list_like(header, allow_sets=False):
        header = cast(Sequence, header)
        if not all(map(is_integer, header)):
            raise ValueError("header must be integer or list of integers")
        if any(i < 0 for i in header):
            raise ValueError("cannot specify multi-index header with negative integers")
        return
    if is_bool(header):
        raise TypeError(
            "Passing a bool to header is invalid. Use header=None for no header or "
            "header=int or list-like of ints to specify "
            "the row(s) making up the column names"
        )
    # GH 16338
    raise ValueError("header must be integer or list of integers")
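
# Behaviour sketch (illustrative): None and non-negative ints/lists pass;
# negative ints and bools raise.
#
#   >>> validate_header_arg(0)          # OK
#   >>> validate_header_arg([0, 1])     # OK (multi-index header)
#   >>> validate_header_arg(-1)         # ValueError
#   >>> validate_header_arg(True)       # TypeError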


@overload
def stringify_path(filepath_or_buffer: FilePath, convert_file_like: bool = ...) -> str:
    ...


@overload
def stringify_path(
    filepath_or_buffer: BaseBufferT, convert_file_like: bool = ...
) -> BaseBufferT:
    ...


def stringify_path(
    filepath_or_buffer: FilePath | BaseBufferT,
    convert_file_like: bool = False,
) -> str | BaseBufferT:
    """
    Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath_or_buffer : object to be converted

    Returns
    -------
    str_filepath_or_buffer : maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced
    according to their __fspath__ method.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if not convert_file_like and is_file_like(filepath_or_buffer):
        # GH 38125: some fsspec objects implement os.PathLike but have already opened a
        # file. This prevents opening the file a second time. infer_compression calls
        # this function with convert_file_like=True to infer the compression.
        return cast(BaseBufferT, filepath_or_buffer)

    if isinstance(filepath_or_buffer, os.PathLike):
        filepath_or_buffer = filepath_or_buffer.__fspath__()
    return _expand_user(filepath_or_buffer)
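
# Example (illustrative; output shown for a POSIX system):
#
#   >>> from pathlib import Path
#   >>> stringify_path(Path("data") / "file.csv")
#   'data/file.csv'
#   >>> buf = BytesIO(b"abc")
#   >>> stringify_path(buf) is buf  # file-like objects pass through unchanged
#   True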


def urlopen(*args, **kwargs):
    """
    Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
    the stdlib.
    """
    import urllib.request

    return urllib.request.urlopen(*args, **kwargs)


def is_fsspec_url(url: FilePath | BaseBuffer) -> bool:
    """
    Returns true if the given URL looks like
    something fsspec can handle
    """
    return (
        isinstance(url, str)
        and bool(_RFC_3986_PATTERN.match(url))
        and not url.startswith(("http://", "https://"))
    )
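
# Example (illustrative): any "scheme://" URL other than plain HTTP(S) is
# routed to fsspec.
#
#   >>> is_fsspec_url("s3://bucket/key.csv")
#   True
#   >>> is_fsspec_url("https://example.com/key.csv")  # handled via urllib
#   False
#   >>> is_fsspec_url("/local/path.csv")  # no scheme
#   False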


@doc(
    storage_options=_shared_docs["storage_options"],
    compression_options=_shared_docs["compression_options"] % "filepath_or_buffer",
)
def _get_filepath_or_buffer(
    filepath_or_buffer: FilePath | BaseBuffer,
    encoding: str = "utf-8",
    compression: CompressionOptions | None = None,
    mode: str = "r",
    storage_options: StorageOptions | None = None,
) -> IOArgs:
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
        or buffer
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional

    {storage_options}

    Returns the dataclass IOArgs.
    """
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    # handle compression dict
    compression_method, compression = get_compression_method(compression)
    compression_method = infer_compression(filepath_or_buffer, compression_method)

    # GH21227 internal compression is not used for non-binary handles.
    if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
        warnings.warn(
            "compression has no effect when passing a non-binary object as input.",
            RuntimeWarning,
            stacklevel=find_stack_level(),
        )
        compression_method = None

    compression = dict(compression, method=compression_method)

    # bz2 and xz do not write the byte order mark for utf-16 and utf-32
    # emit a warning when writing such files
    if (
        "w" in mode
        and compression_method in ["bz2", "xz"]
        and encoding in ["utf-16", "utf-32"]
    ):
        warnings.warn(
            f"{compression_method} will not write the byte order mark for {encoding}",
            UnicodeWarning,
            stacklevel=find_stack_level(),
        )

    # Use binary mode when converting path-like objects to file-like objects (fsspec)
    # except when text mode is explicitly requested. The original mode is returned if
    # fsspec is not used.
    fsspec_mode = mode
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # TODO: fsspec can also handle HTTP via requests, but leaving this
        # unchanged. using fsspec appears to break the ability to infer if the
        # server responded with gzipped data
        storage_options = storage_options or {}

        # waiting until now for importing to match intended lazy logic of
        # urlopen function defined elsewhere in this module
        import urllib.request

        # assuming storage_options is to be interpreted as headers
        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
        with urlopen(req_info) as req:
            content_encoding = req.headers.get("Content-Encoding", None)
            if content_encoding == "gzip":
                # Override compression based on Content-Encoding header
                compression = {"method": "gzip"}
            reader = BytesIO(req.read())
        return IOArgs(
            filepath_or_buffer=reader,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )

    if is_fsspec_url(filepath_or_buffer):
        assert isinstance(
            filepath_or_buffer, str
        )  # just to appease mypy for this branch
        # two special-case s3-like protocols; these have special meaning in Hadoop,
        # but are equivalent to just "s3" from fsspec's point of view
        # cc #11071
        if filepath_or_buffer.startswith("s3a://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
        if filepath_or_buffer.startswith("s3n://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
        fsspec = import_optional_dependency("fsspec")

        # If botocore is installed we fall back to reading with anon=True
        # to allow reads from public buckets
        err_types_to_retry_with_anon: list[Any] = []
        try:
            import_optional_dependency("botocore")
            from botocore.exceptions import (
                ClientError,
                NoCredentialsError,
            )

            err_types_to_retry_with_anon = [
                ClientError,
                NoCredentialsError,
                PermissionError,
            ]
        except ImportError:
            pass

        try:
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()
        # GH 34626 Reads from Public Buckets without Credentials needs anon=True
        except tuple(err_types_to_retry_with_anon):
            if storage_options is None:
                storage_options = {"anon": True}
            else:
                # don't mutate user input.
                storage_options = dict(storage_options)
                storage_options["anon"] = True
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()

        return IOArgs(
            filepath_or_buffer=file_obj,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )
    elif storage_options:
        raise ValueError(
            "storage_options passed with file object or non-fsspec file path"
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        return IOArgs(
            filepath_or_buffer=_expand_user(filepath_or_buffer),
            encoding=encoding,
            compression=compression,
            should_close=False,
            mode=mode,
        )

    # is_file_like requires (read | write) & __iter__ but __iter__ is only
    # needed for read_csv(engine=python)
    if not (
        hasattr(filepath_or_buffer, "read") or hasattr(filepath_or_buffer, "write")
    ):
        msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        raise ValueError(msg)

    return IOArgs(
        filepath_or_buffer=filepath_or_buffer,
        encoding=encoding,
        compression=compression,
        should_close=False,
        mode=mode,
    )


def file_path_to_url(path: str) -> str:
    """
    Converts an absolute native path to a FILE URL.

    Parameters
    ----------
    path : a path in native format

    Returns
    -------
    a valid FILE URL
    """
    # lazify expensive import (~30ms)
    from urllib.request import pathname2url

    return urljoin("file:", pathname2url(path))
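
# Example (illustrative; result shown for a POSIX system, since pathname2url
# encodes Windows paths differently):
#
#   >>> file_path_to_url("/tmp/data.csv")
#   'file:///tmp/data.csv'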


extension_to_compression = {
    ".tar": "tar",
    ".tar.gz": "tar",
    ".tar.bz2": "tar",
    ".tar.xz": "tar",
    ".gz": "gzip",
    ".bz2": "bz2",
    ".zip": "zip",
    ".xz": "xz",
    ".zst": "zstd",
}
_supported_compressions = set(extension_to_compression.values())


def get_compression_method(
    compression: CompressionOptions,
) -> tuple[str | None, CompressionDict]:
    """
    Simplifies a compression argument to a compression method string and
    a mapping containing additional arguments.

    Parameters
    ----------
    compression : str or mapping
        If string, specifies the compression method. If mapping, value at key
        'method' specifies compression method.

    Returns
    -------
    tuple of (compression method : Optional[str],
              compression arguments : dict[str, Any])

    Raises
    ------
    ValueError on mapping missing 'method' key
    """
    compression_method: str | None
    if isinstance(compression, Mapping):
        compression_args = dict(compression)
        try:
            compression_method = compression_args.pop("method")
        except KeyError as err:
            raise ValueError("If mapping, compression must have key 'method'") from err
    else:
        compression_args = {}
        compression_method = compression
    return compression_method, compression_args
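
# Example (illustrative):
#
#   >>> get_compression_method("gzip")
#   ('gzip', {})
#   >>> get_compression_method({"method": "zip", "archive_name": "out.csv"})
#   ('zip', {'archive_name': 'out.csv'})
#   >>> get_compression_method({"archive_name": "out.csv"})  # ValueError: no 'method'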


@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def infer_compression(
    filepath_or_buffer: FilePath | BaseBuffer, compression: str | None
) -> str | None:
    """
    Get the compression method for filepath_or_buffer. If compression='infer',
    the inferred compression method is returned. Otherwise, the input
    compression method is returned unchanged, unless it's invalid, in which
    case an error is raised.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    if compression is None:
        return None

    # Infer compression
    if compression == "infer":
        # Convert all path types (e.g. pathlib.Path) to strings
        filepath_or_buffer = stringify_path(filepath_or_buffer, convert_file_like=True)
        if not isinstance(filepath_or_buffer, str):
            # Cannot infer compression of a buffer, assume no compression
            return None

        # Infer compression from the filename/URL extension
        for extension, compression in extension_to_compression.items():
            if filepath_or_buffer.lower().endswith(extension):
                return compression
        return None

    # Compression has been specified. Check that it's valid
    if compression in _supported_compressions:
        return compression

    valid = ["infer", None] + sorted(_supported_compressions)
    msg = (
        f"Unrecognized compression type: {compression}\n"
        f"Valid compression types are {valid}"
    )
    raise ValueError(msg)
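
# Example (illustrative): inference keys off the lowercased extension via the
# extension_to_compression table above.
#
#   >>> infer_compression("data.csv.gz", "infer")
#   'gzip'
#   >>> infer_compression("data.csv", "infer") is None
#   True
#   >>> infer_compression(BytesIO(b""), "infer") is None  # buffers: no inference
#   True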


def check_parent_directory(path: Path | str) -> None:
    """
    Check if parent directory of a file exists, raise OSError if it does not

    Parameters
    ----------
    path: Path or str
        Path to check parent directory of
    """
    parent = Path(path).parent
    if not parent.is_dir():
        raise OSError(rf"Cannot save file into a non-existent directory: '{parent}'")


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[False],
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[bytes]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[True] = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: bool = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str] | IOHandles[bytes]:
    ...


@doc(compression_options=_shared_docs["compression_options"] % "path_or_buf")
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = None,
    compression: CompressionOptions | None = None,
    memory_map: bool = False,
    is_text: bool = True,
    errors: str | None = None,
    storage_options: StorageOptions | None = None,
) -> IOHandles[str] | IOHandles[bytes]:
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    {compression_options}

        May be a dict with key 'method' as compression mode
        and other keys as compression options if compression
        mode is 'zip'.

        Passing compression options as keys in dict is
        supported for compression modes 'gzip', 'bz2', 'zstd' and 'zip'.

        .. versionchanged:: 1.4.0 Zstandard support.

    memory_map : bool, default False
        See parsers._parser_params for more information. Only used by read_csv.
    is_text : bool, default True
        Whether the type of the content passed to the file/buffer is string or
        bytes. This is not the same as `"b" not in mode`. If a string content is
        passed to a binary file/buffer, a wrapper is inserted.
    errors : str, default 'strict'
        Specifies how encoding and decoding errors are to be handled.
        See the errors argument for :func:`open` for a full list
        of options.
    storage_options : StorageOptions, optional
        Passed to _get_filepath_or_buffer.

    Returns the dataclass IOHandles.
    """
    # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
    encoding = encoding or "utf-8"

    errors = errors or "strict"

    # read_csv does not know whether the buffer is opened in binary/text mode
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
        mode += "b"

    # validate encoding and errors
    codecs.lookup(encoding)
    if isinstance(errors, str):
        codecs.lookup_error(errors)

    # open URLs
    ioargs = _get_filepath_or_buffer(
        path_or_buf,
        encoding=encoding,
        compression=compression,
        mode=mode,
        storage_options=storage_options,
    )

    handle = ioargs.filepath_or_buffer
    handles: list[BaseBuffer]

    # memory mapping needs to be the first step
    # only used for read_csv
    handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

    is_path = isinstance(handle, str)
    compression_args = dict(ioargs.compression)
    compression = compression_args.pop("method")

    # Only for write methods
    if "r" not in mode and is_path:
        check_parent_directory(str(handle))

    if compression:
        if compression != "zstd":
            # compression libraries do not like an explicit text-mode
            ioargs.mode = ioargs.mode.replace("t", "")
        elif "b" not in ioargs.mode:
            # python-zstandard defaults to text mode, but we always expect
            # compression libraries to use binary mode.
            ioargs.mode += "b"

        # GZ Compression
        if compression == "gzip":
            if isinstance(handle, str):
                # error: Incompatible types in assignment (expression has type
                # "GzipFile", variable has type "Union[str, BaseBuffer]")
                handle = gzip.GzipFile(  # type: ignore[assignment]
                    filename=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )
            else:
                handle = gzip.GzipFile(
                    # No overload variant of "GzipFile" matches argument types
                    # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
                    fileobj=handle,  # type: ignore[call-overload]
                    mode=ioargs.mode,
                    **compression_args,
                )

        # BZ Compression
        elif compression == "bz2":
            # get_bz2_file returns a "BZ2File" overload that handles pickle
            # protocol 5; mypy cannot match it against argument types
            # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
            handle = get_bz2_file()(  # type: ignore[call-overload]
                handle,
                mode=ioargs.mode,
                **compression_args,
            )

        # ZIP Compression
        elif compression == "zip":
            # error: Argument 1 to "_BytesZipFile" has incompatible type
            # "Union[str, BaseBuffer]"; expected "Union[Union[str, PathLike[str]],
            # ReadBuffer[bytes], WriteBuffer[bytes]]"
            handle = _BytesZipFile(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )
            if handle.buffer.mode == "r":
                handles.append(handle)
                zip_names = handle.buffer.namelist()
                if len(zip_names) == 1:
                    handle = handle.buffer.open(zip_names.pop())
                elif not zip_names:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file. "
                        f"Only one file per ZIP: {zip_names}"
                    )

        # TAR Compression
        elif compression == "tar":
            compression_args.setdefault("mode", ioargs.mode)
            if isinstance(handle, str):
                handle = _BytesTarFile(name=handle, **compression_args)
            else:
                # error: Argument "fileobj" to "_BytesTarFile" has incompatible
                # type "BaseBuffer"; expected "Union[ReadBuffer[bytes],
                # WriteBuffer[bytes], None]"
                handle = _BytesTarFile(
                    fileobj=handle, **compression_args  # type: ignore[arg-type]
                )
            assert isinstance(handle, _BytesTarFile)
            if "r" in handle.buffer.mode:
                handles.append(handle)
                files = handle.buffer.getnames()
                if len(files) == 1:
                    file = handle.buffer.extractfile(files[0])
                    assert file is not None
                    handle = file
                elif not files:
                    raise ValueError(f"Zero files found in TAR archive {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in TAR archive. "
                        f"Only one file per TAR archive: {files}"
                    )

        # XZ Compression
        elif compression == "xz":
            # error: Argument 1 to "LZMAFile" has incompatible type "Union[str,
            # BaseBuffer]"; expected "Optional[Union[Union[str, bytes, PathLike[str],
            # PathLike[bytes]], IO[bytes]], None]"
            handle = get_lzma_file()(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )

        # Zstd Compression
        elif compression == "zstd":
            zstd = import_optional_dependency("zstandard")
            if "r" in ioargs.mode:
                open_args = {"dctx": zstd.ZstdDecompressor(**compression_args)}
            else:
                open_args = {"cctx": zstd.ZstdCompressor(**compression_args)}
            handle = zstd.open(
                handle,
                mode=ioargs.mode,
                **open_args,
            )

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        assert not isinstance(handle, str)
        handles.append(handle)

    elif isinstance(handle, str):
        # Check whether the filename is to be opened in binary mode.
        # Binary mode does not support 'encoding' and 'newline'.
        if ioargs.encoding and "b" not in ioargs.mode:
            # Encoding
            handle = open(
                handle,
                ioargs.mode,
                encoding=ioargs.encoding,
                errors=errors,
                newline="",
            )
        else:
            # Binary mode
            handle = open(handle, ioargs.mode)
        handles.append(handle)

    # Convert BytesIO or file objects passed with an encoding
    is_wrapped = False
    if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
        # not added to handles as it does not open/buffer resources
        handle = _BytesIOWrapper(
            handle,
            encoding=ioargs.encoding,
        )
    elif is_text and (
        compression or memory_map or _is_binary_mode(handle, ioargs.mode)
    ):
        if (
            not hasattr(handle, "readable")
            or not hasattr(handle, "writable")
            or not hasattr(handle, "seekable")
        ):
            handle = _IOWrapper(handle)
        # error: Argument 1 to "TextIOWrapper" has incompatible type
        # "_IOWrapper"; expected "IO[bytes]"
        handle = TextIOWrapper(
            handle,  # type: ignore[arg-type]
            encoding=ioargs.encoding,
            errors=errors,
            newline="",
        )
        handles.append(handle)
        # only marked as wrapped when the caller provided a handle
        is_wrapped = not (
            isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
        )

    if "r" in ioargs.mode and not hasattr(handle, "read"):
        raise TypeError(
            "Expected file path name or file-like object, "
            f"got {type(ioargs.filepath_or_buffer)} type"
        )

    handles.reverse()  # close the most recently added buffer first
    if ioargs.should_close:
        assert not isinstance(ioargs.filepath_or_buffer, str)
        handles.append(ioargs.filepath_or_buffer)

    return IOHandles(
        # error: Argument "handle" to "IOHandles" has incompatible type
        # "Union[TextIOWrapper, GzipFile, BaseBuffer, typing.IO[bytes],
        # typing.IO[Any]]"; expected "pandas._typing.IO[Any]"
        handle=handle,  # type: ignore[arg-type]
        # error: Argument "created_handles" to "IOHandles" has incompatible type
        # "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]"
        created_handles=handles,  # type: ignore[arg-type]
        is_wrapped=is_wrapped,
        compression=ioargs.compression,
    )
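
# Usage sketch (illustrative): get_handle transparently layers decompression
# and text decoding, and the returned IOHandles cleans up everything it opened.
#
#   >>> with get_handle("data.csv.gz", "r", compression="infer") as handles:
#   ...     first_line = handles.handle.readline()  # decompressed, decoded text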


# error: Definition of "__enter__" in base class "IOBase" is incompatible
# with definition in base class "BinaryIO"
class _BufferedWriter(BytesIO, ABC):  # type: ignore[misc]
    """
    Some objects do not support multiple .write() calls (TarFile and ZipFile).
    This wrapper writes to the underlying buffer on close.
    """

    # placeholder; subclasses replace this with the actual archive object
    # (TarFile or ZipFile) in their __init__
    buffer = BytesIO()

    @abstractmethod
    def write_to_buffer(self) -> None:
        ...

    def close(self) -> None:
        if self.closed:
            # already closed
            return
        if self.getbuffer().nbytes:
            # write to buffer
            self.seek(0)
            with self.buffer:
                self.write_to_buffer()
        else:
            self.buffer.close()
        super().close()


class _BytesTarFile(_BufferedWriter):
    def __init__(
        self,
        name: str | None = None,
        mode: Literal["r", "a", "w", "x"] = "r",
        fileobj: ReadBuffer[bytes] | WriteBuffer[bytes] | None = None,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        self.archive_name = archive_name
        self.name = name
        # error: Incompatible types in assignment (expression has type "TarFile",
        # base class "_BufferedWriter" defined the type as "BytesIO")
        self.buffer: tarfile.TarFile = tarfile.TarFile.open(  # type: ignore[assignment]
            name=name,
            mode=self.extend_mode(mode),
            fileobj=fileobj,
            **kwargs,
        )

    def extend_mode(self, mode: str) -> str:
        mode = mode.replace("b", "")
        if mode != "w":
            return mode
        if self.name is not None:
            suffix = Path(self.name).suffix
            if suffix in (".gz", ".xz", ".bz2"):
                mode = f"{mode}:{suffix[1:]}"
        return mode

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the
        archive not to be named something.tar, because that causes confusion (GH39465).
        """
        if self.name is None:
            return None

        filename = Path(self.name)
        # Path.suffix only holds the last extension, so join the final two
        # suffixes to detect compressed tarballs such as something.tar.gz
        if "".join(filename.suffixes[-2:]) in (".tar.gz", ".tar.bz2", ".tar.xz"):
            return filename.with_suffix("").with_suffix("").name
        if filename.suffix == ".tar":
            return filename.with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # TarFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "tar"
        tarinfo = tarfile.TarInfo(name=archive_name)
        tarinfo.size = len(self.getvalue())
        self.buffer.addfile(tarinfo, self)


class _BytesZipFile(_BufferedWriter):
    def __init__(
        self,
        file: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
        mode: str,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        mode = mode.replace("b", "")
        self.archive_name = archive_name

        kwargs.setdefault("compression", zipfile.ZIP_DEFLATED)
        # error: Incompatible types in assignment (expression has type "ZipFile",
        # base class "_BufferedWriter" defined the type as "BytesIO")
        self.buffer: zipfile.ZipFile = zipfile.ZipFile(  # type: ignore[assignment]
            file, mode, **kwargs
        )

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.zip, because that causes confusion (GH39465).
        """
        if isinstance(self.buffer.filename, (os.PathLike, str)):
            filename = Path(self.buffer.filename)
            if filename.suffix == ".zip":
                return filename.with_suffix("").name
            return filename.name
        return None

    def write_to_buffer(self) -> None:
        # ZipFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "zip"
        self.buffer.writestr(archive_name, self.getvalue())


class _IOWrapper:
    # TextIOWrapper is overly strict: it requires that the buffer expose
    # seekable, readable, and writable. If we have a read-only buffer, we
    # shouldn't need writable and vice versa. Some buffers are
    # seek/read/writ-able but do not have the "-able" methods, e.g.,
    # tempfile.SpooledTemporaryFile.
    # If a buffer does not have the above "-able" methods, we simply assume
    # it is seek/read/writ-able.
    def __init__(self, buffer: BaseBuffer) -> None:
        self.buffer = buffer

    def __getattr__(self, name: str):
        return getattr(self.buffer, name)

    def readable(self) -> bool:
        if hasattr(self.buffer, "readable"):
            return self.buffer.readable()
        return True

    def seekable(self) -> bool:
        if hasattr(self.buffer, "seekable"):
            return self.buffer.seekable()
        return True

    def writable(self) -> bool:
        if hasattr(self.buffer, "writable"):
            return self.buffer.writable()
        return True


class _BytesIOWrapper:
    # Wrapper that wraps a StringIO buffer and reads bytes from it
    # Created for compat with pyarrow read_csv
    def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8") -> None:
        self.buffer = buffer
        self.encoding = encoding
        # Because a character can be represented by more than 1 byte,
        # it is possible that reading will produce more bytes than n
        # We store the extra bytes in this overflow variable, and append the
        # overflow to the front of the bytestring the next time reading is performed
        self.overflow = b""

    def __getattr__(self, attr: str):
        return getattr(self.buffer, attr)

    def read(self, n: int | None = -1) -> bytes:
        assert self.buffer is not None
        bytestring = self.buffer.read(n).encode(self.encoding)
        # When n is -1 or exceeds the number of remaining bytes, return
        # everything (the whole file or the rest of it)
        combined_bytestring = self.overflow + bytestring
        if n is None or n < 0 or n >= len(combined_bytestring):
            self.overflow = b""
            return combined_bytestring
        else:
            to_return = combined_bytestring[:n]
            self.overflow = combined_bytestring[n:]
            return to_return
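
# Example (illustrative): multi-byte characters can straddle a read boundary;
# the extra bytes are carried over in ``overflow``.
#
#   >>> w = _BytesIOWrapper(StringIO("héllo"), encoding="utf-8")
#   >>> w.read(2)   # "hé" encodes to 3 bytes; one byte is held back
#   b'h\xc3'
#   >>> w.read(2)   # the held-back byte comes out first
#   b'\xa9l'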


def _maybe_memory_map(
    handle: str | BaseBuffer, memory_map: bool
) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
    """Try to memory map file/buffer."""
    handles: list[BaseBuffer] = []
    memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
    if not memory_map:
        return handle, memory_map, handles

    # mmap used by only read_csv
    handle = cast(ReadCsvBuffer, handle)

    # need to open the file first
    if isinstance(handle, str):
        handle = open(handle, "rb")
        handles.append(handle)

    try:
        # open mmap and add the "-able" methods via _IOWrapper
        # error: Argument 1 to "_IOWrapper" has incompatible type "mmap";
        # expected "BaseBuffer"
        wrapped = _IOWrapper(
            mmap.mmap(
                handle.fileno(), 0, access=mmap.ACCESS_READ  # type: ignore[arg-type]
            )
        )
    finally:
        for handle in reversed(handles):
            # error: "BaseBuffer" has no attribute "close"
            handle.close()  # type: ignore[attr-defined]

    return wrapped, memory_map, [wrapped]


def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool:
    """Test whether file exists."""
    exists = False
    filepath_or_buffer = stringify_path(filepath_or_buffer)
    if not isinstance(filepath_or_buffer, str):
        return exists
    try:
        exists = os.path.exists(filepath_or_buffer)
        # gh-5874: if the filepath is too long will raise here
    except (TypeError, ValueError):
        pass
    return exists


def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool:
    """Whether the handle is opened in binary mode"""
    # specified by user
    if "t" in mode or "b" in mode:
        return "b" in mode

    # exceptions
    text_classes = (
        # classes that expect string but have 'b' in mode
        codecs.StreamWriter,
        codecs.StreamReader,
        codecs.StreamReaderWriter,
    )
    if issubclass(type(handle), text_classes):
        return False

    return isinstance(handle, _get_binary_io_classes()) or "b" in getattr(
        handle, "mode", mode
    )
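
# Example (illustrative): an explicit mode string wins; otherwise the handle's
# class decides.
#
#   >>> _is_binary_mode("data.csv", "rb")
#   True
#   >>> _is_binary_mode(BytesIO(), "r")   # BufferedIOBase subclass
#   True
#   >>> _is_binary_mode(StringIO(), "r")  # text buffer
#   False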


@functools.lru_cache
def _get_binary_io_classes() -> tuple[type, ...]:
    """IO classes that expect bytes"""
    binary_classes: tuple[type, ...] = (BufferedIOBase, RawIOBase)

    # python-zstandard doesn't use any of the builtin base classes; instead we
    # have to use the `zstd.ZstdDecompressionReader` class for isinstance checks.
    # Unfortunately `zstd.ZstdDecompressionReader` isn't exposed by python-zstandard
    # so we have to get it from a `zstd.ZstdDecompressor` instance.
    # See also https://github.com/indygreg/python-zstandard/pull/165.
    zstd = import_optional_dependency("zstandard", errors="ignore")
    if zstd is not None:
        with zstd.ZstdDecompressor().stream_reader(b"") as reader:
            binary_classes += (type(reader),)

    return binary_classes


def is_potential_multi_index(
    columns: Sequence[Hashable] | MultiIndex,
    index_col: bool | Sequence[int] | None = None,
) -> bool:
    """
    Check whether or not the `columns` parameter
    could be converted into a MultiIndex.

    Parameters
    ----------
    columns : array-like
        Object which may or may not be convertible into a MultiIndex
    index_col : None, bool or list, optional
        Column or columns to use as the (possibly hierarchical) index

    Returns
    -------
    bool : Whether or not columns could become a MultiIndex
    """
    if index_col is None or isinstance(index_col, bool):
        index_col = []

    return bool(
        len(columns)
        and not isinstance(columns, ABCMultiIndex)
        and all(isinstance(c, tuple) for c in columns if c not in list(index_col))
    )
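
# Example (illustrative): all-tuple column labels signal a potential MultiIndex.
#
#   >>> is_potential_multi_index([("a", "b"), ("a", "c")])
#   True
#   >>> is_potential_multi_index(["a", "b"])
#   False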


def dedup_names(
    names: Sequence[Hashable], is_potential_multiindex: bool
) -> Sequence[Hashable]:
    """
    Rename column names if duplicates exist.

    Currently the renaming is done by appending a period and a running count,
    but a custom pattern may be supported in the future.

    Examples
    --------
    >>> dedup_names(["x", "y", "x", "x"], is_potential_multiindex=False)
    ['x', 'y', 'x.1', 'x.2']
    """
    names = list(names)  # so we can index
    counts: DefaultDict[Hashable, int] = defaultdict(int)

    for i, col in enumerate(names):
        cur_count = counts[col]

        while cur_count > 0:
            counts[col] = cur_count + 1

            if is_potential_multiindex:
                # for mypy
                assert isinstance(col, tuple)
                col = col[:-1] + (f"{col[-1]}.{cur_count}",)
            else:
                col = f"{col}.{cur_count}"
            cur_count = counts[col]

        names[i] = col
        counts[col] = cur_count + 1

    return names
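
# Example (illustrative): with is_potential_multiindex=True only the last
# level of a duplicated tuple label gets the counter appended.
#
#   >>> dedup_names([("x", "a"), ("x", "a")], is_potential_multiindex=True)
#   [('x', 'a'), ('x', 'a.1')]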