1# Copyright 2019 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Shared helper functions for connecting BigQuery and pandas.
16
17NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package,
18instead. See: go/pandas-gbq-and-bigframes-redundancy and
19https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pandas_to_bigquery.py
20"""
21
22import concurrent.futures
23from datetime import datetime
24import functools
25from itertools import islice
26import logging
27import queue
28import threading
29import time
30import warnings
31from typing import Any, Union, Optional, Callable, Generator, List
32
33
34from google.cloud.bigquery import _pyarrow_helpers
35from google.cloud.bigquery import _versions_helpers
36from google.cloud.bigquery import retry as bq_retry
37from google.cloud.bigquery import schema
38
39
40try:
41 import pandas # type: ignore
42
43 pandas_import_exception = None
44except ImportError as exc:
45 pandas = None
46 pandas_import_exception = exc
47else:
48 import numpy
49
50
51try:
52 import pandas_gbq.schema.pandas_to_bigquery # type: ignore
53
54 pandas_gbq_import_exception = None
55except ImportError as exc:
56 pandas_gbq = None
57 pandas_gbq_import_exception = exc
58
59
60try:
61 import db_dtypes # type: ignore
62
63 date_dtype_name = db_dtypes.DateDtype.name
64 time_dtype_name = db_dtypes.TimeDtype.name
65 db_dtypes_import_exception = None
66except ImportError as exc:
67 db_dtypes = None
68 db_dtypes_import_exception = exc
    date_dtype_name = time_dtype_name = ""  # Use '' rather than None to keep pytype happy.
70
71pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()
72
73try:
    # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`
75 from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore
76except ImportError:
77 # No shapely, use NoneType for _BaseGeometry as a placeholder.
78 _BaseGeometry = type(None)
79else:
80 # We don't have any unit test sessions that install shapely but not pandas.
81 if pandas is not None: # pragma: NO COVER
82
83 def _to_wkb():
84 from shapely import wkb # type: ignore
85
86 write = wkb.dumps
87 notnull = pandas.notnull
88
89 def _to_wkb(v):
90 return write(v) if notnull(v) else v
91
92 return _to_wkb
93
94 _to_wkb = _to_wkb()
95
96try:
97 from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions
98except ImportError:
99 _ARROW_COMPRESSION_SUPPORT = False
100else:
101 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
102 _ARROW_COMPRESSION_SUPPORT = True
103
104_LOGGER = logging.getLogger(__name__)
105
106_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds.
107
108_MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads
109
110_NO_PANDAS_ERROR = "Please install the 'pandas' package to use this function."
111_NO_DB_TYPES_ERROR = "Please install the 'db-dtypes' package to use this function."
112
113_PANDAS_DTYPE_TO_BQ = {
114 "bool": "BOOLEAN",
115 "datetime64[ns, UTC]": "TIMESTAMP",
116 "datetime64[ns]": "DATETIME",
117 "float32": "FLOAT",
118 "float64": "FLOAT",
119 "int8": "INTEGER",
120 "int16": "INTEGER",
121 "int32": "INTEGER",
122 "int64": "INTEGER",
123 "uint8": "INTEGER",
124 "uint16": "INTEGER",
125 "uint32": "INTEGER",
126 "geometry": "GEOGRAPHY",
127 date_dtype_name: "DATE",
128 time_dtype_name: "TIME",
129}
130
131
132class _DownloadState(object):
133 """Flag to indicate that a thread should exit early."""
134
135 def __init__(self):
136 # No need for a lock because reading/replacing a variable is defined to
137 # be an atomic operation in the Python language definition (enforced by
138 # the global interpreter lock).
139 self.done = False
140 # To assist with testing and understanding the behavior of the
141 # download, use this object as shared state to track how many worker
142 # threads have started and have gracefully shutdown.
143 self._started_workers_lock = threading.Lock()
144 self.started_workers = 0
145 self._finished_workers_lock = threading.Lock()
146 self.finished_workers = 0
147
148 def start(self):
149 with self._started_workers_lock:
150 self.started_workers += 1
151
152 def finish(self):
153 with self._finished_workers_lock:
154 self.finished_workers += 1
155
156
157BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = {
158 "GEOGRAPHY": {
159 b"ARROW:extension:name": b"google:sqlType:geography",
160 b"ARROW:extension:metadata": b'{"encoding": "WKT"}',
161 },
162 "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"},
163 "JSON": {b"ARROW:extension:name": b"google:sqlType:json"},
164}
165
166
167def bq_to_arrow_struct_data_type(field):
168 arrow_fields = []
169 for subfield in field.fields:
170 arrow_subfield = bq_to_arrow_field(subfield)
171 if arrow_subfield:
172 arrow_fields.append(arrow_subfield)
173 else:
174 # Could not determine a subfield type. Fallback to type
175 # inference.
176 return None
177 return pyarrow.struct(arrow_fields)
178
179
180def bq_to_arrow_range_data_type(field):
181 if field is None:
182 raise ValueError(
183 "Range element type cannot be None, must be one of "
184 "DATE, DATETIME, or TIMESTAMP"
185 )
186 element_type = field.element_type.upper()
187 arrow_element_type = _pyarrow_helpers.bq_to_arrow_scalars(element_type)()
188 return pyarrow.struct([("start", arrow_element_type), ("end", arrow_element_type)])
189
190
191def bq_to_arrow_data_type(field):
192 """Return the Arrow data type, corresponding to a given BigQuery column.
193
194 Returns:
195 None: if default Arrow type inspection should be used.
196 """
197 if field.mode is not None and field.mode.upper() == "REPEATED":
198 inner_type = bq_to_arrow_data_type(
199 schema.SchemaField(field.name, field.field_type, fields=field.fields)
200 )
201 if inner_type:
202 return pyarrow.list_(inner_type)
203 return None
204
205 field_type_upper = field.field_type.upper() if field.field_type else ""
206 if field_type_upper in schema._STRUCT_TYPES:
207 return bq_to_arrow_struct_data_type(field)
208
209 if field_type_upper == "RANGE":
210 return bq_to_arrow_range_data_type(field.range_element_type)
211
212 data_type_constructor = _pyarrow_helpers.bq_to_arrow_scalars(field_type_upper)
213 if data_type_constructor is None:
214 return None
215 return data_type_constructor()
216
217
218def bq_to_arrow_field(bq_field, array_type=None):
219 """Return the Arrow field, corresponding to a given BigQuery column.
220
221 Returns:
222 None: if the Arrow type cannot be determined.
223 """
224 arrow_type = bq_to_arrow_data_type(bq_field)
225 if arrow_type is not None:
226 if array_type is not None:
227 arrow_type = array_type # For GEOGRAPHY, at least initially
228 metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get(
229 bq_field.field_type.upper() if bq_field.field_type else ""
230 )
231 return pyarrow.field(
232 bq_field.name,
233 arrow_type,
            # Even if the remote schema is REQUIRED, there's a chance there are
            # local NULL values. Arrow will gladly interpret these NULL values
236 # as non-NULL and give you an arbitrary value. See:
237 # https://github.com/googleapis/python-bigquery/issues/1692
238 nullable=False if bq_field.mode.upper() == "REPEATED" else True,
239 metadata=metadata,
240 )
241
242 warnings.warn(
243 "Unable to determine Arrow type for field '{}'.".format(bq_field.name)
244 )
245 return None
246
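# Illustrative sketch of how bq_to_arrow_field() is typically used (the field
# name "geo" is made up for this example):
#
#   geo_field = schema.SchemaField("geo", "GEOGRAPHY")
#   arrow_field = bq_to_arrow_field(geo_field)
#   # arrow_field.metadata carries the b"google:sqlType:geography" extension
#   # name from BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.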
247
248def bq_to_arrow_schema(bq_schema):
249 """Return the Arrow schema, corresponding to a given BigQuery schema.
250
251 Returns:
252 None: if any Arrow type cannot be determined.
253 """
254 arrow_fields = []
255 for bq_field in bq_schema:
256 arrow_field = bq_to_arrow_field(bq_field)
257 if arrow_field is None:
258 # Auto-detect the schema if there is an unknown field type.
259 return None
260 arrow_fields.append(arrow_field)
261 return pyarrow.schema(arrow_fields)
262
263
264def default_types_mapper(
265 date_as_object: bool = False,
266 bool_dtype: Union[Any, None] = None,
267 int_dtype: Union[Any, None] = None,
268 float_dtype: Union[Any, None] = None,
269 string_dtype: Union[Any, None] = None,
270 date_dtype: Union[Any, None] = None,
271 datetime_dtype: Union[Any, None] = None,
272 time_dtype: Union[Any, None] = None,
273 timestamp_dtype: Union[Any, None] = None,
274 range_date_dtype: Union[Any, None] = None,
275 range_datetime_dtype: Union[Any, None] = None,
276 range_timestamp_dtype: Union[Any, None] = None,
277):
278 """Create a mapping from pyarrow types to pandas types.
279
280 This overrides the pandas defaults to use null-safe extension types where
281 available.
282
283 See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of
284 data types. See:
285 tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for
286 BigQuery to Arrow type mapping.
287
288 Note to google-cloud-bigquery developers: If you update the default dtypes,
289 also update the docs at docs/usage/pandas.rst.
290 """
291
292 def types_mapper(arrow_data_type):
293 if bool_dtype is not None and pyarrow.types.is_boolean(arrow_data_type):
294 return bool_dtype
295
296 elif int_dtype is not None and pyarrow.types.is_integer(arrow_data_type):
297 return int_dtype
298
299 elif float_dtype is not None and pyarrow.types.is_floating(arrow_data_type):
300 return float_dtype
301
302 elif string_dtype is not None and pyarrow.types.is_string(arrow_data_type):
303 return string_dtype
304
305 elif (
306 # If date_as_object is True, we know some DATE columns are
307 # out-of-bounds of what is supported by pandas.
308 date_dtype is not None
309 and not date_as_object
310 and pyarrow.types.is_date(arrow_data_type)
311 ):
312 return date_dtype
313
314 elif (
315 datetime_dtype is not None
316 and pyarrow.types.is_timestamp(arrow_data_type)
317 and arrow_data_type.tz is None
318 ):
319 return datetime_dtype
320
321 elif (
322 timestamp_dtype is not None
323 and pyarrow.types.is_timestamp(arrow_data_type)
324 and arrow_data_type.tz is not None
325 ):
326 return timestamp_dtype
327
328 elif time_dtype is not None and pyarrow.types.is_time(arrow_data_type):
329 return time_dtype
330
331 elif pyarrow.types.is_struct(arrow_data_type):
332 if range_datetime_dtype is not None and arrow_data_type.equals(
333 range_datetime_dtype.pyarrow_dtype
334 ):
335 return range_datetime_dtype
336
337 elif range_date_dtype is not None and arrow_data_type.equals(
338 range_date_dtype.pyarrow_dtype
339 ):
340 return range_date_dtype
341
342 elif range_timestamp_dtype is not None and arrow_data_type.equals(
343 range_timestamp_dtype.pyarrow_dtype
344 ):
345 return range_timestamp_dtype
346
347 return types_mapper
348
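# Illustrative usage sketch for default_types_mapper() (assumes an existing
# pyarrow Table named ``arrow_table``): the returned mapper can be passed to
# ``pyarrow.Table.to_pandas`` so that BOOL and INT64 columns come back as
# pandas' nullable extension dtypes instead of object/float columns.
#
#   mapper = default_types_mapper(
#       bool_dtype=pandas.BooleanDtype(),
#       int_dtype=pandas.Int64Dtype(),
#   )
#   df = arrow_table.to_pandas(types_mapper=mapper)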
349
350def bq_to_arrow_array(series, bq_field):
351 if bq_field.field_type.upper() == "GEOGRAPHY":
352 arrow_type = None
353 first = _first_valid(series)
354 if first is not None:
355 if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry):
356 arrow_type = pyarrow.binary()
                # Convert shapely geometry to WKB binary format:
358 series = series.apply(_to_wkb)
359 elif isinstance(first, bytes):
360 arrow_type = pyarrow.binary()
361 elif series.dtype.name == "geometry":
362 # We have a GeoSeries containing all nulls, convert it to a pandas series
363 series = pandas.Series(numpy.array(series))
364
365 if arrow_type is None:
366 arrow_type = bq_to_arrow_data_type(bq_field)
367 else:
368 arrow_type = bq_to_arrow_data_type(bq_field)
369
370 field_type_upper = bq_field.field_type.upper() if bq_field.field_type else ""
371
372 try:
373 if bq_field.mode.upper() == "REPEATED":
374 return pyarrow.ListArray.from_pandas(series, type=arrow_type)
375 if field_type_upper in schema._STRUCT_TYPES:
376 return pyarrow.StructArray.from_pandas(series, type=arrow_type)
377 return pyarrow.Array.from_pandas(series, type=arrow_type)
378 except pyarrow.ArrowTypeError:
379 msg = f"""Error converting Pandas column with name: "{series.name}" and datatype: "{series.dtype}" to an appropriate pyarrow datatype: Array, ListArray, or StructArray"""
380 _LOGGER.error(msg)
381 raise pyarrow.ArrowTypeError(msg)
382
383
384def get_column_or_index(dataframe, name):
385 """Return a column or index as a pandas series."""
386 if name in dataframe.columns:
387 return dataframe[name].reset_index(drop=True)
388
389 if isinstance(dataframe.index, pandas.MultiIndex):
390 if name in dataframe.index.names:
391 return (
392 dataframe.index.get_level_values(name)
393 .to_series()
394 .reset_index(drop=True)
395 )
396 else:
397 if name == dataframe.index.name:
398 return dataframe.index.to_series().reset_index(drop=True)
399
400 raise ValueError("column or index '{}' not found.".format(name))
401
402
403def list_columns_and_indexes(dataframe):
404 """Return all index and column names with dtypes.
405
406 Returns:
407 Sequence[Tuple[str, dtype]]:
408 Returns a sorted list of indexes and column names with
409 corresponding dtypes. If an index is missing a name or has the
410 same name as a column, the index is omitted.
411 """
412 column_names = frozenset(dataframe.columns)
413 columns_and_indexes = []
414 if isinstance(dataframe.index, pandas.MultiIndex):
415 for name in dataframe.index.names:
416 if name and name not in column_names:
417 values = dataframe.index.get_level_values(name)
418 columns_and_indexes.append((name, values.dtype))
419 else:
420 if dataframe.index.name and dataframe.index.name not in column_names:
421 columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))
422
423 columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
424 return columns_and_indexes
425
426
427def _first_valid(series):
428 first_valid_index = series.first_valid_index()
429 if first_valid_index is not None:
430 return series.at[first_valid_index]
431
432
433def _first_array_valid(series):
434 """Return the first "meaningful" element from the array series.
435
436 Here, "meaningful" means the first non-None element in one of the arrays that can
    be used for type detection.
438 """
439 first_valid_index = series.first_valid_index()
440 if first_valid_index is None:
441 return None
442
443 valid_array = series.at[first_valid_index]
444 valid_item = next((item for item in valid_array if not pandas.isna(item)), None)
445
446 if valid_item is not None:
447 return valid_item
448
449 # Valid item is None because all items in the "valid" array are invalid. Try
450 # to find a true valid array manually.
451 for array in islice(series, first_valid_index + 1, None):
452 try:
453 array_iter = iter(array)
454 except TypeError:
455 continue # Not an array, apparently, e.g. None, thus skip.
456 valid_item = next((item for item in array_iter if not pandas.isna(item)), None)
457 if valid_item is not None:
458 break
459
460 return valid_item
461
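# Illustrative sketch: _first_array_valid() keeps scanning past arrays whose
# elements are all null.
#
#   s = pandas.Series([None, [None, float("nan")], [None, 42]])
#   _first_array_valid(s)  # -> 42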
462
463def dataframe_to_bq_schema(dataframe, bq_schema):
464 """Convert a pandas DataFrame schema to a BigQuery schema.
465
466 DEPRECATED: Use
467 pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(),
468 instead. See: go/pandas-gbq-and-bigframes-redundancy.
469
470 Args:
471 dataframe (pandas.DataFrame):
472 DataFrame for which the client determines the BigQuery schema.
473 bq_schema (Sequence[Union[ \
474 :class:`~google.cloud.bigquery.schema.SchemaField`, \
475 Mapping[str, Any] \
476 ]]):
477 A BigQuery schema. Use this argument to override the autodetected
478 type for some or all of the DataFrame columns.
479
480 Returns:
481 Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
482 The automatically determined schema. Returns None if the type of
483 any column cannot be determined.
484 """
485 if pandas_gbq is None:
486 warnings.warn(
487 "Loading pandas DataFrame into BigQuery will require pandas-gbq "
488 "package version 0.26.1 or greater in the future. "
489 f"Tried to import pandas-gbq and got: {pandas_gbq_import_exception}",
490 category=FutureWarning,
491 )
492 else:
493 return pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
494 dataframe,
495 override_bigquery_fields=bq_schema,
496 index=True,
497 )
498
499 if bq_schema:
500 bq_schema = schema._to_schema_fields(bq_schema)
501 bq_schema_index = {field.name: field for field in bq_schema}
502 bq_schema_unused = set(bq_schema_index.keys())
503 else:
504 bq_schema_index = {}
505 bq_schema_unused = set()
506
507 bq_schema_out = []
508 unknown_type_columns = []
509 dataframe_reset_index = dataframe.reset_index()
510 for column, dtype in list_columns_and_indexes(dataframe):
511 # Step 1: use provided type from schema, if present.
512 bq_field = bq_schema_index.get(column)
513 if bq_field:
514 bq_schema_out.append(bq_field)
515 bq_schema_unused.discard(bq_field.name)
516 continue
517
518 # Step 2: try to automatically determine the type based on the
519 # pandas dtype.
520 bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
521 if bq_type is None:
522 sample_data = _first_valid(dataframe_reset_index[column])
523 if (
524 isinstance(sample_data, _BaseGeometry)
525 and sample_data is not None # Paranoia
526 ):
527 bq_type = "GEOGRAPHY"
528 if bq_type is not None:
529 bq_schema_out.append(schema.SchemaField(column, bq_type))
530 continue
531
532 # Step 3: try with pyarrow if available
533 bq_field = _get_schema_by_pyarrow(column, dataframe_reset_index[column])
534 if bq_field is not None:
535 bq_schema_out.append(bq_field)
536 continue
537
538 unknown_type_columns.append(column)
539
540 # Catch any schema mismatch. The developer explicitly asked to serialize a
541 # column, but it was not found.
542 if bq_schema_unused:
543 raise ValueError(
544 "bq_schema contains fields not present in dataframe: {}".format(
545 bq_schema_unused
546 )
547 )
548
    if unknown_type_columns:
550 msg = "Could not determine the type of columns: {}".format(
551 ", ".join(unknown_type_columns)
552 )
553 warnings.warn(msg)
554 return None # We cannot detect the schema in full.
555
556 return tuple(bq_schema_out)
557
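# Illustrative sketch (hypothetical column names; the exact output depends on
# whether pandas-gbq is installed, since that path is preferred when available):
#
#   df = pandas.DataFrame({"id": [1, 2], "amount": [1.5, 2.5]})
#   dataframe_to_bq_schema(df, bq_schema=[schema.SchemaField("id", "STRING")])
#   # The explicit "id" field overrides autodetection; "amount" is detected
#   # as FLOAT via _PANDAS_DTYPE_TO_BQ ("float64" -> "FLOAT").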
558
559def _get_schema_by_pyarrow(name, series):
560 """Attempt to detect the type of the given series by leveraging PyArrow's
561 type detection capabilities.
562
563 This function requires the ``pyarrow`` library to be installed and
564 available. If the series type cannot be determined or ``pyarrow`` is not
565 available, ``None`` is returned.
566
567 Args:
568 name (str):
569 the column name of the SchemaField.
570 series (pandas.Series):
571 The Series data for which to detect the data type.
572 Returns:
573 Optional[google.cloud.bigquery.schema.SchemaField]:
            A SchemaField with the detected BigQuery-compatible type (e.g.,
            "STRING", "INTEGER", "TIMESTAMP", "DATETIME", "NUMERIC", "BIGNUMERIC")
            and mode ("NULLABLE" or "REPEATED").
577 Returns ``None`` if the type cannot be determined or ``pyarrow``
578 is not imported.
579 """
580
581 if not pyarrow:
582 return None
583
584 arrow_table = pyarrow.array(series)
585 if pyarrow.types.is_list(arrow_table.type):
586 # `pyarrow.ListType`
587 mode = "REPEATED"
588 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.values.type.id)
589
590 # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
591 # it to such datetimes, causing them to be recognized as TIMESTAMP type.
592 # We thus additionally check the actual data to see if we need to overrule
593 # that and choose DATETIME instead.
594 # Note that this should only be needed for datetime values inside a list,
595 # since scalar datetime values have a proper Pandas dtype that allows
596 # distinguishing between timezone-naive and timezone-aware values before
597 # even requiring the additional schema augment logic in this method.
598 if type == "TIMESTAMP":
599 valid_item = _first_array_valid(series)
600 if isinstance(valid_item, datetime) and valid_item.tzinfo is None:
601 type = "DATETIME"
602 else:
603 mode = "NULLABLE" # default mode
604 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id)
605 if type == "NUMERIC" and arrow_table.type.scale > 9:
606 type = "BIGNUMERIC"
607
608 if type is not None:
609 return schema.SchemaField(name, type, mode)
610 else:
611 return None
612
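# Illustrative sketch: pyarrow-based detection marks list columns as REPEATED
# and maps the element type back to a BigQuery type.
#
#   _get_schema_by_pyarrow("tags", pandas.Series([["a", "b"], ["c"]]))
#   # -> SchemaField("tags", "STRING", "REPEATED")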
613
614def dataframe_to_arrow(dataframe, bq_schema):
615 """Convert pandas dataframe to Arrow table, using BigQuery schema.
616
617 Args:
618 dataframe (pandas.DataFrame):
619 DataFrame to convert to Arrow table.
620 bq_schema (Sequence[Union[ \
621 :class:`~google.cloud.bigquery.schema.SchemaField`, \
622 Mapping[str, Any] \
623 ]]):
624 Desired BigQuery schema. The number of columns must match the
625 number of columns in the DataFrame.
626
627 Returns:
628 pyarrow.Table:
629 Table containing dataframe data, with schema derived from
630 BigQuery schema.
631 """
632 column_names = set(dataframe.columns)
633 column_and_index_names = set(
634 name for name, _ in list_columns_and_indexes(dataframe)
635 )
636
637 bq_schema = schema._to_schema_fields(bq_schema)
638 bq_field_names = set(field.name for field in bq_schema)
639
640 extra_fields = bq_field_names - column_and_index_names
641 if extra_fields:
642 raise ValueError(
643 "bq_schema contains fields not present in dataframe: {}".format(
644 extra_fields
645 )
646 )
647
648 # It's okay for indexes to be missing from bq_schema, but it's not okay to
649 # be missing columns.
650 missing_fields = column_names - bq_field_names
651 if missing_fields:
652 raise ValueError(
653 "bq_schema is missing fields from dataframe: {}".format(missing_fields)
654 )
655
656 arrow_arrays = []
657 arrow_names = []
658 arrow_fields = []
659 for bq_field in bq_schema:
660 arrow_names.append(bq_field.name)
661 arrow_arrays.append(
662 bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
663 )
664 arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type))
665
666 if all((field is not None for field in arrow_fields)):
667 return pyarrow.Table.from_arrays(
668 arrow_arrays, schema=pyarrow.schema(arrow_fields)
669 )
670 return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
671
672
673def dataframe_to_parquet(
674 dataframe,
675 bq_schema,
676 filepath,
677 parquet_compression="SNAPPY",
678 parquet_use_compliant_nested_type=True,
679):
680 """Write dataframe as a Parquet file, according to the desired BQ schema.
681
682 This function requires the :mod:`pyarrow` package. Arrow is used as an
683 intermediate format.
684
685 Args:
686 dataframe (pandas.DataFrame):
687 DataFrame to convert to Parquet file.
688 bq_schema (Sequence[Union[ \
689 :class:`~google.cloud.bigquery.schema.SchemaField`, \
690 Mapping[str, Any] \
691 ]]):
692 Desired BigQuery schema. Number of columns must match number of
693 columns in the DataFrame.
694 filepath (str):
695 Path to write Parquet file to.
696 parquet_compression (Optional[str]):
            The compression codec to use by the ``pyarrow.parquet.write_table``
698 serializing method. Defaults to "SNAPPY".
699 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
700 parquet_use_compliant_nested_type (bool):
701 Whether the ``pyarrow.parquet.write_table`` serializing method should write
702 compliant Parquet nested type (lists). Defaults to ``True``.
703 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types
704 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
705
706 This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
707 """
708 pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
709
710 import pyarrow.parquet # type: ignore
711
712 kwargs = (
713 {"use_compliant_nested_type": parquet_use_compliant_nested_type}
714 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type
715 else {}
716 )
717
718 bq_schema = schema._to_schema_fields(bq_schema)
719 arrow_table = dataframe_to_arrow(dataframe, bq_schema)
720 pyarrow.parquet.write_table(
721 arrow_table,
722 filepath,
723 compression=parquet_compression,
724 **kwargs,
725 )
726
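# Illustrative sketch for dataframe_to_parquet() (hypothetical column names and
# file path):
#
#   df = pandas.DataFrame({"name": ["a"], "score": [1.0]})
#   dataframe_to_parquet(
#       df,
#       [schema.SchemaField("name", "STRING"), schema.SchemaField("score", "FLOAT")],
#       "/tmp/scores.parquet",
#   )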
727
728def _row_iterator_page_to_arrow(page, column_names, arrow_types):
729 # Iterate over the page to force the API request to get the page data.
730 try:
731 next(iter(page))
732 except StopIteration:
733 pass
734
735 arrays = []
736 for column_index, arrow_type in enumerate(arrow_types):
737 arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type))
738
739 if isinstance(column_names, pyarrow.Schema):
740 return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names)
741 return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)
742
743
744def download_arrow_row_iterator(pages, bq_schema, timeout=None):
745 """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.
746
747 Args:
748 pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
749 An iterator over the result pages.
750 bq_schema (Sequence[Union[ \
751 :class:`~google.cloud.bigquery.schema.SchemaField`, \
752 Mapping[str, Any] \
753 ]]):
            A description of the fields in result pages.
755 timeout (Optional[float]):
756 The number of seconds to wait for the underlying download to complete.
757 If ``None``, wait indefinitely.
758
759 Yields:
760 :class:`pyarrow.RecordBatch`
761 The next page of records as a ``pyarrow`` record batch.
762 """
763 bq_schema = schema._to_schema_fields(bq_schema)
764 column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
765 arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]
766
767 if timeout is None:
768 for page in pages:
769 yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
770 else:
771 start_time = time.monotonic()
772 for page in pages:
773 if time.monotonic() - start_time > timeout:
774 raise concurrent.futures.TimeoutError()
775
776 yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
777
778
779def _row_iterator_page_to_dataframe(page, column_names, dtypes):
780 # Iterate over the page to force the API request to get the page data.
781 try:
782 next(iter(page))
783 except StopIteration:
784 pass
785
786 columns = {}
787 for column_index, column_name in enumerate(column_names):
788 dtype = dtypes.get(column_name)
789 columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype)
790
791 return pandas.DataFrame(columns, columns=column_names)
792
793
794def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None):
795 """Use HTTP JSON RowIterator to construct a DataFrame.
796
797 Args:
798 pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
799 An iterator over the result pages.
800 bq_schema (Sequence[Union[ \
801 :class:`~google.cloud.bigquery.schema.SchemaField`, \
802 Mapping[str, Any] \
803 ]]):
            A description of the fields in result pages.
        dtypes (Mapping[str, numpy.dtype]):
806 The types of columns in result data to hint construction of the
807 resulting DataFrame. Not all column types have to be specified.
808 timeout (Optional[float]):
809 The number of seconds to wait for the underlying download to complete.
810 If ``None``, wait indefinitely.
811
812 Yields:
813 :class:`pandas.DataFrame`
            The next page of records as a ``pandas.DataFrame``.
815 """
816 bq_schema = schema._to_schema_fields(bq_schema)
817 column_names = [field.name for field in bq_schema]
818
819 if timeout is None:
820 for page in pages:
821 yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
822 else:
823 start_time = time.monotonic()
824 for page in pages:
825 if time.monotonic() - start_time > timeout:
826 raise concurrent.futures.TimeoutError()
827
828 yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
829
830
831def _bqstorage_page_to_arrow(page):
832 return page.to_arrow()
833
834
835def _bqstorage_page_to_dataframe(column_names, dtypes, page):
836 # page.to_dataframe() does not preserve column order in some versions
837 # of google-cloud-bigquery-storage. Access by column name to rearrange.
838 return page.to_dataframe(dtypes=dtypes)[column_names]
839
840
841def _download_table_bqstorage_stream(
842 download_state, bqstorage_client, session, stream, worker_queue, page_to_item
843):
844 download_state.start()
845 try:
846 reader = bqstorage_client.read_rows(stream.name)
847
848 # Avoid deprecation warnings for passing in unnecessary read session.
849 # https://github.com/googleapis/python-bigquery-storage/issues/229
850 if _versions_helpers.BQ_STORAGE_VERSIONS.is_read_session_optional:
851 rowstream = reader.rows()
852 else:
853 rowstream = reader.rows(session)
854
855 for page in rowstream.pages:
856 item = page_to_item(page)
857
858 # Make sure we set a timeout on put() so that we give the worker
859 # thread opportunities to shutdown gracefully, for example if the
860 # parent thread shuts down or the parent generator object which
861 # collects rows from all workers goes out of scope. See:
862 # https://github.com/googleapis/python-bigquery/issues/2032
863 while True:
864 if download_state.done:
865 return
866 try:
867 worker_queue.put(item, timeout=_PROGRESS_INTERVAL)
868 break
869 except queue.Full:
870 continue
871 finally:
872 download_state.finish()
873
874
875def _nowait(futures):
876 """Separate finished and unfinished threads, much like
877 :func:`concurrent.futures.wait`, but don't wait.
878 """
879 done = []
880 not_done = []
881 for future in futures:
882 if future.done():
883 done.append(future)
884 else:
885 not_done.append(future)
886 return done, not_done
887
888
889def _download_table_bqstorage(
890 project_id: str,
891 table: Any,
892 bqstorage_client: Any,
893 preserve_order: bool = False,
894 selected_fields: Optional[List[Any]] = None,
895 page_to_item: Optional[Callable] = None,
896 max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
897 max_stream_count: Optional[int] = None,
898 download_state: Optional[_DownloadState] = None,
899 timeout: Optional[float] = None,
900) -> Generator[Any, None, None]:
901 """Downloads a BigQuery table using the BigQuery Storage API.
902
    This function uses the faster, but potentially more expensive, BigQuery
    Storage API to download a table. Streams are read in parallel worker
    threads, and each page is converted with the supplied ``page_to_item``
    callable before being yielded.
906
907 Args:
908 project_id (str): The ID of the Google Cloud project containing
909 the table.
910 table (Any): The BigQuery table to download.
911 bqstorage_client (Any): An
912 authenticated BigQuery Storage API client.
913 preserve_order (bool, optional): Whether to preserve the order
914 of the rows as they are read from BigQuery. If True this limits
915 the number of streams to one and overrides `max_stream_count`.
916 Defaults to False.
917 selected_fields (Optional[List[SchemaField]]):
918 A list of BigQuery schema fields to select for download. If None,
919 all fields are downloaded. Defaults to None.
        page_to_item (Optional[Callable]): An optional callable that takes a
            page of data from the BigQuery Storage API and converts it into
            the item placed on the worker queue (for example, an Arrow record
            batch or a pandas DataFrame). Defaults to None.
        max_queue_size (Any): The maximum size of the queue used to buffer
            converted pages for the consumer. By default the queue is bounded
            by the number of streams; pass ``None`` for an unbounded queue.
922 max_stream_count (Optional[int]): The maximum number of
923 concurrent streams to use for downloading data. If `preserve_order`
924 is True, the requested streams are limited to 1 regardless of the
925 `max_stream_count` value. If 0 or None, then the number of
926 requested streams will be unbounded. Defaults to None.
927 download_state (Optional[_DownloadState]):
928 A threadsafe state object which can be used to observe the
929 behavior of the worker threads created by this method.
930 timeout (Optional[float]):
931 The number of seconds to wait for the download to complete.
932 If None, wait indefinitely.
933
    Yields:
        Any: One item per downloaded page, as produced by ``page_to_item``
            (for example, pandas DataFrames or Arrow record batches).
937
938 Raises:
939 ValueError: If attempting to read from a specific partition or snapshot.
940 concurrent.futures.TimeoutError:
941 If the download does not complete within the specified timeout.
942
943 Note:
944 This method requires the `google-cloud-bigquery-storage` library
945 to be installed.
946 """
947
948 from google.cloud import bigquery_storage
949
950 if "$" in table.table_id:
951 raise ValueError(
952 "Reading from a specific partition is not currently supported."
953 )
954 if "@" in table.table_id:
955 raise ValueError("Reading from a specific snapshot is not currently supported.")
956
957 start_time = time.monotonic()
958 requested_streams = determine_requested_streams(preserve_order, max_stream_count)
959
960 requested_session = bigquery_storage.types.stream.ReadSession(
961 table=table.to_bqstorage(),
962 data_format=bigquery_storage.types.stream.DataFormat.ARROW,
963 )
964 if selected_fields is not None:
965 for field in selected_fields:
966 requested_session.read_options.selected_fields.append(field.name)
967
968 if _ARROW_COMPRESSION_SUPPORT:
969 requested_session.read_options.arrow_serialization_options.buffer_compression = (
970 # CompressionCodec(1) -> LZ4_FRAME
971 ArrowSerializationOptions.CompressionCodec(1)
972 )
973
974 retry_policy = (
975 bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None
976 )
977
978 session = bqstorage_client.create_read_session(
979 parent="projects/{}".format(project_id),
980 read_session=requested_session,
981 max_stream_count=requested_streams,
982 retry=retry_policy,
983 timeout=timeout,
984 )
985
986 _LOGGER.debug(
987 "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
988 table.project, table.dataset_id, table.table_id, session.name
989 )
990 )
991
992 # Avoid reading rows from an empty table.
993 if not session.streams:
994 return
995
996 total_streams = len(session.streams)
997
998 # Use _DownloadState to notify worker threads when to quit.
999 # See: https://stackoverflow.com/a/29237343/101923
1000 if download_state is None:
1001 download_state = _DownloadState()
1002
1003 # Create a queue to collect frames as they are created in each thread.
1004 #
1005 # The queue needs to be bounded by default, because if the user code processes the
1006 # fetched result pages too slowly, while at the same time new pages are rapidly being
1007 # fetched from the server, the queue can grow to the point where the process runs
1008 # out of memory.
1009 if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT:
1010 max_queue_size = total_streams
1011 elif max_queue_size is None:
1012 max_queue_size = 0 # unbounded
1013
1014 worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)
1015
1016 # Manually manage the pool to control shutdown behavior on timeout.
1017 pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
1018 wait_on_shutdown = True
1019 try:
1020 # Manually submit jobs and wait for download to complete rather
1021 # than using pool.map because pool.map continues running in the
1022 # background even if there is an exception on the main thread.
1023 # See: https://github.com/googleapis/google-cloud-python/pull/7698
1024 not_done = [
1025 pool.submit(
1026 _download_table_bqstorage_stream,
1027 download_state,
1028 bqstorage_client,
1029 session,
1030 stream,
1031 worker_queue,
1032 page_to_item,
1033 )
1034 for stream in session.streams
1035 ]
1036
1037 while not_done:
1038 # Check for timeout
1039 if timeout is not None:
1040 elapsed = time.monotonic() - start_time
1041 if elapsed > timeout:
1042 wait_on_shutdown = False
1043 raise concurrent.futures.TimeoutError(
1044 f"Download timed out after {timeout} seconds."
1045 )
1046
1047 # Don't block on the worker threads. For performance reasons,
1048 # we want to block on the queue's get method, instead. This
1049 # prevents the queue from filling up, because the main thread
1050 # has smaller gaps in time between calls to the queue's get
1051 # method. For a detailed explanation, see:
1052 # https://friendliness.dev/2019/06/18/python-nowait/
1053 done, not_done = _nowait(not_done)
1054 for future in done:
1055 # Call result() on any finished threads to raise any
1056 # exceptions encountered.
1057 future.result()
1058
1059 try:
1060 frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
1061 yield frame
1062 except queue.Empty: # pragma: NO COVER
1063 continue
1064
1065 # Return any remaining values after the workers finished.
1066 while True: # pragma: NO COVER
1067 try:
1068 frame = worker_queue.get_nowait()
1069 yield frame
1070 except queue.Empty: # pragma: NO COVER
1071 break
1072 finally:
1073 # No need for a lock because reading/replacing a variable is
1074 # defined to be an atomic operation in the Python language
1075 # definition (enforced by the global interpreter lock).
1076 download_state.done = True
1077
1078 # Shutdown all background threads, now that they should know to
1079 # exit early.
1080 pool.shutdown(wait=wait_on_shutdown)
1081
1082
1083def download_arrow_bqstorage(
1084 project_id,
1085 table,
1086 bqstorage_client,
1087 preserve_order=False,
1088 selected_fields=None,
1089 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
1090 max_stream_count=None,
1091 timeout=None,
1092):
1093 return _download_table_bqstorage(
1094 project_id,
1095 table,
1096 bqstorage_client,
1097 preserve_order=preserve_order,
1098 selected_fields=selected_fields,
1099 page_to_item=_bqstorage_page_to_arrow,
1100 max_queue_size=max_queue_size,
1101 max_stream_count=max_stream_count,
1102 timeout=timeout,
1103 )
1104
1105
1106def download_dataframe_bqstorage(
1107 project_id,
1108 table,
1109 bqstorage_client,
1110 column_names,
1111 dtypes,
1112 preserve_order=False,
1113 selected_fields=None,
1114 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
1115 max_stream_count=None,
1116 timeout=None,
1117):
1118 page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
1119 return _download_table_bqstorage(
1120 project_id,
1121 table,
1122 bqstorage_client,
1123 preserve_order=preserve_order,
1124 selected_fields=selected_fields,
1125 page_to_item=page_to_item,
1126 max_queue_size=max_queue_size,
1127 max_stream_count=max_stream_count,
1128 timeout=timeout,
1129 )
1130
1131
1132def dataframe_to_json_generator(dataframe):
1133 for row in dataframe.itertuples(index=False, name=None):
1134 output = {}
1135 for column, value in zip(dataframe.columns, row):
1136 # Omit NaN values.
1137 is_nan = pandas.isna(value)
1138
1139 # isna() can also return an array-like of bools, but the latter's boolean
1140 # value is ambiguous, hence an extra check. An array-like value is *not*
1141 # considered a NaN, however.
1142 if isinstance(is_nan, bool) and is_nan:
1143 continue
1144
1145 # Convert numpy types to corresponding Python types.
1146 # https://stackoverflow.com/a/60441783/101923
1147 if isinstance(value, numpy.bool_):
1148 value = bool(value)
1149 elif isinstance(
1150 value,
1151 (
1152 numpy.int64,
1153 numpy.int32,
1154 numpy.int16,
1155 numpy.int8,
1156 numpy.uint64,
1157 numpy.uint32,
1158 numpy.uint16,
1159 numpy.uint8,
1160 ),
1161 ):
1162 value = int(value)
1163 output[column] = value
1164
1165 yield output
1166
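# Illustrative sketch: NaN cells are omitted from each row and numpy booleans
# and integers are converted to native Python types.
#
#   df = pandas.DataFrame({"x": [1, 2], "y": [3.0, float("nan")]})
#   list(dataframe_to_json_generator(df))
#   # -> [{"x": 1, "y": 3.0}, {"x": 2}]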
1167
1168def verify_pandas_imports():
1169 if pandas is None:
1170 raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception
1171 if db_dtypes is None:
1172 raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception
1173
1174
1175def determine_requested_streams(
1176 preserve_order: bool,
1177 max_stream_count: Union[int, None],
1178) -> int:
1179 """Determines the value of requested_streams based on the values of
1180 `preserve_order` and `max_stream_count`.
1181
1182 Args:
1183 preserve_order (bool): Whether to preserve the order of streams. If True,
1184 this limits the number of streams to one. `preserve_order` takes
1185 precedence over `max_stream_count`.
        max_stream_count (Union[int, None]): The maximum number of streams
            allowed. Must be a non-negative number or None, where None indicates
            the value is unset. NOTE: if `preserve_order` is also set, it takes
            precedence over `max_stream_count`, so to ensure that `max_stream_count`
            is used, ensure that `preserve_order` is False.
1191
1192 Returns:
        int: The appropriate value for requested_streams.
1194 """
1195
1196 if preserve_order:
        # If preserve_order is set, it takes precedence.
        # Limit the requested streams to 1 to ensure that order is preserved.
1200 return 1
1201
1202 elif max_stream_count is not None:
1203 # If preserve_order is not set, only then do we consider max_stream_count
1204 if max_stream_count <= -1:
1205 raise ValueError("max_stream_count must be non-negative OR None")
1206 return max_stream_count
1207
1208 # Default to zero requested streams (unbounded).
1209 return 0
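# Illustrative sketch of the precedence rules implemented above:
#
#   determine_requested_streams(preserve_order=True, max_stream_count=10)    # -> 1
#   determine_requested_streams(preserve_order=False, max_stream_count=10)   # -> 10
#   determine_requested_streams(preserve_order=False, max_stream_count=None) # -> 0 (unbounded)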