Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/_pandas_helpers.py: 18% (369 statements)
coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared helper functions for connecting BigQuery and pandas."""

import concurrent.futures
from datetime import datetime
import functools
from itertools import islice
import logging
import queue
import warnings
from typing import Any, Union

from packaging import version

from google.cloud.bigquery import _helpers
from google.cloud.bigquery import schema

try:
    import pandas  # type: ignore

    pandas_import_exception = None
except ImportError as exc:  # pragma: NO COVER
    pandas = None
    pandas_import_exception = exc
else:
    import numpy

try:
    import db_dtypes  # type: ignore

    date_dtype_name = db_dtypes.DateDtype.name
    time_dtype_name = db_dtypes.TimeDtype.name
    db_dtypes_import_exception = None
except ImportError as exc:  # pragma: NO COVER
    db_dtypes = None
    db_dtypes_import_exception = exc
    date_dtype_name = time_dtype_name = ""  # Use '' rather than None because pytype

pyarrow = _helpers.PYARROW_VERSIONS.try_import()

try:
    # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`
    from shapely.geometry.base import BaseGeometry as _BaseGeometry  # type: ignore
except ImportError:  # pragma: NO COVER
    # No shapely, use NoneType for _BaseGeometry as a placeholder.
    _BaseGeometry = type(None)
else:
    if pandas is not None:  # pragma: NO COVER

        def _to_wkb():
            # Defer the shapely import and cache the WKB writer and the
            # not-null check; the outer _to_wkb placeholder is replaced by the
            # inner converter below.
            from shapely import wkb  # type: ignore

            write = wkb.dumps
            notnull = pandas.notnull

            def _to_wkb(v):
                return write(v) if notnull(v) else v

            return _to_wkb

        _to_wkb = _to_wkb()

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
    _ARROW_COMPRESSION_SUPPORT = True

_LOGGER = logging.getLogger(__name__)

_PROGRESS_INTERVAL = 0.2  # Maximum time between download status checks, in seconds.

_MAX_QUEUE_SIZE_DEFAULT = object()  # max queue size sentinel for BQ Storage downloads

_NO_PANDAS_ERROR = "Please install the 'pandas' package to use this function."
_NO_DB_TYPES_ERROR = "Please install the 'db-dtypes' package to use this function."

_PANDAS_DTYPE_TO_BQ = {
    "bool": "BOOLEAN",
    "datetime64[ns, UTC]": "TIMESTAMP",
    "datetime64[ns]": "DATETIME",
    "float32": "FLOAT",
    "float64": "FLOAT",
    "int8": "INTEGER",
    "int16": "INTEGER",
    "int32": "INTEGER",
    "int64": "INTEGER",
    "uint8": "INTEGER",
    "uint16": "INTEGER",
    "uint32": "INTEGER",
    "geometry": "GEOGRAPHY",
    date_dtype_name: "DATE",
    time_dtype_name: "TIME",
}


class _DownloadState(object):
    """Flag to indicate that a thread should exit early."""

    def __init__(self):
        # No need for a lock because reading/replacing a variable is defined to
        # be an atomic operation in the Python language definition (enforced by
        # the global interpreter lock).
        self.done = False


def pyarrow_datetime():
    return pyarrow.timestamp("us", tz=None)


def pyarrow_numeric():
    return pyarrow.decimal128(38, 9)


def pyarrow_bignumeric():
    # 77th digit is partial.
    # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
    return pyarrow.decimal256(76, 38)


def pyarrow_time():
    return pyarrow.time64("us")


def pyarrow_timestamp():
    return pyarrow.timestamp("us", tz="UTC")


if pyarrow:
    # This dictionary is duplicated in bigquery_storage/test/unit/test_reader.py
    # When modifying it, be sure to update it there as well.
    BQ_TO_ARROW_SCALARS = {
        "BOOL": pyarrow.bool_,
        "BOOLEAN": pyarrow.bool_,
        "BYTES": pyarrow.binary,
        "DATE": pyarrow.date32,
        "DATETIME": pyarrow_datetime,
        "FLOAT": pyarrow.float64,
        "FLOAT64": pyarrow.float64,
        "GEOGRAPHY": pyarrow.string,
        "INT64": pyarrow.int64,
        "INTEGER": pyarrow.int64,
        "NUMERIC": pyarrow_numeric,
        "STRING": pyarrow.string,
        "TIME": pyarrow_time,
        "TIMESTAMP": pyarrow_timestamp,
    }
    ARROW_SCALAR_IDS_TO_BQ = {
        # https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
        pyarrow.bool_().id: "BOOL",
        pyarrow.int8().id: "INT64",
        pyarrow.int16().id: "INT64",
        pyarrow.int32().id: "INT64",
        pyarrow.int64().id: "INT64",
        pyarrow.uint8().id: "INT64",
        pyarrow.uint16().id: "INT64",
        pyarrow.uint32().id: "INT64",
        pyarrow.uint64().id: "INT64",
        pyarrow.float16().id: "FLOAT64",
        pyarrow.float32().id: "FLOAT64",
        pyarrow.float64().id: "FLOAT64",
        pyarrow.time32("ms").id: "TIME",
        pyarrow.time64("ns").id: "TIME",
        pyarrow.timestamp("ns").id: "TIMESTAMP",
        pyarrow.date32().id: "DATE",
        pyarrow.date64().id: "DATETIME",  # because millisecond resolution
        pyarrow.binary().id: "BYTES",
        pyarrow.string().id: "STRING",  # also alias for pyarrow.utf8()
        # The exact scale and precision don't matter, see below.
        pyarrow.decimal128(38, scale=9).id: "NUMERIC",
    }

    if version.parse(pyarrow.__version__) >= version.parse("3.0.0"):
        BQ_TO_ARROW_SCALARS["BIGNUMERIC"] = pyarrow_bignumeric
        # The exact decimal's scale and precision are not important, as only
        # the type ID matters, and it's the same for all decimal256 instances.
        ARROW_SCALAR_IDS_TO_BQ[pyarrow.decimal256(76, scale=38).id] = "BIGNUMERIC"
        _BIGNUMERIC_SUPPORT = True
    else:
        _BIGNUMERIC_SUPPORT = False  # pragma: NO COVER

else:  # pragma: NO COVER
    BQ_TO_ARROW_SCALARS = {}  # pragma: NO COVER
    ARROW_SCALAR_IDS_TO_BQ = {}  # pragma: NO COVER
    _BIGNUMERIC_SUPPORT = False  # pragma: NO COVER


BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = {
    "GEOGRAPHY": {
        b"ARROW:extension:name": b"google:sqlType:geography",
        b"ARROW:extension:metadata": b'{"encoding": "WKT"}',
    },
    "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"},
}


def bq_to_arrow_struct_data_type(field):
    arrow_fields = []
    for subfield in field.fields:
        arrow_subfield = bq_to_arrow_field(subfield)
        if arrow_subfield:
            arrow_fields.append(arrow_subfield)
        else:
            # Could not determine a subfield type. Fallback to type
            # inference.
            return None
    return pyarrow.struct(arrow_fields)


def bq_to_arrow_data_type(field):
    """Return the Arrow data type, corresponding to a given BigQuery column.

    Returns:
        None: if default Arrow type inspection should be used.
    """
    if field.mode is not None and field.mode.upper() == "REPEATED":
        inner_type = bq_to_arrow_data_type(
            schema.SchemaField(field.name, field.field_type, fields=field.fields)
        )
        if inner_type:
            return pyarrow.list_(inner_type)
        return None

    field_type_upper = field.field_type.upper() if field.field_type else ""
    if field_type_upper in schema._STRUCT_TYPES:
        return bq_to_arrow_struct_data_type(field)

    data_type_constructor = BQ_TO_ARROW_SCALARS.get(field_type_upper)
    if data_type_constructor is None:
        return None
    return data_type_constructor()


def bq_to_arrow_field(bq_field, array_type=None):
    """Return the Arrow field, corresponding to a given BigQuery column.

    Returns:
        None: if the Arrow type cannot be determined.
    """
    arrow_type = bq_to_arrow_data_type(bq_field)
    if arrow_type is not None:
        if array_type is not None:
            arrow_type = array_type  # For GEOGRAPHY, at least initially
        is_nullable = bq_field.mode.upper() == "NULLABLE"
        metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get(
            bq_field.field_type.upper() if bq_field.field_type else ""
        )
        return pyarrow.field(
            bq_field.name, arrow_type, nullable=is_nullable, metadata=metadata
        )

    warnings.warn("Unable to determine type for field '{}'.".format(bq_field.name))
    return None


def bq_to_arrow_schema(bq_schema):
    """Return the Arrow schema, corresponding to a given BigQuery schema.

    Returns:
        None: if any Arrow type cannot be determined.
    """
    arrow_fields = []
    for bq_field in bq_schema:
        arrow_field = bq_to_arrow_field(bq_field)
        if arrow_field is None:
            # Auto-detect the schema if there is an unknown field type.
            return None
        arrow_fields.append(arrow_field)
    return pyarrow.schema(arrow_fields)
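

# Illustrative sketch (added for documentation, not part of the original module):
# how the BigQuery-to-Arrow helpers above compose. Assumes ``pyarrow`` is installed;
# the field names and types are invented for the example.
def _example_bq_to_arrow_schema():  # pragma: NO COVER
    bq_fields = [
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("scores", "FLOAT64", mode="REPEATED"),
    ]
    # Roughly: name: string, scores: list<item: double>
    return bq_to_arrow_schema(bq_fields)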


def default_types_mapper(
    date_as_object: bool = False,
    bool_dtype: Union[Any, None] = None,
    int_dtype: Union[Any, None] = None,
    float_dtype: Union[Any, None] = None,
    string_dtype: Union[Any, None] = None,
):
    """Create a mapping from pyarrow types to pandas types.

    This overrides the pandas defaults to use null-safe extension types where
    available.

    See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of
    data types. See:
    tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for
    BigQuery to Arrow type mapping.

    Note to google-cloud-bigquery developers: If you update the default dtypes,
    also update the docs at docs/usage/pandas.rst.
    """

    def types_mapper(arrow_data_type):
        if bool_dtype is not None and pyarrow.types.is_boolean(arrow_data_type):
            return bool_dtype

        elif int_dtype is not None and pyarrow.types.is_integer(arrow_data_type):
            return int_dtype

        elif float_dtype is not None and pyarrow.types.is_floating(arrow_data_type):
            return float_dtype

        elif string_dtype is not None and pyarrow.types.is_string(arrow_data_type):
            return string_dtype

        elif (
            # If date_as_object is True, we know some DATE columns are
            # out-of-bounds of what is supported by pandas.
            not date_as_object
            and pyarrow.types.is_date(arrow_data_type)
        ):
            return db_dtypes.DateDtype()

        elif pyarrow.types.is_time(arrow_data_type):
            return db_dtypes.TimeDtype()

    return types_mapper
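

# Illustrative sketch (added for documentation, not part of the original module):
# the mapper returned by ``default_types_mapper`` is meant to be passed as the
# ``types_mapper`` argument of ``pyarrow.Table.to_pandas``. The dtypes chosen here
# are examples, not the library's defaults.
def _example_default_types_mapper():  # pragma: NO COVER
    table = pyarrow.table({"flag": [True, None], "count": [1, None]})
    mapper = default_types_mapper(
        bool_dtype=pandas.BooleanDtype(), int_dtype=pandas.Int64Dtype()
    )
    # Null-safe extension dtypes are used instead of object/float64 columns.
    return table.to_pandas(types_mapper=mapper)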


def bq_to_arrow_array(series, bq_field):
    if bq_field.field_type.upper() == "GEOGRAPHY":
        arrow_type = None
        first = _first_valid(series)
        if first is not None:
            if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry):
                arrow_type = pyarrow.binary()
                # Convert shapely geometry to WKB binary format:
                series = series.apply(_to_wkb)
            elif isinstance(first, bytes):
                arrow_type = pyarrow.binary()
        elif series.dtype.name == "geometry":
            # We have a GeoSeries containing all nulls, convert it to a pandas series
            series = pandas.Series(numpy.array(series))

        if arrow_type is None:
            arrow_type = bq_to_arrow_data_type(bq_field)
    else:
        arrow_type = bq_to_arrow_data_type(bq_field)

    field_type_upper = bq_field.field_type.upper() if bq_field.field_type else ""

    if bq_field.mode.upper() == "REPEATED":
        return pyarrow.ListArray.from_pandas(series, type=arrow_type)
    if field_type_upper in schema._STRUCT_TYPES:
        return pyarrow.StructArray.from_pandas(series, type=arrow_type)
    return pyarrow.Array.from_pandas(series, type=arrow_type)


def get_column_or_index(dataframe, name):
    """Return a column or index as a pandas series."""
    if name in dataframe.columns:
        return dataframe[name].reset_index(drop=True)

    if isinstance(dataframe.index, pandas.MultiIndex):
        if name in dataframe.index.names:
            return (
                dataframe.index.get_level_values(name)
                .to_series()
                .reset_index(drop=True)
            )
    else:
        if name == dataframe.index.name:
            return dataframe.index.to_series().reset_index(drop=True)

    raise ValueError("column or index '{}' not found.".format(name))


def list_columns_and_indexes(dataframe):
    """Return all index and column names with dtypes.

    Returns:
        Sequence[Tuple[str, dtype]]:
            Returns a sorted list of indexes and column names with
            corresponding dtypes. If an index is missing a name or has the
            same name as a column, the index is omitted.
    """
    column_names = frozenset(dataframe.columns)
    columns_and_indexes = []
    if isinstance(dataframe.index, pandas.MultiIndex):
        for name in dataframe.index.names:
            if name and name not in column_names:
                values = dataframe.index.get_level_values(name)
                columns_and_indexes.append((name, values.dtype))
    else:
        if dataframe.index.name and dataframe.index.name not in column_names:
            columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))

    columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
    return columns_and_indexes
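

# Illustrative sketch (added for documentation, not part of the original module):
# named indexes are listed before columns; unnamed or shadowed indexes are skipped.
def _example_list_columns_and_indexes():  # pragma: NO COVER
    df = pandas.DataFrame(
        {"value": [1.0, 2.0]}, index=pandas.Index([10, 20], name="id")
    )
    # Returns something like [("id", dtype("int64")), ("value", dtype("float64"))].
    return list_columns_and_indexes(df)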


def _first_valid(series):
    first_valid_index = series.first_valid_index()
    if first_valid_index is not None:
        return series.at[first_valid_index]


def _first_array_valid(series):
    """Return the first "meaningful" element from the array series.

    Here, "meaningful" means the first non-None element in one of the arrays that can
    be used for type detection.
    """
    first_valid_index = series.first_valid_index()
    if first_valid_index is None:
        return None

    valid_array = series.at[first_valid_index]
    valid_item = next((item for item in valid_array if not pandas.isna(item)), None)

    if valid_item is not None:
        return valid_item

    # Valid item is None because all items in the "valid" array are invalid. Try
    # to find a true valid array manually.
    for array in islice(series, first_valid_index + 1, None):
        try:
            array_iter = iter(array)
        except TypeError:
            continue  # Not an array, apparently, e.g. None, thus skip.
        valid_item = next((item for item in array_iter if not pandas.isna(item)), None)
        if valid_item is not None:
            break

    return valid_item
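

# Illustrative sketch (added for documentation, not part of the original module):
# the helper skips None entries and None-only arrays until it finds a concrete
# element that type detection can use.
def _example_first_array_valid():  # pragma: NO COVER
    series = pandas.Series([None, [None, None], [None, 2.5]])
    # The first non-null element found inside any of the arrays: 2.5
    return _first_array_valid(series)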


def dataframe_to_bq_schema(dataframe, bq_schema):
    """Convert a pandas DataFrame schema to a BigQuery schema.

    Args:
        dataframe (pandas.DataFrame):
            DataFrame for which the client determines the BigQuery schema.
        bq_schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            A BigQuery schema. Use this argument to override the autodetected
            type for some or all of the DataFrame columns.

    Returns:
        Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
            The automatically determined schema. Returns None if the type of
            any column cannot be determined.
    """
    if bq_schema:
        bq_schema = schema._to_schema_fields(bq_schema)
        bq_schema_index = {field.name: field for field in bq_schema}
        bq_schema_unused = set(bq_schema_index.keys())
    else:
        bq_schema_index = {}
        bq_schema_unused = set()

    bq_schema_out = []
    unknown_type_fields = []

    for column, dtype in list_columns_and_indexes(dataframe):
        # Use provided type from schema, if present.
        bq_field = bq_schema_index.get(column)
        if bq_field:
            bq_schema_out.append(bq_field)
            bq_schema_unused.discard(bq_field.name)
            continue

        # Otherwise, try to automatically determine the type based on the
        # pandas dtype.
        bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
        if bq_type is None:
            sample_data = _first_valid(dataframe[column])
            if (
                isinstance(sample_data, _BaseGeometry)
                and sample_data is not None  # Paranoia
            ):
                bq_type = "GEOGRAPHY"
        bq_field = schema.SchemaField(column, bq_type)
        bq_schema_out.append(bq_field)

        if bq_field.field_type is None:
            unknown_type_fields.append(bq_field)

    # Catch any schema mismatch. The developer explicitly asked to serialize a
    # column, but it was not found.
    if bq_schema_unused:
        raise ValueError(
            "bq_schema contains fields not present in dataframe: {}".format(
                bq_schema_unused
            )
        )

    # If schema detection was not successful for all columns, also try with
    # pyarrow, if available.
    if unknown_type_fields:
        if not pyarrow:
            msg = "Could not determine the type of columns: {}".format(
                ", ".join(field.name for field in unknown_type_fields)
            )
            warnings.warn(msg)
            return None  # We cannot detect the schema in full.

        # The augment_schema() helper itself will also issue unknown type
        # warnings if detection still fails for any of the fields.
        bq_schema_out = augment_schema(dataframe, bq_schema_out)

    return tuple(bq_schema_out) if bq_schema_out else None
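

# Illustrative sketch (added for documentation, not part of the original module):
# dtype-based detection plus a per-column override supplied via ``bq_schema``.
# The column names and the override are invented for the example.
def _example_dataframe_to_bq_schema():  # pragma: NO COVER
    df = pandas.DataFrame({"name": ["a", "b"], "score": [1.5, 2.5]})
    # "score" is detected as FLOAT from its float64 dtype; "name" has an
    # object dtype, so we override its type explicitly.
    overrides = [schema.SchemaField("name", "STRING")]
    return dataframe_to_bq_schema(df, overrides)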


def augment_schema(dataframe, current_bq_schema):
    """Try to deduce the unknown field types and return an improved schema.

    This function requires ``pyarrow`` to run. If all the missing types still
    cannot be detected, ``None`` is returned. If all types are already known,
    a shallow copy of the given schema is returned.

    Args:
        dataframe (pandas.DataFrame):
            DataFrame for which some of the field types are still unknown.
        current_bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
            A BigQuery schema for ``dataframe``. The types of some or all of
            the fields may be ``None``.

    Returns:
        Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]
    """
    # pytype: disable=attribute-error
    augmented_schema = []
    unknown_type_fields = []

    for field in current_bq_schema:
        if field.field_type is not None:
            augmented_schema.append(field)
            continue

        arrow_table = pyarrow.array(dataframe[field.name])

        if pyarrow.types.is_list(arrow_table.type):
            # `pyarrow.ListType`
            detected_mode = "REPEATED"
            detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id)

            # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
            # it to such datetimes, causing them to be recognized as TIMESTAMP type.
            # We thus additionally check the actual data to see if we need to overrule
            # that and choose DATETIME instead.
            # Note that this should only be needed for datetime values inside a list,
            # since scalar datetime values have a proper Pandas dtype that allows
            # distinguishing between timezone-naive and timezone-aware values before
            # even requiring the additional schema augment logic in this method.
            if detected_type == "TIMESTAMP":
                valid_item = _first_array_valid(dataframe[field.name])
                if isinstance(valid_item, datetime) and valid_item.tzinfo is None:
                    detected_type = "DATETIME"
        else:
            detected_mode = field.mode
            detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)

        if detected_type is None:
            unknown_type_fields.append(field)
            continue

        new_field = schema.SchemaField(
            name=field.name,
            field_type=detected_type,
            mode=detected_mode,
            description=field.description,
            fields=field.fields,
        )
        augmented_schema.append(new_field)

    if unknown_type_fields:
        warnings.warn(
            "Pyarrow could not determine the type of columns: {}.".format(
                ", ".join(field.name for field in unknown_type_fields)
            )
        )
        return None

    return augmented_schema
    # pytype: enable=attribute-error
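

# Illustrative sketch (added for documentation, not part of the original module):
# a column whose BigQuery type is still unknown (here, a list column with object
# dtype) gets its type filled in from the pyarrow-inferred Arrow type.
def _example_augment_schema():  # pragma: NO COVER
    df = pandas.DataFrame({"tags": [["a", "b"], ["c"]]})
    partial = [schema.SchemaField("tags", None)]  # type still unknown
    # Detected as ("tags", "STRING") with mode "REPEATED".
    return augment_schema(df, partial)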


def dataframe_to_arrow(dataframe, bq_schema):
    """Convert pandas dataframe to Arrow table, using BigQuery schema.

    Args:
        dataframe (pandas.DataFrame):
            DataFrame to convert to Arrow table.
        bq_schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            Desired BigQuery schema. The number of columns must match the
            number of columns in the DataFrame.

    Returns:
        pyarrow.Table:
            Table containing dataframe data, with schema derived from
            BigQuery schema.
    """
    column_names = set(dataframe.columns)
    column_and_index_names = set(
        name for name, _ in list_columns_and_indexes(dataframe)
    )

    bq_schema = schema._to_schema_fields(bq_schema)
    bq_field_names = set(field.name for field in bq_schema)

    extra_fields = bq_field_names - column_and_index_names
    if extra_fields:
        raise ValueError(
            "bq_schema contains fields not present in dataframe: {}".format(
                extra_fields
            )
        )

    # It's okay for indexes to be missing from bq_schema, but it's not okay to
    # be missing columns.
    missing_fields = column_names - bq_field_names
    if missing_fields:
        raise ValueError(
            "bq_schema is missing fields from dataframe: {}".format(missing_fields)
        )

    arrow_arrays = []
    arrow_names = []
    arrow_fields = []
    for bq_field in bq_schema:
        arrow_names.append(bq_field.name)
        arrow_arrays.append(
            bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
        )
        arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type))

    if all((field is not None for field in arrow_fields)):
        return pyarrow.Table.from_arrays(
            arrow_arrays, schema=pyarrow.schema(arrow_fields)
        )
    return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
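

# Illustrative sketch (added for documentation, not part of the original module):
# converting a small DataFrame with an explicit BigQuery schema. The column names
# and types are invented for the example.
def _example_dataframe_to_arrow():  # pragma: NO COVER
    df = pandas.DataFrame({"name": ["a", "b"], "score": [1.5, 2.5]})
    bq_fields = [
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("score", "FLOAT64"),
    ]
    # Produces a pyarrow.Table with columns name: string, score: double.
    return dataframe_to_arrow(df, bq_fields)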


def dataframe_to_parquet(
    dataframe,
    bq_schema,
    filepath,
    parquet_compression="SNAPPY",
    parquet_use_compliant_nested_type=True,
):
    """Write dataframe as a Parquet file, according to the desired BQ schema.

    This function requires the :mod:`pyarrow` package. Arrow is used as an
    intermediate format.

    Args:
        dataframe (pandas.DataFrame):
            DataFrame to convert to Parquet file.
        bq_schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            Desired BigQuery schema. Number of columns must match number of
            columns in the DataFrame.
        filepath (str):
            Path to write Parquet file to.
        parquet_compression (Optional[str]):
            The compression codec to use by the ``pyarrow.parquet.write_table``
            serializing method. Defaults to "SNAPPY".
            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
        parquet_use_compliant_nested_type (bool):
            Whether the ``pyarrow.parquet.write_table`` serializing method should write
            compliant Parquet nested type (lists). Defaults to ``True``.
            https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types
            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table

            This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
    """
    pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)

    import pyarrow.parquet  # type: ignore

    kwargs = (
        {"use_compliant_nested_type": parquet_use_compliant_nested_type}
        if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
        else {}
    )

    bq_schema = schema._to_schema_fields(bq_schema)
    arrow_table = dataframe_to_arrow(dataframe, bq_schema)
    pyarrow.parquet.write_table(
        arrow_table,
        filepath,
        compression=parquet_compression,
        **kwargs,
    )
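

# Illustrative sketch (added for documentation, not part of the original module):
# writing a DataFrame to a temporary Parquet file with an explicit schema.
def _example_dataframe_to_parquet():  # pragma: NO COVER
    import tempfile

    df = pandas.DataFrame({"id": [1, 2], "label": ["x", "y"]})
    bq_fields = [
        schema.SchemaField("id", "INT64"),
        schema.SchemaField("label", "STRING"),
    ]
    with tempfile.NamedTemporaryFile(suffix=".parquet") as handle:
        # The file is written to a throwaway path and discarded afterwards.
        dataframe_to_parquet(df, bq_fields, handle.name)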


def _row_iterator_page_to_arrow(page, column_names, arrow_types):
    # Iterate over the page to force the API request to get the page data.
    try:
        next(iter(page))
    except StopIteration:
        pass

    arrays = []
    for column_index, arrow_type in enumerate(arrow_types):
        arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type))

    if isinstance(column_names, pyarrow.Schema):
        return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names)
    return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_row_iterator(pages, bq_schema):
    """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.

    Args:
        pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
            An iterator over the result pages.
        bq_schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            A description of the fields in result pages.

    Yields:
        :class:`pyarrow.RecordBatch`
            The next page of records as a ``pyarrow`` record batch.
    """
    bq_schema = schema._to_schema_fields(bq_schema)
    column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
    arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

    for page in pages:
        yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


def _row_iterator_page_to_dataframe(page, column_names, dtypes):
    # Iterate over the page to force the API request to get the page data.
    try:
        next(iter(page))
    except StopIteration:
        pass

    columns = {}
    for column_index, column_name in enumerate(column_names):
        dtype = dtypes.get(column_name)
        columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype)

    return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_row_iterator(pages, bq_schema, dtypes):
    """Use HTTP JSON RowIterator to construct a DataFrame.

    Args:
        pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
            An iterator over the result pages.
        bq_schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            A description of the fields in result pages.
        dtypes (Mapping[str, numpy.dtype]):
            The types of columns in result data to hint construction of the
            resulting DataFrame. Not all column types have to be specified.

    Yields:
        :class:`pandas.DataFrame`
            The next page of records as a ``pandas.DataFrame``.
    """
    bq_schema = schema._to_schema_fields(bq_schema)
    column_names = [field.name for field in bq_schema]
    for page in pages:
        yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


def _bqstorage_page_to_arrow(page):
    return page.to_arrow()


def _bqstorage_page_to_dataframe(column_names, dtypes, page):
    # page.to_dataframe() does not preserve column order in some versions
    # of google-cloud-bigquery-storage. Access by column name to rearrange.
    return page.to_dataframe(dtypes=dtypes)[column_names]


def _download_table_bqstorage_stream(
    download_state, bqstorage_client, session, stream, worker_queue, page_to_item
):
    reader = bqstorage_client.read_rows(stream.name)

    # Avoid deprecation warnings for passing in unnecessary read session.
    # https://github.com/googleapis/python-bigquery-storage/issues/229
    if _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional:
        rowstream = reader.rows()
    else:
        rowstream = reader.rows(session)

    for page in rowstream.pages:
        if download_state.done:
            return
        item = page_to_item(page)
        worker_queue.put(item)


def _nowait(futures):
    """Separate finished and unfinished threads, much like
    :func:`concurrent.futures.wait`, but don't wait.
    """
    done = []
    not_done = []
    for future in futures:
        if future.done():
            done.append(future)
        else:
            not_done.append(future)
    return done, not_done
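

# Illustrative sketch (added for documentation, not part of the original module):
# unlike ``concurrent.futures.wait``, ``_nowait`` returns immediately, so a caller
# can keep draining the worker queue while slow futures are still running.
def _example_nowait():  # pragma: NO COVER
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(lambda: 42), pool.submit(lambda: 43)]
        concurrent.futures.wait(futures)  # make sure both finished for the demo
        done, not_done = _nowait(futures)
        # Here: len(done) == 2 and not_done == [].
        return done, not_done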


def _download_table_bqstorage(
    project_id,
    table,
    bqstorage_client,
    preserve_order=False,
    selected_fields=None,
    page_to_item=None,
    max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
):
    """Use (faster, but billable) BQ Storage API to construct DataFrame."""

    # Passing a BQ Storage client in implies that the BigQuery Storage library
    # is available and can be imported.
    from google.cloud import bigquery_storage

    if "$" in table.table_id:
        raise ValueError(
            "Reading from a specific partition is not currently supported."
        )
    if "@" in table.table_id:
        raise ValueError("Reading from a specific snapshot is not currently supported.")

    requested_streams = 1 if preserve_order else 0

    requested_session = bigquery_storage.types.ReadSession(
        table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW
    )
    if selected_fields is not None:
        for field in selected_fields:
            requested_session.read_options.selected_fields.append(field.name)

    if _ARROW_COMPRESSION_SUPPORT:
        requested_session.read_options.arrow_serialization_options.buffer_compression = (
            ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
        )

    session = bqstorage_client.create_read_session(
        parent="projects/{}".format(project_id),
        read_session=requested_session,
        max_stream_count=requested_streams,
    )

    _LOGGER.debug(
        "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
            table.project, table.dataset_id, table.table_id, session.name
        )
    )

    # Avoid reading rows from an empty table.
    if not session.streams:
        return

    total_streams = len(session.streams)

    # Use _DownloadState to notify worker threads when to quit.
    # See: https://stackoverflow.com/a/29237343/101923
    download_state = _DownloadState()

    # Create a queue to collect frames as they are created in each thread.
    #
    # The queue needs to be bounded by default, because if the user code processes the
    # fetched result pages too slowly, while at the same time new pages are rapidly being
    # fetched from the server, the queue can grow to the point where the process runs
    # out of memory.
    if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT:
        max_queue_size = total_streams
    elif max_queue_size is None:
        max_queue_size = 0  # unbounded

    worker_queue = queue.Queue(maxsize=max_queue_size)

    with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
        try:
            # Manually submit jobs and wait for download to complete rather
            # than using pool.map because pool.map continues running in the
            # background even if there is an exception on the main thread.
            # See: https://github.com/googleapis/google-cloud-python/pull/7698
            not_done = [
                pool.submit(
                    _download_table_bqstorage_stream,
                    download_state,
                    bqstorage_client,
                    session,
                    stream,
                    worker_queue,
                    page_to_item,
                )
                for stream in session.streams
            ]

            while not_done:
                # Don't block on the worker threads. For performance reasons,
                # we want to block on the queue's get method, instead. This
                # prevents the queue from filling up, because the main thread
                # has smaller gaps in time between calls to the queue's get
                # method. For a detailed explanation, see:
                # https://friendliness.dev/2019/06/18/python-nowait/
                done, not_done = _nowait(not_done)
                for future in done:
                    # Call result() on any finished threads to raise any
                    # exceptions encountered.
                    future.result()

                try:
                    frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
                    yield frame
                except queue.Empty:  # pragma: NO COVER
                    continue

            # Return any remaining values after the workers finished.
            while True:  # pragma: NO COVER
                try:
                    frame = worker_queue.get_nowait()
                    yield frame
                except queue.Empty:  # pragma: NO COVER
                    break
        finally:
            # No need for a lock because reading/replacing a variable is
            # defined to be an atomic operation in the Python language
            # definition (enforced by the global interpreter lock).
            download_state.done = True

            # Shutdown all background threads, now that they should know to
            # exit early.
            pool.shutdown(wait=True)


def download_arrow_bqstorage(
    project_id,
    table,
    bqstorage_client,
    preserve_order=False,
    selected_fields=None,
    max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
):
    return _download_table_bqstorage(
        project_id,
        table,
        bqstorage_client,
        preserve_order=preserve_order,
        selected_fields=selected_fields,
        page_to_item=_bqstorage_page_to_arrow,
        max_queue_size=max_queue_size,
    )


def download_dataframe_bqstorage(
    project_id,
    table,
    bqstorage_client,
    column_names,
    dtypes,
    preserve_order=False,
    selected_fields=None,
    max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
):
    page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
    return _download_table_bqstorage(
        project_id,
        table,
        bqstorage_client,
        preserve_order=preserve_order,
        selected_fields=selected_fields,
        page_to_item=page_to_item,
        max_queue_size=max_queue_size,
    )


def dataframe_to_json_generator(dataframe):
    for row in dataframe.itertuples(index=False, name=None):
        output = {}
        for column, value in zip(dataframe.columns, row):
            # Omit NaN values.
            is_nan = pandas.isna(value)

            # isna() can also return an array-like of bools, but the latter's boolean
            # value is ambiguous, hence an extra check. An array-like value is *not*
            # considered a NaN, however.
            if isinstance(is_nan, bool) and is_nan:
                continue
            output[column] = value

        yield output
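

# Illustrative sketch (added for documentation, not part of the original module):
# NaN cells are dropped from the emitted JSON-ready dicts.
def _example_dataframe_to_json_generator():  # pragma: NO COVER
    df = pandas.DataFrame({"id": [1, 2], "note": ["ok", float("nan")]})
    # Yields one dict per row; the NaN "note" cell in the second row is omitted.
    return list(dataframe_to_json_generator(df))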


def verify_pandas_imports():
    if pandas is None:
        raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception
    if db_dtypes is None:
        raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception