1# Copyright 2019 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Shared helper functions for connecting BigQuery and pandas.
16
17NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package,
18instead. See: go/pandas-gbq-and-bigframes-redundancy and
19https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pandas_to_bigquery.py
20"""
21
22import concurrent.futures
23from datetime import datetime
24import functools
25from itertools import islice
26import logging
27import queue
28import threading
29import warnings
30from typing import Any, Union, Optional, Callable, Generator, List
31
32
33from google.cloud.bigquery import _pyarrow_helpers
34from google.cloud.bigquery import _versions_helpers
35from google.cloud.bigquery import schema
36
37
38try:
39 import pandas # type: ignore
40
41 pandas_import_exception = None
42except ImportError as exc:
43 pandas = None
44 pandas_import_exception = exc
45else:
46 import numpy
47
48
49try:
50 import pandas_gbq.schema.pandas_to_bigquery # type: ignore
51
52 pandas_gbq_import_exception = None
53except ImportError as exc:
54 pandas_gbq = None
55 pandas_gbq_import_exception = exc
56
57
58try:
59 import db_dtypes # type: ignore
60
61 date_dtype_name = db_dtypes.DateDtype.name
62 time_dtype_name = db_dtypes.TimeDtype.name
63 db_dtypes_import_exception = None
64except ImportError as exc:
65 db_dtypes = None
66 db_dtypes_import_exception = exc
    date_dtype_name = time_dtype_name = ""  # Use '' rather than None so pytype sees str, not Optional[str].
68
69pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()
70
71try:
    # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`
73 from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore
74except ImportError:
75 # No shapely, use NoneType for _BaseGeometry as a placeholder.
76 _BaseGeometry = type(None)
77else:
78 # We don't have any unit test sessions that install shapely but not pandas.
79 if pandas is not None: # pragma: NO COVER
80
81 def _to_wkb():
82 from shapely import wkb # type: ignore
83
84 write = wkb.dumps
85 notnull = pandas.notnull
86
87 def _to_wkb(v):
88 return write(v) if notnull(v) else v
89
90 return _to_wkb
91
92 _to_wkb = _to_wkb()
93
94try:
95 from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions
96except ImportError:
97 _ARROW_COMPRESSION_SUPPORT = False
98else:
99 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
100 _ARROW_COMPRESSION_SUPPORT = True
101
102_LOGGER = logging.getLogger(__name__)
103
104_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds.
105
106_MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads
107
108_NO_PANDAS_ERROR = "Please install the 'pandas' package to use this function."
109_NO_DB_TYPES_ERROR = "Please install the 'db-dtypes' package to use this function."
110
111_PANDAS_DTYPE_TO_BQ = {
112 "bool": "BOOLEAN",
113 "datetime64[ns, UTC]": "TIMESTAMP",
114 "datetime64[ns]": "DATETIME",
115 "float32": "FLOAT",
116 "float64": "FLOAT",
117 "int8": "INTEGER",
118 "int16": "INTEGER",
119 "int32": "INTEGER",
120 "int64": "INTEGER",
121 "uint8": "INTEGER",
122 "uint16": "INTEGER",
123 "uint32": "INTEGER",
124 "geometry": "GEOGRAPHY",
125 date_dtype_name: "DATE",
126 time_dtype_name: "TIME",
127}
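
# Illustrative sketch (not part of the public API): this mapping is consulted
# with a pandas dtype name in dataframe_to_bq_schema(), e.g.
#
#     import pandas
#     series = pandas.Series([1, 2, 3], dtype="int64")
#     _PANDAS_DTYPE_TO_BQ.get(series.dtype.name)  # -> "INTEGER"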
128
129
130class _DownloadState(object):
131 """Flag to indicate that a thread should exit early."""
132
133 def __init__(self):
134 # No need for a lock because reading/replacing a variable is defined to
135 # be an atomic operation in the Python language definition (enforced by
136 # the global interpreter lock).
137 self.done = False
138 # To assist with testing and understanding the behavior of the
139 # download, use this object as shared state to track how many worker
140 # threads have started and have gracefully shutdown.
141 self._started_workers_lock = threading.Lock()
142 self.started_workers = 0
143 self._finished_workers_lock = threading.Lock()
144 self.finished_workers = 0
145
146 def start(self):
147 with self._started_workers_lock:
148 self.started_workers += 1
149
150 def finish(self):
151 with self._finished_workers_lock:
152 self.finished_workers += 1
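
# Rough usage sketch (mirrors how _download_table_bqstorage coordinates its
# worker threads; names below are illustrative only):
#
#     download_state = _DownloadState()
#     # Each worker calls download_state.start() on entry and
#     # download_state.finish() in a finally block; the parent generator sets
#     # download_state.done = True when workers should exit early.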
153
154
155BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = {
156 "GEOGRAPHY": {
157 b"ARROW:extension:name": b"google:sqlType:geography",
158 b"ARROW:extension:metadata": b'{"encoding": "WKT"}',
159 },
160 "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"},
161 "JSON": {b"ARROW:extension:name": b"google:sqlType:json"},
162}
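
# Illustrative sketch: bq_to_arrow_field() attaches these bytes as Arrow field
# metadata, roughly equivalent to (field name and type here are just examples):
#
#     pyarrow.field(
#         "geo",
#         pyarrow.string(),
#         metadata=BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA["GEOGRAPHY"],
#     )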
163
164
165def bq_to_arrow_struct_data_type(field):
166 arrow_fields = []
167 for subfield in field.fields:
168 arrow_subfield = bq_to_arrow_field(subfield)
169 if arrow_subfield:
170 arrow_fields.append(arrow_subfield)
171 else:
172 # Could not determine a subfield type. Fallback to type
173 # inference.
174 return None
175 return pyarrow.struct(arrow_fields)
176
177
178def bq_to_arrow_range_data_type(field):
179 if field is None:
180 raise ValueError(
181 "Range element type cannot be None, must be one of "
182 "DATE, DATETIME, or TIMESTAMP"
183 )
184 element_type = field.element_type.upper()
185 arrow_element_type = _pyarrow_helpers.bq_to_arrow_scalars(element_type)()
186 return pyarrow.struct([("start", arrow_element_type), ("end", arrow_element_type)])
187
188
189def bq_to_arrow_data_type(field):
190 """Return the Arrow data type, corresponding to a given BigQuery column.
191
192 Returns:
193 None: if default Arrow type inspection should be used.
194 """
195 if field.mode is not None and field.mode.upper() == "REPEATED":
196 inner_type = bq_to_arrow_data_type(
197 schema.SchemaField(field.name, field.field_type, fields=field.fields)
198 )
199 if inner_type:
200 return pyarrow.list_(inner_type)
201 return None
202
203 field_type_upper = field.field_type.upper() if field.field_type else ""
204 if field_type_upper in schema._STRUCT_TYPES:
205 return bq_to_arrow_struct_data_type(field)
206
207 if field_type_upper == "RANGE":
208 return bq_to_arrow_range_data_type(field.range_element_type)
209
210 data_type_constructor = _pyarrow_helpers.bq_to_arrow_scalars(field_type_upper)
211 if data_type_constructor is None:
212 return None
213 return data_type_constructor()
214
215
216def bq_to_arrow_field(bq_field, array_type=None):
217 """Return the Arrow field, corresponding to a given BigQuery column.
218
219 Returns:
220 None: if the Arrow type cannot be determined.
221 """
222 arrow_type = bq_to_arrow_data_type(bq_field)
223 if arrow_type is not None:
224 if array_type is not None:
225 arrow_type = array_type # For GEOGRAPHY, at least initially
226 metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get(
227 bq_field.field_type.upper() if bq_field.field_type else ""
228 )
229 return pyarrow.field(
230 bq_field.name,
231 arrow_type,
232 # Even if the remote schema is REQUIRED, there's a chance there's
233 # local NULL values. Arrow will gladly interpret these NULL values
234 # as non-NULL and give you an arbitrary value. See:
235 # https://github.com/googleapis/python-bigquery/issues/1692
236 nullable=False if bq_field.mode.upper() == "REPEATED" else True,
237 metadata=metadata,
238 )
239
240 warnings.warn(
241 "Unable to determine Arrow type for field '{}'.".format(bq_field.name)
242 )
243 return None
244
245
246def bq_to_arrow_schema(bq_schema):
247 """Return the Arrow schema, corresponding to a given BigQuery schema.
248
249 Returns:
250 None: if any Arrow type cannot be determined.
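
    Example (a minimal sketch; field names are illustrative and ``pyarrow``
    is assumed to be installed):

        bq_schema = [
            schema.SchemaField("name", "STRING"),
            schema.SchemaField("age", "INTEGER"),
        ]
        arrow_schema = bq_to_arrow_schema(bq_schema)
        # arrow_schema is a pyarrow.Schema, or None if any type is unknown.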
251 """
252 arrow_fields = []
253 for bq_field in bq_schema:
254 arrow_field = bq_to_arrow_field(bq_field)
255 if arrow_field is None:
256 # Auto-detect the schema if there is an unknown field type.
257 return None
258 arrow_fields.append(arrow_field)
259 return pyarrow.schema(arrow_fields)
260
261
262def default_types_mapper(
263 date_as_object: bool = False,
264 bool_dtype: Union[Any, None] = None,
265 int_dtype: Union[Any, None] = None,
266 float_dtype: Union[Any, None] = None,
267 string_dtype: Union[Any, None] = None,
268 date_dtype: Union[Any, None] = None,
269 datetime_dtype: Union[Any, None] = None,
270 time_dtype: Union[Any, None] = None,
271 timestamp_dtype: Union[Any, None] = None,
272 range_date_dtype: Union[Any, None] = None,
273 range_datetime_dtype: Union[Any, None] = None,
274 range_timestamp_dtype: Union[Any, None] = None,
275):
276 """Create a mapping from pyarrow types to pandas types.
277
278 This overrides the pandas defaults to use null-safe extension types where
279 available.
280
281 See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of
282 data types. See:
283 tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for
284 BigQuery to Arrow type mapping.
285
286 Note to google-cloud-bigquery developers: If you update the default dtypes,
287 also update the docs at docs/usage/pandas.rst.
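
    Example (an illustrative sketch; the extension dtypes shown are just
    possible choices, not required ones):

        import pandas
        mapper = default_types_mapper(
            bool_dtype=pandas.BooleanDtype(), int_dtype=pandas.Int64Dtype()
        )
        # Typically passed to pyarrow.Table.to_pandas(types_mapper=mapper).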
288 """
289
290 def types_mapper(arrow_data_type):
291 if bool_dtype is not None and pyarrow.types.is_boolean(arrow_data_type):
292 return bool_dtype
293
294 elif int_dtype is not None and pyarrow.types.is_integer(arrow_data_type):
295 return int_dtype
296
297 elif float_dtype is not None and pyarrow.types.is_floating(arrow_data_type):
298 return float_dtype
299
300 elif string_dtype is not None and pyarrow.types.is_string(arrow_data_type):
301 return string_dtype
302
303 elif (
304 # If date_as_object is True, we know some DATE columns are
305 # out-of-bounds of what is supported by pandas.
306 date_dtype is not None
307 and not date_as_object
308 and pyarrow.types.is_date(arrow_data_type)
309 ):
310 return date_dtype
311
312 elif (
313 datetime_dtype is not None
314 and pyarrow.types.is_timestamp(arrow_data_type)
315 and arrow_data_type.tz is None
316 ):
317 return datetime_dtype
318
319 elif (
320 timestamp_dtype is not None
321 and pyarrow.types.is_timestamp(arrow_data_type)
322 and arrow_data_type.tz is not None
323 ):
324 return timestamp_dtype
325
326 elif time_dtype is not None and pyarrow.types.is_time(arrow_data_type):
327 return time_dtype
328
329 elif pyarrow.types.is_struct(arrow_data_type):
330 if range_datetime_dtype is not None and arrow_data_type.equals(
331 range_datetime_dtype.pyarrow_dtype
332 ):
333 return range_datetime_dtype
334
335 elif range_date_dtype is not None and arrow_data_type.equals(
336 range_date_dtype.pyarrow_dtype
337 ):
338 return range_date_dtype
339
340 elif range_timestamp_dtype is not None and arrow_data_type.equals(
341 range_timestamp_dtype.pyarrow_dtype
342 ):
343 return range_timestamp_dtype
344
345 return types_mapper
346
347
348def bq_to_arrow_array(series, bq_field):
349 if bq_field.field_type.upper() == "GEOGRAPHY":
350 arrow_type = None
351 first = _first_valid(series)
352 if first is not None:
353 if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry):
354 arrow_type = pyarrow.binary()
                # Convert shapely geometry to WKB binary format:
356 series = series.apply(_to_wkb)
357 elif isinstance(first, bytes):
358 arrow_type = pyarrow.binary()
359 elif series.dtype.name == "geometry":
                # We have a GeoSeries containing all nulls; convert it to a pandas Series.
361 series = pandas.Series(numpy.array(series))
362
363 if arrow_type is None:
364 arrow_type = bq_to_arrow_data_type(bq_field)
365 else:
366 arrow_type = bq_to_arrow_data_type(bq_field)
367
368 field_type_upper = bq_field.field_type.upper() if bq_field.field_type else ""
369
370 try:
371 if bq_field.mode.upper() == "REPEATED":
372 return pyarrow.ListArray.from_pandas(series, type=arrow_type)
373 if field_type_upper in schema._STRUCT_TYPES:
374 return pyarrow.StructArray.from_pandas(series, type=arrow_type)
375 return pyarrow.Array.from_pandas(series, type=arrow_type)
376 except pyarrow.ArrowTypeError:
377 msg = f"""Error converting Pandas column with name: "{series.name}" and datatype: "{series.dtype}" to an appropriate pyarrow datatype: Array, ListArray, or StructArray"""
378 _LOGGER.error(msg)
379 raise pyarrow.ArrowTypeError(msg)
380
381
382def get_column_or_index(dataframe, name):
383 """Return a column or index as a pandas series."""
384 if name in dataframe.columns:
385 return dataframe[name].reset_index(drop=True)
386
387 if isinstance(dataframe.index, pandas.MultiIndex):
388 if name in dataframe.index.names:
389 return (
390 dataframe.index.get_level_values(name)
391 .to_series()
392 .reset_index(drop=True)
393 )
394 else:
395 if name == dataframe.index.name:
396 return dataframe.index.to_series().reset_index(drop=True)
397
398 raise ValueError("column or index '{}' not found.".format(name))
399
400
401def list_columns_and_indexes(dataframe):
402 """Return all index and column names with dtypes.
403
404 Returns:
405 Sequence[Tuple[str, dtype]]:
            Returns a list of index and column names with their corresponding
            dtypes. If an index is missing a name or has the same name as a
            column, the index is omitted.
409 """
410 column_names = frozenset(dataframe.columns)
411 columns_and_indexes = []
412 if isinstance(dataframe.index, pandas.MultiIndex):
413 for name in dataframe.index.names:
414 if name and name not in column_names:
415 values = dataframe.index.get_level_values(name)
416 columns_and_indexes.append((name, values.dtype))
417 else:
418 if dataframe.index.name and dataframe.index.name not in column_names:
419 columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))
420
421 columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
422 return columns_and_indexes
423
424
425def _first_valid(series):
426 first_valid_index = series.first_valid_index()
427 if first_valid_index is not None:
428 return series.at[first_valid_index]
429
430
431def _first_array_valid(series):
432 """Return the first "meaningful" element from the array series.
433
434 Here, "meaningful" means the first non-None element in one of the arrays that can
    be used for type detection.
436 """
437 first_valid_index = series.first_valid_index()
438 if first_valid_index is None:
439 return None
440
441 valid_array = series.at[first_valid_index]
442 valid_item = next((item for item in valid_array if not pandas.isna(item)), None)
443
444 if valid_item is not None:
445 return valid_item
446
447 # Valid item is None because all items in the "valid" array are invalid. Try
448 # to find a true valid array manually.
449 for array in islice(series, first_valid_index + 1, None):
450 try:
451 array_iter = iter(array)
452 except TypeError:
453 continue # Not an array, apparently, e.g. None, thus skip.
454 valid_item = next((item for item in array_iter if not pandas.isna(item)), None)
455 if valid_item is not None:
456 break
457
458 return valid_item
459
460
461def dataframe_to_bq_schema(dataframe, bq_schema):
462 """Convert a pandas DataFrame schema to a BigQuery schema.
463
464 DEPRECATED: Use
465 pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(),
466 instead. See: go/pandas-gbq-and-bigframes-redundancy.
467
468 Args:
469 dataframe (pandas.DataFrame):
470 DataFrame for which the client determines the BigQuery schema.
471 bq_schema (Sequence[Union[ \
472 :class:`~google.cloud.bigquery.schema.SchemaField`, \
473 Mapping[str, Any] \
474 ]]):
475 A BigQuery schema. Use this argument to override the autodetected
476 type for some or all of the DataFrame columns.
477
478 Returns:
479 Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
480 The automatically determined schema. Returns None if the type of
481 any column cannot be determined.
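
    Example (a minimal sketch; column names are illustrative, and the exact
    fields returned depend on whether pandas-gbq is installed):

        df = pandas.DataFrame({"name": ["a"], "score": [1.5]})
        dataframe_to_bq_schema(df, bq_schema=[])
        # -> roughly (SchemaField('name', 'STRING'), SchemaField('score', 'FLOAT'))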
482 """
483 if pandas_gbq is None:
484 warnings.warn(
485 "Loading pandas DataFrame into BigQuery will require pandas-gbq "
486 "package version 0.26.1 or greater in the future. "
487 f"Tried to import pandas-gbq and got: {pandas_gbq_import_exception}",
488 category=FutureWarning,
489 )
490 else:
491 return pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
492 dataframe,
493 override_bigquery_fields=bq_schema,
494 index=True,
495 )
496
497 if bq_schema:
498 bq_schema = schema._to_schema_fields(bq_schema)
499 bq_schema_index = {field.name: field for field in bq_schema}
500 bq_schema_unused = set(bq_schema_index.keys())
501 else:
502 bq_schema_index = {}
503 bq_schema_unused = set()
504
505 bq_schema_out = []
506 unknown_type_columns = []
507 dataframe_reset_index = dataframe.reset_index()
508 for column, dtype in list_columns_and_indexes(dataframe):
509 # Step 1: use provided type from schema, if present.
510 bq_field = bq_schema_index.get(column)
511 if bq_field:
512 bq_schema_out.append(bq_field)
513 bq_schema_unused.discard(bq_field.name)
514 continue
515
516 # Step 2: try to automatically determine the type based on the
517 # pandas dtype.
518 bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
519 if bq_type is None:
520 sample_data = _first_valid(dataframe_reset_index[column])
521 if (
522 isinstance(sample_data, _BaseGeometry)
523 and sample_data is not None # Paranoia
524 ):
525 bq_type = "GEOGRAPHY"
526 if bq_type is not None:
527 bq_schema_out.append(schema.SchemaField(column, bq_type))
528 continue
529
530 # Step 3: try with pyarrow if available
531 bq_field = _get_schema_by_pyarrow(column, dataframe_reset_index[column])
532 if bq_field is not None:
533 bq_schema_out.append(bq_field)
534 continue
535
536 unknown_type_columns.append(column)
537
538 # Catch any schema mismatch. The developer explicitly asked to serialize a
539 # column, but it was not found.
540 if bq_schema_unused:
541 raise ValueError(
542 "bq_schema contains fields not present in dataframe: {}".format(
543 bq_schema_unused
544 )
545 )
546
    if unknown_type_columns:
548 msg = "Could not determine the type of columns: {}".format(
549 ", ".join(unknown_type_columns)
550 )
551 warnings.warn(msg)
552 return None # We cannot detect the schema in full.
553
554 return tuple(bq_schema_out)
555
556
557def _get_schema_by_pyarrow(name, series):
558 """Attempt to detect the type of the given series by leveraging PyArrow's
559 type detection capabilities.
560
561 This function requires the ``pyarrow`` library to be installed and
562 available. If the series type cannot be determined or ``pyarrow`` is not
563 available, ``None`` is returned.
564
565 Args:
566 name (str):
567 the column name of the SchemaField.
568 series (pandas.Series):
569 The Series data for which to detect the data type.
570 Returns:
571 Optional[google.cloud.bigquery.schema.SchemaField]:
            A ``SchemaField`` constructed from the detected BigQuery-compatible
            type string (e.g., "STRING", "INTEGER", "TIMESTAMP", "DATETIME",
            "NUMERIC", "BIGNUMERIC") and mode ("NULLABLE" or "REPEATED").
575 Returns ``None`` if the type cannot be determined or ``pyarrow``
576 is not imported.
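
    Example (illustrative):

        series = pandas.Series(["a", "b"])
        _get_schema_by_pyarrow("name", series)
        # -> SchemaField('name', 'STRING', 'NULLABLE'), or None without pyarrow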
577 """
578
579 if not pyarrow:
580 return None
581
582 arrow_table = pyarrow.array(series)
583 if pyarrow.types.is_list(arrow_table.type):
584 # `pyarrow.ListType`
585 mode = "REPEATED"
586 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.values.type.id)
587
588 # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
589 # it to such datetimes, causing them to be recognized as TIMESTAMP type.
590 # We thus additionally check the actual data to see if we need to overrule
591 # that and choose DATETIME instead.
592 # Note that this should only be needed for datetime values inside a list,
593 # since scalar datetime values have a proper Pandas dtype that allows
594 # distinguishing between timezone-naive and timezone-aware values before
595 # even requiring the additional schema augment logic in this method.
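        # Illustrative sketch of the case handled here: a list column such as
        #     pandas.Series([[datetime(2024, 1, 1, 12)]])
        # can be detected as TIMESTAMP even though its values are naive, so the
        # check below reports DATETIME instead.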
596 if type == "TIMESTAMP":
597 valid_item = _first_array_valid(series)
598 if isinstance(valid_item, datetime) and valid_item.tzinfo is None:
599 type = "DATETIME"
600 else:
601 mode = "NULLABLE" # default mode
602 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id)
603 if type == "NUMERIC" and arrow_table.type.scale > 9:
604 type = "BIGNUMERIC"
605
606 if type is not None:
607 return schema.SchemaField(name, type, mode)
608 else:
609 return None
610
611
612def dataframe_to_arrow(dataframe, bq_schema):
613 """Convert pandas dataframe to Arrow table, using BigQuery schema.
614
615 Args:
616 dataframe (pandas.DataFrame):
617 DataFrame to convert to Arrow table.
618 bq_schema (Sequence[Union[ \
619 :class:`~google.cloud.bigquery.schema.SchemaField`, \
620 Mapping[str, Any] \
621 ]]):
622 Desired BigQuery schema. The number of columns must match the
623 number of columns in the DataFrame.
624
625 Returns:
626 pyarrow.Table:
627 Table containing dataframe data, with schema derived from
628 BigQuery schema.
629 """
630 column_names = set(dataframe.columns)
631 column_and_index_names = set(
632 name for name, _ in list_columns_and_indexes(dataframe)
633 )
634
635 bq_schema = schema._to_schema_fields(bq_schema)
636 bq_field_names = set(field.name for field in bq_schema)
637
638 extra_fields = bq_field_names - column_and_index_names
639 if extra_fields:
640 raise ValueError(
641 "bq_schema contains fields not present in dataframe: {}".format(
642 extra_fields
643 )
644 )
645
646 # It's okay for indexes to be missing from bq_schema, but it's not okay to
647 # be missing columns.
648 missing_fields = column_names - bq_field_names
649 if missing_fields:
650 raise ValueError(
651 "bq_schema is missing fields from dataframe: {}".format(missing_fields)
652 )
653
654 arrow_arrays = []
655 arrow_names = []
656 arrow_fields = []
657 for bq_field in bq_schema:
658 arrow_names.append(bq_field.name)
659 arrow_arrays.append(
660 bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
661 )
662 arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type))
663
664 if all((field is not None for field in arrow_fields)):
665 return pyarrow.Table.from_arrays(
666 arrow_arrays, schema=pyarrow.schema(arrow_fields)
667 )
668 return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
669
670
671def dataframe_to_parquet(
672 dataframe,
673 bq_schema,
674 filepath,
675 parquet_compression="SNAPPY",
676 parquet_use_compliant_nested_type=True,
677):
678 """Write dataframe as a Parquet file, according to the desired BQ schema.
679
680 This function requires the :mod:`pyarrow` package. Arrow is used as an
681 intermediate format.
682
683 Args:
684 dataframe (pandas.DataFrame):
685 DataFrame to convert to Parquet file.
686 bq_schema (Sequence[Union[ \
687 :class:`~google.cloud.bigquery.schema.SchemaField`, \
688 Mapping[str, Any] \
689 ]]):
690 Desired BigQuery schema. Number of columns must match number of
691 columns in the DataFrame.
692 filepath (str):
693 Path to write Parquet file to.
694 parquet_compression (Optional[str]):
            The compression codec to use by the ``pyarrow.parquet.write_table``
696 serializing method. Defaults to "SNAPPY".
697 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
698 parquet_use_compliant_nested_type (bool):
699 Whether the ``pyarrow.parquet.write_table`` serializing method should write
700 compliant Parquet nested type (lists). Defaults to ``True``.
701 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types
702 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
703
704 This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
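
    Example (a minimal sketch; "out.parquet" is an arbitrary path):

        df = pandas.DataFrame({"x": [1, 2, 3]})
        dataframe_to_parquet(df, [schema.SchemaField("x", "INTEGER")], "out.parquet")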
705 """
706 pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
707
708 import pyarrow.parquet # type: ignore
709
710 kwargs = (
711 {"use_compliant_nested_type": parquet_use_compliant_nested_type}
712 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type
713 else {}
714 )
715
716 bq_schema = schema._to_schema_fields(bq_schema)
717 arrow_table = dataframe_to_arrow(dataframe, bq_schema)
718 pyarrow.parquet.write_table(
719 arrow_table,
720 filepath,
721 compression=parquet_compression,
722 **kwargs,
723 )
724
725
726def _row_iterator_page_to_arrow(page, column_names, arrow_types):
727 # Iterate over the page to force the API request to get the page data.
728 try:
729 next(iter(page))
730 except StopIteration:
731 pass
732
733 arrays = []
734 for column_index, arrow_type in enumerate(arrow_types):
735 arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type))
736
737 if isinstance(column_names, pyarrow.Schema):
738 return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names)
739 return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)
740
741
742def download_arrow_row_iterator(pages, bq_schema):
743 """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.
744
745 Args:
746 pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
747 An iterator over the result pages.
748 bq_schema (Sequence[Union[ \
749 :class:`~google.cloud.bigquery.schema.SchemaField`, \
750 Mapping[str, Any] \
751 ]]):
            A description of the fields in result pages.
753 Yields:
754 :class:`pyarrow.RecordBatch`
755 The next page of records as a ``pyarrow`` record batch.
756 """
757 bq_schema = schema._to_schema_fields(bq_schema)
758 column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
759 arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]
760
761 for page in pages:
762 yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
763
764
765def _row_iterator_page_to_dataframe(page, column_names, dtypes):
766 # Iterate over the page to force the API request to get the page data.
767 try:
768 next(iter(page))
769 except StopIteration:
770 pass
771
772 columns = {}
773 for column_index, column_name in enumerate(column_names):
774 dtype = dtypes.get(column_name)
775 columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype)
776
777 return pandas.DataFrame(columns, columns=column_names)
778
779
780def download_dataframe_row_iterator(pages, bq_schema, dtypes):
781 """Use HTTP JSON RowIterator to construct a DataFrame.
782
783 Args:
784 pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
785 An iterator over the result pages.
786 bq_schema (Sequence[Union[ \
787 :class:`~google.cloud.bigquery.schema.SchemaField`, \
788 Mapping[str, Any] \
789 ]]):
            A description of the fields in result pages.
791 dtypes(Mapping[str, numpy.dtype]):
792 The types of columns in result data to hint construction of the
793 resulting DataFrame. Not all column types have to be specified.
794 Yields:
795 :class:`pandas.DataFrame`
            The next page of records as a ``pandas.DataFrame``.
797 """
798 bq_schema = schema._to_schema_fields(bq_schema)
799 column_names = [field.name for field in bq_schema]
800 for page in pages:
801 yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
802
803
804def _bqstorage_page_to_arrow(page):
805 return page.to_arrow()
806
807
808def _bqstorage_page_to_dataframe(column_names, dtypes, page):
809 # page.to_dataframe() does not preserve column order in some versions
810 # of google-cloud-bigquery-storage. Access by column name to rearrange.
811 return page.to_dataframe(dtypes=dtypes)[column_names]
812
813
814def _download_table_bqstorage_stream(
815 download_state, bqstorage_client, session, stream, worker_queue, page_to_item
816):
817 download_state.start()
818 try:
819 reader = bqstorage_client.read_rows(stream.name)
820
821 # Avoid deprecation warnings for passing in unnecessary read session.
822 # https://github.com/googleapis/python-bigquery-storage/issues/229
823 if _versions_helpers.BQ_STORAGE_VERSIONS.is_read_session_optional:
824 rowstream = reader.rows()
825 else:
826 rowstream = reader.rows(session)
827
828 for page in rowstream.pages:
829 item = page_to_item(page)
830
831 # Make sure we set a timeout on put() so that we give the worker
832 # thread opportunities to shutdown gracefully, for example if the
833 # parent thread shuts down or the parent generator object which
834 # collects rows from all workers goes out of scope. See:
835 # https://github.com/googleapis/python-bigquery/issues/2032
836 while True:
837 if download_state.done:
838 return
839 try:
840 worker_queue.put(item, timeout=_PROGRESS_INTERVAL)
841 break
842 except queue.Full:
843 continue
844 finally:
845 download_state.finish()
846
847
848def _nowait(futures):
849 """Separate finished and unfinished threads, much like
850 :func:`concurrent.futures.wait`, but don't wait.
851 """
852 done = []
853 not_done = []
854 for future in futures:
855 if future.done():
856 done.append(future)
857 else:
858 not_done.append(future)
859 return done, not_done
860
861
862def _download_table_bqstorage(
863 project_id: str,
864 table: Any,
865 bqstorage_client: Any,
866 preserve_order: bool = False,
867 selected_fields: Optional[List[Any]] = None,
868 page_to_item: Optional[Callable] = None,
869 max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
870 max_stream_count: Optional[int] = None,
871 download_state: Optional[_DownloadState] = None,
872) -> Generator[Any, None, None]:
873 """Downloads a BigQuery table using the BigQuery Storage API.
874
875 This method uses the faster, but potentially more expensive, BigQuery
876 Storage API to download a table as a Pandas DataFrame. It supports
877 parallel downloads and optional data transformations.
878
879 Args:
880 project_id (str): The ID of the Google Cloud project containing
881 the table.
882 table (Any): The BigQuery table to download.
883 bqstorage_client (Any): An
884 authenticated BigQuery Storage API client.
885 preserve_order (bool, optional): Whether to preserve the order
            of the rows as they are read from BigQuery. If True, this limits
887 the number of streams to one and overrides `max_stream_count`.
888 Defaults to False.
889 selected_fields (Optional[List[SchemaField]]):
890 A list of BigQuery schema fields to select for download. If None,
891 all fields are downloaded. Defaults to None.
        page_to_item (Optional[Callable]): An optional callable that takes a
            page of data from the BigQuery Storage API and converts it into the
            item (for example, an Arrow record batch or a pandas DataFrame)
            yielded by this generator.
        max_queue_size (Any): The maximum number of items the worker queue may
            hold before workers block. Defaults to a sentinel that bounds the
            queue to the number of streams; ``None`` means unbounded.
894 max_stream_count (Optional[int]): The maximum number of
895 concurrent streams to use for downloading data. If `preserve_order`
896 is True, the requested streams are limited to 1 regardless of the
897 `max_stream_count` value. If 0 or None, then the number of
898 requested streams will be unbounded. Defaults to None.
899 download_state (Optional[_DownloadState]):
900 A threadsafe state object which can be used to observe the
901 behavior of the worker threads created by this method.
902
903 Yields:
        Any: The objects produced by ``page_to_item`` (for example, Arrow
            record batches or pandas DataFrames), one for each chunk of data
            downloaded from BigQuery.
906
907 Raises:
908 ValueError: If attempting to read from a specific partition or snapshot.
909
910 Note:
911 This method requires the `google-cloud-bigquery-storage` library
912 to be installed.
913 """
914
915 from google.cloud import bigquery_storage
916
917 if "$" in table.table_id:
918 raise ValueError(
919 "Reading from a specific partition is not currently supported."
920 )
921 if "@" in table.table_id:
922 raise ValueError("Reading from a specific snapshot is not currently supported.")
923
924 requested_streams = determine_requested_streams(preserve_order, max_stream_count)
925
926 requested_session = bigquery_storage.types.stream.ReadSession(
927 table=table.to_bqstorage(),
928 data_format=bigquery_storage.types.stream.DataFormat.ARROW,
929 )
930 if selected_fields is not None:
931 for field in selected_fields:
932 requested_session.read_options.selected_fields.append(field.name)
933
934 if _ARROW_COMPRESSION_SUPPORT:
935 requested_session.read_options.arrow_serialization_options.buffer_compression = (
936 # CompressionCodec(1) -> LZ4_FRAME
937 ArrowSerializationOptions.CompressionCodec(1)
938 )
939
940 session = bqstorage_client.create_read_session(
941 parent="projects/{}".format(project_id),
942 read_session=requested_session,
943 max_stream_count=requested_streams,
944 )
945
946 _LOGGER.debug(
947 "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
948 table.project, table.dataset_id, table.table_id, session.name
949 )
950 )
951
952 # Avoid reading rows from an empty table.
953 if not session.streams:
954 return
955
956 total_streams = len(session.streams)
957
958 # Use _DownloadState to notify worker threads when to quit.
959 # See: https://stackoverflow.com/a/29237343/101923
960 if download_state is None:
961 download_state = _DownloadState()
962
963 # Create a queue to collect frames as they are created in each thread.
964 #
965 # The queue needs to be bounded by default, because if the user code processes the
966 # fetched result pages too slowly, while at the same time new pages are rapidly being
967 # fetched from the server, the queue can grow to the point where the process runs
968 # out of memory.
969 if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT:
970 max_queue_size = total_streams
971 elif max_queue_size is None:
972 max_queue_size = 0 # unbounded
973
    worker_queue: queue.Queue[Any] = queue.Queue(maxsize=max_queue_size)
975
976 with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
977 try:
978 # Manually submit jobs and wait for download to complete rather
979 # than using pool.map because pool.map continues running in the
980 # background even if there is an exception on the main thread.
981 # See: https://github.com/googleapis/google-cloud-python/pull/7698
982 not_done = [
983 pool.submit(
984 _download_table_bqstorage_stream,
985 download_state,
986 bqstorage_client,
987 session,
988 stream,
989 worker_queue,
990 page_to_item,
991 )
992 for stream in session.streams
993 ]
994
995 while not_done:
996 # Don't block on the worker threads. For performance reasons,
997 # we want to block on the queue's get method, instead. This
998 # prevents the queue from filling up, because the main thread
999 # has smaller gaps in time between calls to the queue's get
1000 # method. For a detailed explanation, see:
1001 # https://friendliness.dev/2019/06/18/python-nowait/
1002 done, not_done = _nowait(not_done)
1003 for future in done:
1004 # Call result() on any finished threads to raise any
1005 # exceptions encountered.
1006 future.result()
1007
1008 try:
1009 frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
1010 yield frame
1011 except queue.Empty: # pragma: NO COVER
1012 continue
1013
1014 # Return any remaining values after the workers finished.
1015 while True: # pragma: NO COVER
1016 try:
1017 frame = worker_queue.get_nowait()
1018 yield frame
1019 except queue.Empty: # pragma: NO COVER
1020 break
1021 finally:
1022 # No need for a lock because reading/replacing a variable is
1023 # defined to be an atomic operation in the Python language
1024 # definition (enforced by the global interpreter lock).
1025 download_state.done = True
1026
1027 # Shutdown all background threads, now that they should know to
1028 # exit early.
1029 pool.shutdown(wait=True)
1030
1031
1032def download_arrow_bqstorage(
1033 project_id,
1034 table,
1035 bqstorage_client,
1036 preserve_order=False,
1037 selected_fields=None,
1038 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
1039 max_stream_count=None,
1040):
1041 return _download_table_bqstorage(
1042 project_id,
1043 table,
1044 bqstorage_client,
1045 preserve_order=preserve_order,
1046 selected_fields=selected_fields,
1047 page_to_item=_bqstorage_page_to_arrow,
1048 max_queue_size=max_queue_size,
1049 max_stream_count=max_stream_count,
1050 )
1051
1052
1053def download_dataframe_bqstorage(
1054 project_id,
1055 table,
1056 bqstorage_client,
1057 column_names,
1058 dtypes,
1059 preserve_order=False,
1060 selected_fields=None,
1061 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
1062 max_stream_count=None,
1063):
1064 page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
1065 return _download_table_bqstorage(
1066 project_id,
1067 table,
1068 bqstorage_client,
1069 preserve_order=preserve_order,
1070 selected_fields=selected_fields,
1071 page_to_item=page_to_item,
1072 max_queue_size=max_queue_size,
1073 max_stream_count=max_stream_count,
1074 )
1075
1076
1077def dataframe_to_json_generator(dataframe):
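    """Convert each DataFrame row into a JSON-serializable dict, omitting NaNs.

    Example (illustrative; column names are arbitrary):

        df = pandas.DataFrame({"name": ["a"], "age": [5]})
        list(dataframe_to_json_generator(df))  # -> [{"name": "a", "age": 5}]
    """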
1078 for row in dataframe.itertuples(index=False, name=None):
1079 output = {}
1080 for column, value in zip(dataframe.columns, row):
1081 # Omit NaN values.
1082 is_nan = pandas.isna(value)
1083
1084 # isna() can also return an array-like of bools, but the latter's boolean
1085 # value is ambiguous, hence an extra check. An array-like value is *not*
1086 # considered a NaN, however.
1087 if isinstance(is_nan, bool) and is_nan:
1088 continue
1089
1090 # Convert numpy types to corresponding Python types.
1091 # https://stackoverflow.com/a/60441783/101923
1092 if isinstance(value, numpy.bool_):
1093 value = bool(value)
1094 elif isinstance(
1095 value,
1096 (
1097 numpy.int64,
1098 numpy.int32,
1099 numpy.int16,
1100 numpy.int8,
1101 numpy.uint64,
1102 numpy.uint32,
1103 numpy.uint16,
1104 numpy.uint8,
1105 ),
1106 ):
1107 value = int(value)
1108 output[column] = value
1109
1110 yield output
1111
1112
1113def verify_pandas_imports():
1114 if pandas is None:
1115 raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception
1116 if db_dtypes is None:
1117 raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception
1118
1119
1120def determine_requested_streams(
1121 preserve_order: bool,
1122 max_stream_count: Union[int, None],
1123) -> int:
1124 """Determines the value of requested_streams based on the values of
1125 `preserve_order` and `max_stream_count`.
1126
1127 Args:
1128 preserve_order (bool): Whether to preserve the order of streams. If True,
1129 this limits the number of streams to one. `preserve_order` takes
1130 precedence over `max_stream_count`.
        max_stream_count (Union[int, None]): The maximum number of streams
            allowed. Must be a non-negative number or None, where None indicates
            the value is unset. NOTE: if `preserve_order` is also set, it takes
            precedence over `max_stream_count`; to ensure `max_stream_count` is
            used, leave `preserve_order` as False.
1136
1137 Returns:
1138 (int) The appropriate value for requested_streams.
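
    Example (illustrative):

        determine_requested_streams(True, max_stream_count=10)     # -> 1
        determine_requested_streams(False, max_stream_count=10)    # -> 10
        determine_requested_streams(False, max_stream_count=None)  # -> 0 (unbounded)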
1139 """
1140
1141 if preserve_order:
1142 # If preserve order is set, it takes precedence.
        # Limit the requested streams to 1, to ensure that order
        # is preserved.
1145 return 1
1146
1147 elif max_stream_count is not None:
1148 # If preserve_order is not set, only then do we consider max_stream_count
1149 if max_stream_count <= -1:
1150 raise ValueError("max_stream_count must be non-negative OR None")
1151 return max_stream_count
1152
1153 # Default to zero requested streams (unbounded).
1154 return 0