Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/io/parquet.py: 22%


""" parquet compat """
from __future__ import annotations

import io
import os
from typing import (
    Any,
    Literal,
)
import warnings
from warnings import catch_warnings

from pandas._libs import lib
from pandas._typing import (
    DtypeBackend,
    FilePath,
    ReadBuffer,
    StorageOptions,
    WriteBuffer,
)
from pandas.compat._optional import import_optional_dependency
from pandas.errors import AbstractMethodError
from pandas.util._decorators import doc
from pandas.util._exceptions import find_stack_level
from pandas.util._validators import check_dtype_backend

import pandas as pd
from pandas import (
    DataFrame,
    get_option,
)
from pandas.core.shared_docs import _shared_docs
from pandas.util.version import Version

from pandas.io.common import (
    IOHandles,
    get_handle,
    is_fsspec_url,
    is_url,
    stringify_path,
)


def get_engine(engine: str) -> BaseImpl:
    """return our implementation"""
    if engine == "auto":
        engine = get_option("io.parquet.engine")

    if engine == "auto":
        # try engines in this order
        engine_classes = [PyArrowImpl, FastParquetImpl]

        error_msgs = ""
        for engine_class in engine_classes:
            try:
                return engine_class()
            except ImportError as err:
                error_msgs += "\n - " + str(err)

        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'pyarrow', 'fastparquet'.\n"
            "A suitable version of "
            "pyarrow or fastparquet is required for parquet "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "pyarrow":
        return PyArrowImpl()
    elif engine == "fastparquet":
        return FastParquetImpl()

    raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")

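# Usage sketch (illustrative comment, not executed as part of this module):
# how engine resolution behaves from user code, assuming pyarrow is installed
# and the "io.parquet.engine" option is left at its default of "auto".
#
#     from pandas.io.parquet import get_engine
#
#     impl = get_engine("auto")         # tries PyArrowImpl first, then FastParquetImpl
#     impl = get_engine("fastparquet")  # FastParquetImpl, or ImportError if not installed
#     get_engine("feather")             # ValueError: engine must be one of 'pyarrow', 'fastparquet'
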

def _get_path_or_handle(
    path: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
    fs: Any,
    storage_options: StorageOptions = None,
    mode: str = "rb",
    is_dir: bool = False,
) -> tuple[
    FilePath | ReadBuffer[bytes] | WriteBuffer[bytes], IOHandles[bytes] | None, Any
]:
    """File handling for PyArrow."""
    path_or_handle = stringify_path(path)
    if is_fsspec_url(path_or_handle) and fs is None:
        fsspec = import_optional_dependency("fsspec")

        fs, path_or_handle = fsspec.core.url_to_fs(
            path_or_handle, **(storage_options or {})
        )
    elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
        # can't write to a remote url
        # without making use of fsspec at the moment
        raise ValueError("storage_options passed with buffer, or non-supported URL")

    handles = None
    if (
        not fs
        and not is_dir
        and isinstance(path_or_handle, str)
        and not os.path.isdir(path_or_handle)
    ):
        # use get_handle only when we are very certain that it is not a directory
        # fsspec resources can also point to directories
        # this branch is used for example when reading from non-fsspec URLs
        handles = get_handle(
            path_or_handle, mode, is_text=False, storage_options=storage_options
        )
        fs = None
        path_or_handle = handles.handle
    return path_or_handle, handles, fs

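# Behaviour sketch (illustrative comment, not executed here; the paths are
# made-up examples and the local file is assumed to exist). For an fsspec URL
# the helper hands back a filesystem and no handle; for a plain local file it
# opens the file via get_handle and the caller is responsible for closing it:
#
#     path, handles, fs = _get_path_or_handle("s3://bucket/file.parquet", None)
#     # -> ("bucket/file.parquet", None, <fsspec S3 filesystem>)
#
#     path, handles, fs = _get_path_or_handle("local.parquet", None, mode="rb")
#     # -> (<buffered reader>, <IOHandles wrapping it>, None)
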

class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame) -> None:
        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs) -> DataFrame:
        raise AbstractMethodError(self)

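# Interface sketch (hypothetical, not part of pandas): a parquet engine plugs in
# by subclassing BaseImpl and implementing write/read. get_engine only knows the
# two concrete implementations below, so this class exists purely to illustrate
# the contract those implementations satisfy.
#
#     class MyEngineImpl(BaseImpl):
#         def write(self, df, path, compression, **kwargs) -> None:
#             self.validate_dataframe(df)
#             ...  # serialize df as parquet at `path`
#
#         def read(self, path, columns=None, **kwargs) -> DataFrame:
#             ...  # return a DataFrame, honouring the `columns` selection
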

class PyArrowImpl(BaseImpl):
    def __init__(self) -> None:
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays.arrow.extension_types  # pyright: ignore # noqa:F401

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePath | WriteBuffer[bytes],
        compression: str | None = "snappy",
        index: bool | None = None,
        storage_options: StorageOptions = None,
        partition_cols: list[str] | None = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
        if (
            isinstance(path_or_handle, io.BufferedWriter)
            and hasattr(path_or_handle, "name")
            and isinstance(path_or_handle.name, (str, bytes))
        ):
            path_or_handle = path_or_handle.name
            if isinstance(path_or_handle, bytes):
                path_or_handle = path_or_handle.decode()

        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
        finally:
            if handles is not None:
                handles.close()

    def read(
        self,
        path,
        columns=None,
        use_nullable_dtypes: bool = False,
        dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> DataFrame:
        kwargs["use_pandas_metadata"] = True

        to_pandas_kwargs = {}
        if dtype_backend == "numpy_nullable":
            from pandas.io._util import _arrow_dtype_mapping

            mapping = _arrow_dtype_mapping()
            to_pandas_kwargs["types_mapper"] = mapping.get
        elif dtype_backend == "pyarrow":
            to_pandas_kwargs["types_mapper"] = pd.ArrowDtype  # type: ignore[assignment] # noqa

        manager = get_option("mode.data_manager")
        if manager == "array":
            to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="rb",
        )
        try:
            pa_table = self.api.parquet.read_table(
                path_or_handle, columns=columns, **kwargs
            )
            result = pa_table.to_pandas(**to_pandas_kwargs)

            if manager == "array":
                result = result._as_manager("array", copy=False)
            return result
        finally:
            if handles is not None:
                handles.close()

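# Usage sketch for the pyarrow engine (illustrative comment, not executed here;
# "data.parquet", "dataset_dir" and the column names are made-up examples):
#
#     import pandas as pd
#
#     df = pd.DataFrame({"year": [2020, 2021], "value": [1.0, 2.0]})
#     df.to_parquet("data.parquet", engine="pyarrow", compression="zstd")
#     df.to_parquet("dataset_dir", engine="pyarrow", partition_cols=["year"])
#
#     back = pd.read_parquet("data.parquet", engine="pyarrow", columns=["value"])
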

class FastParquetImpl(BaseImpl):
    def __init__(self) -> None:
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression: Literal["snappy", "gzip", "brotli"] | None = "snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        if "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
    ) -> DataFrame:
        parquet_kwargs: dict[str, Any] = {}
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        dtype_backend = kwargs.pop("dtype_backend", lib.no_default)
        if Version(self.api.__version__) >= Version("0.7.1"):
            # We are disabling nullable dtypes for fastparquet pending discussion
            parquet_kwargs["pandas_nulls"] = False
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        if dtype_backend is not lib.no_default:
            raise ValueError(
                "The 'dtype_backend' argument is not supported for the "
                "fastparquet engine"
            )
        path = stringify_path(path)
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            if Version(self.api.__version__) > Version("0.6.1"):
                parquet_kwargs["fs"] = fsspec.open(
                    path, "rb", **(storage_options or {})
                ).fs
            else:
                parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
                    path, "rb", **(storage_options or {})
                ).open()
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(
                path, "rb", is_text=False, storage_options=storage_options
            )
            path = handles.handle

        try:
            parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
            return parquet_file.to_pandas(columns=columns, **kwargs)
        finally:
            if handles is not None:
                handles.close()

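# Usage sketch for the fastparquet engine (illustrative comment, not executed
# here; the paths are made-up examples). A partitioned write switches
# fastparquet to its "hive" file scheme, and the dtype_backend /
# use_nullable_dtypes options are rejected by this engine:
#
#     import pandas as pd
#
#     df = pd.DataFrame({"year": [2020, 2021], "value": [1.0, 2.0]})
#     df.to_parquet("dataset_dir", engine="fastparquet", partition_cols=["year"])
#     back = pd.read_parquet("dataset_dir", engine="fastparquet")
#
#     pd.read_parquet("dataset_dir", engine="fastparquet", dtype_backend="pyarrow")
#     # ValueError: The 'dtype_backend' argument is not supported for the fastparquet engine
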

@doc(storage_options=_shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: FilePath | WriteBuffer[bytes] | None = None,
    engine: str = "auto",
    compression: str | None = "snappy",
    index: bool | None = None,
    storage_options: StorageOptions = None,
    partition_cols: list[str] | None = None,
    **kwargs,
) -> bytes | None:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str, path object, file-like object, or None, default None
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``write()`` function. If None, the result is
        returned as bytes. If a string, it will be used as Root Directory path
        when writing a partitioned dataset. The engine fastparquet does not
        accept file-like objects.

        .. versionchanged:: 1.2.0

    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
        default 'snappy'. Name of the compression to use. Use ``None``
        for no compression. The supported compression methods actually
        depend on which engine is used. For 'pyarrow', 'snappy', 'gzip',
        'brotli', 'lz4', 'zstd' are all supported. For 'fastparquet',
        only 'gzip' and 'snappy' are supported.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.
    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.
    {storage_options}

        .. versionadded:: 1.2.0

    kwargs
        Additional keyword arguments passed to the engine

    Returns
    -------
    bytes if no path argument is provided else None
    """
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    impl = get_engine(engine)

    path_or_buf: FilePath | WriteBuffer[bytes] = io.BytesIO() if path is None else path

    impl.write(
        df,
        path_or_buf,
        compression=compression,
        index=index,
        partition_cols=partition_cols,
        storage_options=storage_options,
        **kwargs,
    )

    if path is None:
        assert isinstance(path_or_buf, io.BytesIO)
        return path_or_buf.getvalue()
    else:
        return None

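# Usage sketch (illustrative comment, not executed here): to_parquet backs
# DataFrame.to_parquet; when path is None the parquet payload is written to an
# in-memory BytesIO buffer and returned as bytes.
#
#     import pandas as pd
#     from pandas.io.parquet import to_parquet
#
#     df = pd.DataFrame({"a": [1, 2, 3]})
#     raw = to_parquet(df, path=None)      # bytes
#     assert raw[:4] == b"PAR1"            # parquet magic bytes
#     to_parquet(df, "out.parquet")        # writes the file and returns None
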

@doc(storage_options=_shared_docs["storage_options"])
def read_parquet(
    path: FilePath | ReadBuffer[bytes],
    engine: str = "auto",
    columns: list[str] | None = None,
    storage_options: StorageOptions = None,
    use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
    dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
    **kwargs,
) -> DataFrame:
    """
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``read()`` function.
        The string could be a URL. Valid URL schemes include http, ftp, s3,
        gs, and file. For file URLs, a host is expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default=None
        If not None, only these columns will be read from the file.

    {storage_options}

        .. versionadded:: 1.3.0

    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame (only applicable for the ``pyarrow``
        engine).
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        supported dtypes) may change without notice.

        .. deprecated:: 2.0

    dtype_backend : {{"numpy_nullable", "pyarrow"}}, defaults to NumPy backed DataFrames
        Which dtype_backend to use, e.g. whether a DataFrame should have NumPy
        arrays; nullable dtypes are used for all dtypes that have a nullable
        implementation when "numpy_nullable" is set, and pyarrow is used for
        all dtypes if "pyarrow" is set.

        The dtype_backends are still experimental.

        .. versionadded:: 2.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
    """
    impl = get_engine(engine)

    if use_nullable_dtypes is not lib.no_default:
        msg = (
            "The argument 'use_nullable_dtypes' is deprecated and will be removed "
            "in a future version."
        )
        if use_nullable_dtypes is True:
            msg += (
                " Use dtype_backend='numpy_nullable' instead of "
                "use_nullable_dtypes=True."
            )
        warnings.warn(msg, FutureWarning, stacklevel=find_stack_level())
    else:
        use_nullable_dtypes = False
    check_dtype_backend(dtype_backend)

    return impl.read(
        path,
        columns=columns,
        storage_options=storage_options,
        use_nullable_dtypes=use_nullable_dtypes,
        dtype_backend=dtype_backend,
        **kwargs,
    )
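# Usage sketch (illustrative comment, not executed here; "data.parquet" is a
# made-up path): dtype_backend selects the array types backing the result,
# while the deprecated use_nullable_dtypes flag only emits a FutureWarning
# before being forwarded to the engine.
#
#     import pandas as pd
#
#     df = pd.read_parquet("data.parquet")                                  # NumPy-backed
#     df_na = pd.read_parquet("data.parquet", dtype_backend="numpy_nullable")
#     df_pa = pd.read_parquet("data.parquet", dtype_backend="pyarrow")      # ArrowDtype columns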