1""" parquet compat """
2from __future__ import annotations
3
4import io
5import json
6import os
7from typing import (
8 TYPE_CHECKING,
9 Any,
10 Literal,
11)
12import warnings
13from warnings import catch_warnings
14
15from pandas._config import using_pyarrow_string_dtype
16from pandas._config.config import _get_option
17
18from pandas._libs import lib
19from pandas.compat._optional import import_optional_dependency
20from pandas.errors import AbstractMethodError
21from pandas.util._decorators import doc
22from pandas.util._exceptions import find_stack_level
23from pandas.util._validators import check_dtype_backend
24
25import pandas as pd
26from pandas import (
27 DataFrame,
28 get_option,
29)
30from pandas.core.shared_docs import _shared_docs
31
32from pandas.io._util import arrow_string_types_mapper
33from pandas.io.common import (
34 IOHandles,
35 get_handle,
36 is_fsspec_url,
37 is_url,
38 stringify_path,
39)
40
41if TYPE_CHECKING:
42 from pandas._typing import (
43 DtypeBackend,
44 FilePath,
45 ReadBuffer,
46 StorageOptions,
47 WriteBuffer,
48 )
49

def get_engine(engine: str) -> BaseImpl:
    """return our implementation"""
    if engine == "auto":
        engine = get_option("io.parquet.engine")

    if engine == "auto":
        # try engines in this order
        engine_classes = [PyArrowImpl, FastParquetImpl]

        error_msgs = ""
        for engine_class in engine_classes:
            try:
                return engine_class()
            except ImportError as err:
                error_msgs += "\n - " + str(err)

        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'pyarrow', 'fastparquet'.\n"
            "A suitable version of "
            "pyarrow or fastparquet is required for parquet "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

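    # an explicitly requested engine is instantiated directly; a missing
    # dependency surfaces as an ImportError from the implementation's __init__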
    if engine == "pyarrow":
        return PyArrowImpl()
    elif engine == "fastparquet":
        return FastParquetImpl()

    raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")


def _get_path_or_handle(
    path: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
    fs: Any,
    storage_options: StorageOptions | None = None,
    mode: str = "rb",
    is_dir: bool = False,
) -> tuple[
    FilePath | ReadBuffer[bytes] | WriteBuffer[bytes], IOHandles[bytes] | None, Any
]:
    """File handling for PyArrow."""
    path_or_handle = stringify_path(path)
    if fs is not None:
        pa_fs = import_optional_dependency("pyarrow.fs", errors="ignore")
        fsspec = import_optional_dependency("fsspec", errors="ignore")
        if pa_fs is not None and isinstance(fs, pa_fs.FileSystem):
            if storage_options:
                raise NotImplementedError(
                    "storage_options not supported with a pyarrow FileSystem."
                )
        elif fsspec is not None and isinstance(fs, fsspec.spec.AbstractFileSystem):
            pass
        else:
            raise ValueError(
                f"filesystem must be a pyarrow or fsspec FileSystem, "
                f"not a {type(fs).__name__}"
            )
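    # for fsspec-style URLs without an explicit filesystem, try pyarrow's native
    # filesystem first (only when no storage_options are given) and fall back to
    # fsspec if that fails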
    if is_fsspec_url(path_or_handle) and fs is None:
        if storage_options is None:
            pa = import_optional_dependency("pyarrow")
            pa_fs = import_optional_dependency("pyarrow.fs")

            try:
                fs, path_or_handle = pa_fs.FileSystem.from_uri(path)
            except (TypeError, pa.ArrowInvalid):
                pass
        if fs is None:
            fsspec = import_optional_dependency("fsspec")
            fs, path_or_handle = fsspec.core.url_to_fs(
                path_or_handle, **(storage_options or {})
            )
    elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
        # can't write to a remote url
        # without making use of fsspec at the moment
        raise ValueError("storage_options passed with buffer, or non-supported URL")

    handles = None
    if (
        not fs
        and not is_dir
        and isinstance(path_or_handle, str)
        and not os.path.isdir(path_or_handle)
    ):
        # use get_handle only when we are very certain that it is not a directory
        # fsspec resources can also point to directories
        # this branch is used for example when reading from non-fsspec URLs
        handles = get_handle(
            path_or_handle, mode, is_text=False, storage_options=storage_options
        )
        fs = None
        path_or_handle = handles.handle
    return path_or_handle, handles, fs


class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame) -> None:
        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs) -> DataFrame:
        raise AbstractMethodError(self)


class PyArrowImpl(BaseImpl):
    def __init__(self) -> None:
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays.arrow.extension_types  # pyright: ignore[reportUnusedImport] # noqa: F401

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePath | WriteBuffer[bytes],
        compression: str | None = "snappy",
        index: bool | None = None,
        storage_options: StorageOptions | None = None,
        partition_cols: list[str] | None = None,
        filesystem=None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

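        # persist DataFrame.attrs in the Arrow schema metadata (under the
        # "PANDAS_ATTRS" key) so that read() can round-trip them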
        if df.attrs:
            df_metadata = {"PANDAS_ATTRS": json.dumps(df.attrs)}
            existing_metadata = table.schema.metadata
            merged_metadata = {**existing_metadata, **df_metadata}
            table = table.replace_schema_metadata(merged_metadata)

        path_or_handle, handles, filesystem = _get_path_or_handle(
            path,
            filesystem,
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
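        # if get_handle opened a local file, pass pyarrow the underlying path
        # (decoded to str if necessary) instead of the Python buffer object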
        if (
            isinstance(path_or_handle, io.BufferedWriter)
            and hasattr(path_or_handle, "name")
            and isinstance(path_or_handle.name, (str, bytes))
        ):
            if isinstance(path_or_handle.name, bytes):
                path_or_handle = path_or_handle.name.decode()
            else:
                path_or_handle = path_or_handle.name

        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    filesystem=filesystem,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table,
                    path_or_handle,
                    compression=compression,
                    filesystem=filesystem,
                    **kwargs,
                )
        finally:
            if handles is not None:
                handles.close()

    def read(
        self,
        path,
        columns=None,
        filters=None,
        use_nullable_dtypes: bool = False,
        dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
        storage_options: StorageOptions | None = None,
        filesystem=None,
        **kwargs,
    ) -> DataFrame:
        kwargs["use_pandas_metadata"] = True

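        # choose how Arrow types are mapped back to pandas dtypes, depending on
        # the requested dtype_backend (or the global pyarrow-string option)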
        to_pandas_kwargs = {}
        if dtype_backend == "numpy_nullable":
            from pandas.io._util import _arrow_dtype_mapping

            mapping = _arrow_dtype_mapping()
            to_pandas_kwargs["types_mapper"] = mapping.get
        elif dtype_backend == "pyarrow":
            to_pandas_kwargs["types_mapper"] = pd.ArrowDtype  # type: ignore[assignment]
        elif using_pyarrow_string_dtype():
            to_pandas_kwargs["types_mapper"] = arrow_string_types_mapper()

        manager = _get_option("mode.data_manager", silent=True)
        if manager == "array":
            to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]

        path_or_handle, handles, filesystem = _get_path_or_handle(
            path,
            filesystem,
            storage_options=storage_options,
            mode="rb",
        )
        try:
            pa_table = self.api.parquet.read_table(
                path_or_handle,
                columns=columns,
                filesystem=filesystem,
                filters=filters,
                **kwargs,
            )
            result = pa_table.to_pandas(**to_pandas_kwargs)

            if manager == "array":
                result = result._as_manager("array", copy=False)

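            # restore any DataFrame.attrs that write() stored under the
            # "PANDAS_ATTRS" schema metadata key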
            if pa_table.schema.metadata:
                if b"PANDAS_ATTRS" in pa_table.schema.metadata:
                    df_metadata = pa_table.schema.metadata[b"PANDAS_ATTRS"]
                    result.attrs = json.loads(df_metadata)
            return result
        finally:
            if handles is not None:
                handles.close()


class FastParquetImpl(BaseImpl):
    def __init__(self) -> None:
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression: Literal["snappy", "gzip", "brotli"] | None = "snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions | None = None,
        filesystem=None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        if "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

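        # partitioned writes use fastparquet's hive file scheme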
        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        if filesystem is not None:
            raise NotImplementedError(
                "filesystem is not implemented for the fastparquet engine."
            )

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self,
        path,
        columns=None,
        filters=None,
        storage_options: StorageOptions | None = None,
        filesystem=None,
        **kwargs,
    ) -> DataFrame:
        parquet_kwargs: dict[str, Any] = {}
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        dtype_backend = kwargs.pop("dtype_backend", lib.no_default)
        # We are disabling nullable dtypes for fastparquet pending discussion
        parquet_kwargs["pandas_nulls"] = False
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        if dtype_backend is not lib.no_default:
            raise ValueError(
                "The 'dtype_backend' argument is not supported for the "
                "fastparquet engine"
            )
        if filesystem is not None:
            raise NotImplementedError(
                "filesystem is not implemented for the fastparquet engine."
            )
        path = stringify_path(path)
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

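            # pass the fsspec filesystem through to fastparquet's ParquetFile so
            # the remote path (and any partition files) is opened with it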
            parquet_kwargs["fs"] = fsspec.open(path, "rb", **(storage_options or {})).fs
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(
                path, "rb", is_text=False, storage_options=storage_options
            )
            path = handles.handle

        try:
            parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
            return parquet_file.to_pandas(columns=columns, filters=filters, **kwargs)
        finally:
            if handles is not None:
                handles.close()


@doc(storage_options=_shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: FilePath | WriteBuffer[bytes] | None = None,
    engine: str = "auto",
    compression: str | None = "snappy",
    index: bool | None = None,
    storage_options: StorageOptions | None = None,
    partition_cols: list[str] | None = None,
    filesystem: Any = None,
    **kwargs,
) -> bytes | None:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str, path object, file-like object, or None, default None
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``write()`` function. If None, the result is
        returned as bytes. If a string, it will be used as Root Directory path
        when writing a partitioned dataset. The engine fastparquet does not
        accept file-like objects.
    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.

        When using the ``'pyarrow'`` engine and no storage options are provided
        and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
        (e.g. "s3://"), then the ``pyarrow.fs`` filesystem is attempted first.
        Use the filesystem keyword with an instantiated fsspec filesystem
        if you wish to use its implementation.
    compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.
    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.
    {storage_options}

    filesystem : fsspec or pyarrow filesystem, default None
        Filesystem object to use when writing the parquet file. Only implemented
        for ``engine="pyarrow"``.

        .. versionadded:: 2.1.0

    **kwargs
        Additional keyword arguments passed to the engine.

    Returns
    -------
    bytes if no path argument is provided else None
    """
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    impl = get_engine(engine)

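    # with no target path, write into an in-memory buffer so the raw parquet
    # bytes can be returned to the caller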
    path_or_buf: FilePath | WriteBuffer[bytes] = io.BytesIO() if path is None else path

    impl.write(
        df,
        path_or_buf,
        compression=compression,
        index=index,
        partition_cols=partition_cols,
        storage_options=storage_options,
        filesystem=filesystem,
        **kwargs,
    )

    if path is None:
        assert isinstance(path_or_buf, io.BytesIO)
        return path_or_buf.getvalue()
    else:
        return None


@doc(storage_options=_shared_docs["storage_options"])
def read_parquet(
    path: FilePath | ReadBuffer[bytes],
    engine: str = "auto",
    columns: list[str] | None = None,
    storage_options: StorageOptions | None = None,
    use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
    dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
    filesystem: Any = None,
    filters: list[tuple] | list[list[tuple]] | None = None,
    **kwargs,
) -> DataFrame:
    """
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``read()`` function.
        The string could be a URL. Valid URL schemes include http, ftp, s3,
        gs, and file. For file URLs, a host is expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.

        When using the ``'pyarrow'`` engine and no storage options are provided
        and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
        (e.g. "s3://"), then the ``pyarrow.fs`` filesystem is attempted first.
        Use the filesystem keyword with an instantiated fsspec filesystem
        if you wish to use its implementation.
    columns : list, default None
        If not None, only these columns will be read from the file.
    {storage_options}

        .. versionadded:: 1.3.0

    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame. (only applicable for the ``pyarrow``
        engine)
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        supported dtypes) may change without notice.

        .. deprecated:: 2.0

    dtype_backend : {{'numpy_nullable', 'pyarrow'}}, default 'numpy_nullable'
        Back-end data type applied to the resultant :class:`DataFrame`
        (still experimental). Behaviour is as follows:

        * ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame`
          (default).
        * ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype`
          DataFrame.

        .. versionadded:: 2.0

    filesystem : fsspec or pyarrow filesystem, default None
        Filesystem object to use when reading the parquet file. Only implemented
        for ``engine="pyarrow"``.

        .. versionadded:: 2.1.0

    filters : List[Tuple] or List[List[Tuple]], default None
        To filter out data.
        Filter syntax: [[(column, op, val), ...],...]
        where op is [==, =, >, >=, <, <=, !=, in, not in]
        The innermost tuples are transposed into a set of filters applied
        through an `AND` operation.
        The outer list combines these sets of filters through an `OR`
        operation.
        A single list of tuples can also be used, meaning that no `OR`
        operation between sets of filters is to be conducted.

        Using this argument will NOT result in row-wise filtering of the final
        partitions unless ``engine="pyarrow"`` is also specified. For
        other engines, filtering is only performed at the partition level, that is,
        to prevent the loading of some row-groups and/or files.

        .. versionadded:: 2.1.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame

    See Also
    --------
    DataFrame.to_parquet : Create a parquet object that serializes a DataFrame.

    Examples
    --------
    >>> original_df = pd.DataFrame(
    ...     {{"foo": range(5), "bar": range(5, 10)}}
    ... )
    >>> original_df
       foo  bar
    0    0    5
    1    1    6
    2    2    7
    3    3    8
    4    4    9
    >>> df_parquet_bytes = original_df.to_parquet()
    >>> from io import BytesIO
    >>> restored_df = pd.read_parquet(BytesIO(df_parquet_bytes))
    >>> restored_df
       foo  bar
    0    0    5
    1    1    6
    2    2    7
    3    3    8
    4    4    9
    >>> restored_df.equals(original_df)
    True
    >>> restored_bar = pd.read_parquet(BytesIO(df_parquet_bytes), columns=["bar"])
    >>> restored_bar
       bar
    0    5
    1    6
    2    7
    3    8
    4    9
    >>> restored_bar.equals(original_df[['bar']])
    True

    The function uses `kwargs` that are passed directly to the engine.
    In the following example, we use the `filters` argument of the pyarrow
    engine to filter the rows of the DataFrame.

    Since `pyarrow` is the default engine, we can omit the `engine` argument.
    Note that the `filters` argument is implemented by the `pyarrow` engine,
    which can benefit from multithreading and also potentially be more
    economical in terms of memory.

    >>> sel = [("foo", ">", 2)]
    >>> restored_part = pd.read_parquet(BytesIO(df_parquet_bytes), filters=sel)
    >>> restored_part
       foo  bar
    0    3    8
    1    4    9
    """

    impl = get_engine(engine)

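    # use_nullable_dtypes is deprecated; warn when it is passed explicitly and
    # otherwise treat it as False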
    if use_nullable_dtypes is not lib.no_default:
        msg = (
            "The argument 'use_nullable_dtypes' is deprecated and will be removed "
            "in a future version."
        )
        if use_nullable_dtypes is True:
            msg += (
                " Use dtype_backend='numpy_nullable' instead of "
                "use_nullable_dtypes=True."
            )
        warnings.warn(msg, FutureWarning, stacklevel=find_stack_level())
    else:
        use_nullable_dtypes = False
    check_dtype_backend(dtype_backend)

    return impl.read(
        path,
        columns=columns,
        filters=filters,
        storage_options=storage_options,
        use_nullable_dtypes=use_nullable_dtypes,
        dtype_backend=dtype_backend,
        filesystem=filesystem,
        **kwargs,
    )