1""" parquet compat """
2from __future__ import annotations
3
4import io
5import os
6from typing import (
7 Any,
8 Literal,
9)
10import warnings
11from warnings import catch_warnings
12
13from pandas._libs import lib
14from pandas._typing import (
15 DtypeBackend,
16 FilePath,
17 ReadBuffer,
18 StorageOptions,
19 WriteBuffer,
20)
21from pandas.compat._optional import import_optional_dependency
22from pandas.errors import AbstractMethodError
23from pandas.util._decorators import doc
24from pandas.util._exceptions import find_stack_level
25from pandas.util._validators import check_dtype_backend
26
27import pandas as pd
28from pandas import (
29 DataFrame,
30 get_option,
31)
32from pandas.core.shared_docs import _shared_docs
33from pandas.util.version import Version
34
35from pandas.io.common import (
36 IOHandles,
37 get_handle,
38 is_fsspec_url,
39 is_url,
40 stringify_path,
41)
42
43
44def get_engine(engine: str) -> BaseImpl:
45 """return our implementation"""
46 if engine == "auto":
47 engine = get_option("io.parquet.engine")
48
49 if engine == "auto":
50 # try engines in this order
51 engine_classes = [PyArrowImpl, FastParquetImpl]
52
53 error_msgs = ""
54 for engine_class in engine_classes:
55 try:
56 return engine_class()
57 except ImportError as err:
58 error_msgs += "\n - " + str(err)
59
60 raise ImportError(
61 "Unable to find a usable engine; "
62 "tried using: 'pyarrow', 'fastparquet'.\n"
63 "A suitable version of "
64 "pyarrow or fastparquet is required for parquet "
65 "support.\n"
66 "Trying to import the above resulted in these errors:"
67 f"{error_msgs}"
68 )
69
70 if engine == "pyarrow":
71 return PyArrowImpl()
72 elif engine == "fastparquet":
73 return FastParquetImpl()
74
75 raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")
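
# Illustrative sketch of engine selection (comments only, not executed at
# import time; the class names are the ones defined later in this module):
#
#     impl = get_engine("auto")         # PyArrowImpl() when pyarrow imports,
#                                       # otherwise FastParquetImpl()
#     impl = get_engine("fastparquet")  # force a specific engine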


def _get_path_or_handle(
    path: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
    fs: Any,
    storage_options: StorageOptions = None,
    mode: str = "rb",
    is_dir: bool = False,
) -> tuple[
    FilePath | ReadBuffer[bytes] | WriteBuffer[bytes], IOHandles[bytes] | None, Any
]:
    """File handling for PyArrow."""
    path_or_handle = stringify_path(path)
    if is_fsspec_url(path_or_handle) and fs is None:
        fsspec = import_optional_dependency("fsspec")

        fs, path_or_handle = fsspec.core.url_to_fs(
            path_or_handle, **(storage_options or {})
        )
    elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
        # can't write to a remote url
        # without making use of fsspec at the moment
        raise ValueError("storage_options passed with buffer, or non-supported URL")

    handles = None
    if (
        not fs
        and not is_dir
        and isinstance(path_or_handle, str)
        and not os.path.isdir(path_or_handle)
    ):
        # use get_handle only when we are very certain that it is not a directory
        # fsspec resources can also point to directories
        # this branch is used for example when reading from non-fsspec URLs
        handles = get_handle(
            path_or_handle, mode, is_text=False, storage_options=storage_options
        )
        fs = None
        path_or_handle = handles.handle
    return path_or_handle, handles, fs
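
# Behaviour sketch for _get_path_or_handle (illustrative comments only, not
# executed at import time; assumes fsspec is installed and the paths exist):
#
#     # fsspec URL -> resolved to (filesystem, path) via fsspec.core.url_to_fs
#     path, handles, fs = _get_path_or_handle("s3://bucket/key.parquet", None)
#
#     # plain local file -> a buffered handle is opened through get_handle and
#     # the filesystem slot comes back as None
#     path, handles, fs = _get_path_or_handle("local.parquet", None, mode="rb")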


class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame) -> None:
        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs) -> DataFrame:
        raise AbstractMethodError(self)


class PyArrowImpl(BaseImpl):
    def __init__(self) -> None:
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays.arrow.extension_types  # pyright: ignore # noqa:F401

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePath | WriteBuffer[bytes],
        compression: str | None = "snappy",
        index: bool | None = None,
        storage_options: StorageOptions = None,
        partition_cols: list[str] | None = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
        if (
            isinstance(path_or_handle, io.BufferedWriter)
            and hasattr(path_or_handle, "name")
            and isinstance(path_or_handle.name, (str, bytes))
        ):
            path_or_handle = path_or_handle.name
            if isinstance(path_or_handle, bytes):
                path_or_handle = path_or_handle.decode()

        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
        finally:
            if handles is not None:
                handles.close()

    def read(
        self,
        path,
        columns=None,
        use_nullable_dtypes: bool = False,
        dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> DataFrame:
        kwargs["use_pandas_metadata"] = True

        to_pandas_kwargs = {}
        if dtype_backend == "numpy_nullable":
            from pandas.io._util import _arrow_dtype_mapping

            mapping = _arrow_dtype_mapping()
            to_pandas_kwargs["types_mapper"] = mapping.get
        elif dtype_backend == "pyarrow":
            to_pandas_kwargs["types_mapper"] = pd.ArrowDtype  # type: ignore[assignment]  # noqa
        manager = get_option("mode.data_manager")
        if manager == "array":
            to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="rb",
        )
        try:
            pa_table = self.api.parquet.read_table(
                path_or_handle, columns=columns, **kwargs
            )
            result = pa_table.to_pandas(**to_pandas_kwargs)

            if manager == "array":
                result = result._as_manager("array", copy=False)
            return result
        finally:
            if handles is not None:
                handles.close()
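
# Sketch of what the ``types_mapper`` hooks wired up in ``PyArrowImpl.read``
# do for the two dtype backends (illustrative comments only; assumes pyarrow
# is installed):
#
#     import pyarrow as pa
#     from pandas.io._util import _arrow_dtype_mapping
#
#     _arrow_dtype_mapping().get(pa.int64())  # -> pd.Int64Dtype()
#     pd.ArrowDtype(pa.int64())               # -> int64[pyarrow]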


class FastParquetImpl(BaseImpl):
    def __init__(self) -> None:
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression: Literal["snappy", "gzip", "brotli"] | None = "snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        if "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
    ) -> DataFrame:
        parquet_kwargs: dict[str, Any] = {}
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        dtype_backend = kwargs.pop("dtype_backend", lib.no_default)
        if Version(self.api.__version__) >= Version("0.7.1"):
            # We are disabling nullable dtypes for fastparquet pending discussion
            parquet_kwargs["pandas_nulls"] = False
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        if dtype_backend is not lib.no_default:
            raise ValueError(
                "The 'dtype_backend' argument is not supported for the "
                "fastparquet engine"
            )
        path = stringify_path(path)
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            if Version(self.api.__version__) > Version("0.6.1"):
                parquet_kwargs["fs"] = fsspec.open(
                    path, "rb", **(storage_options or {})
                ).fs
            else:
                parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
                    path, "rb", **(storage_options or {})
                ).open()
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(
                path, "rb", is_text=False, storage_options=storage_options
            )
            path = handles.handle

        try:
            parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
            return parquet_file.to_pandas(columns=columns, **kwargs)
        finally:
            if handles is not None:
                handles.close()
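
# Sketch of the fsspec hand-off used in ``FastParquetImpl`` (illustrative
# comments only; assumes fsspec plus a driver for the remote scheme):
#
#     import fsspec
#
#     of = fsspec.open("s3://bucket/data.parquet", "rb")
#     of.fs      # filesystem object passed as ``fs=`` on newer fastparquet
#     of.open()  # the file object an ``open_with`` callable would return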


@doc(storage_options=_shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: FilePath | WriteBuffer[bytes] | None = None,
    engine: str = "auto",
    compression: str | None = "snappy",
    index: bool | None = None,
    storage_options: StorageOptions = None,
    partition_cols: list[str] | None = None,
    **kwargs,
) -> bytes | None:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str, path object, file-like object, or None, default None
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``write()`` function. If None, the result is
        returned as bytes. If a string, it will be used as Root Directory path
        when writing a partitioned dataset. The engine fastparquet does not
        accept file-like objects.

        .. versionchanged:: 1.2.0

    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
        The supported compression methods actually depend on which engine
        is used. For 'pyarrow', 'snappy', 'gzip', 'brotli', 'lz4', 'zstd'
        are all supported. For 'fastparquet', only 'gzip' and 'snappy' are
        supported.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.
    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.
    {storage_options}

        .. versionadded:: 1.2.0

    **kwargs
        Additional keyword arguments passed to the engine.

    Returns
    -------
    bytes if no path argument is provided else None
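
    Examples
    --------
    A minimal sketch of both call styles (assumes ``pyarrow`` is installed;
    the file name is purely illustrative):

    >>> df = pd.DataFrame({{"a": [1, 2], "b": [3.0, 4.0]}})
    >>> to_parquet(df, "example.parquet", engine="pyarrow")  # doctest: +SKIP
    >>> raw_bytes = to_parquet(df, path=None)  # doctest: +SKIP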
404 """
405 if isinstance(partition_cols, str):
406 partition_cols = [partition_cols]
407 impl = get_engine(engine)
408
409 path_or_buf: FilePath | WriteBuffer[bytes] = io.BytesIO() if path is None else path
410
411 impl.write(
412 df,
413 path_or_buf,
414 compression=compression,
415 index=index,
416 partition_cols=partition_cols,
417 storage_options=storage_options,
418 **kwargs,
419 )
420
421 if path is None:
422 assert isinstance(path_or_buf, io.BytesIO)
423 return path_or_buf.getvalue()
424 else:
425 return None
426
427
428@doc(storage_options=_shared_docs["storage_options"])
429def read_parquet(
430 path: FilePath | ReadBuffer[bytes],
431 engine: str = "auto",
432 columns: list[str] | None = None,
433 storage_options: StorageOptions = None,
434 use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
435 dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
436 **kwargs,
437) -> DataFrame:
438 """
439 Load a parquet object from the file path, returning a DataFrame.
440
441 Parameters
442 ----------
443 path : str, path object or file-like object
444 String, path object (implementing ``os.PathLike[str]``), or file-like
445 object implementing a binary ``read()`` function.
446 The string could be a URL. Valid URL schemes include http, ftp, s3,
447 gs, and file. For file URLs, a host is expected. A local file could be:
448 ``file://localhost/path/to/table.parquet``.
449 A file URL can also be a path to a directory that contains multiple
450 partitioned parquet files. Both pyarrow and fastparquet support
451 paths to directories as well as file URLs. A directory path could be:
452 ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
453 engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
454 Parquet library to use. If 'auto', then the option
455 ``io.parquet.engine`` is used. The default ``io.parquet.engine``
456 behavior is to try 'pyarrow', falling back to 'fastparquet' if
457 'pyarrow' is unavailable.
458 columns : list, default=None
459 If not None, only these columns will be read from the file.
460
461 {storage_options}
462
463 .. versionadded:: 1.3.0
464
    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame (only applicable for the ``pyarrow``
        engine).
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        supported dtypes) may change without notice.

        .. deprecated:: 2.0

    dtype_backend : {{"numpy_nullable", "pyarrow"}}, defaults to NumPy backed DataFrames
        Which dtype backend to use for the resulting DataFrame: with
        "numpy_nullable", nullable dtypes are used for all dtypes that have a
        nullable implementation; with "pyarrow", pyarrow-backed ArrowDtype is
        used for all dtypes.

        The dtype backends are still experimental.

        .. versionadded:: 2.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
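
    Examples
    --------
    A minimal write/read round trip (a sketch; assumes ``pyarrow`` is
    installed and uses an illustrative file name):

    >>> original_df = pd.DataFrame({{"foo": range(5), "bar": range(5, 10)}})
    >>> original_df.to_parquet("example.parquet")  # doctest: +SKIP
    >>> restored_df = read_parquet("example.parquet")  # doctest: +SKIP
    >>> read_parquet("example.parquet", columns=["foo"])  # doctest: +SKIP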
492 """
493 impl = get_engine(engine)
494
495 if use_nullable_dtypes is not lib.no_default:
496 msg = (
497 "The argument 'use_nullable_dtypes' is deprecated and will be removed "
498 "in a future version."
499 )
500 if use_nullable_dtypes is True:
501 msg += (
502 "Use dtype_backend='numpy_nullable' instead of use_nullable_dtype=True."
503 )
504 warnings.warn(msg, FutureWarning, stacklevel=find_stack_level())
505 else:
506 use_nullable_dtypes = False
507 check_dtype_backend(dtype_backend)
508
509 return impl.read(
510 path,
511 columns=columns,
512 storage_options=storage_options,
513 use_nullable_dtypes=use_nullable_dtypes,
514 dtype_backend=dtype_backend,
515 **kwargs,
516 )