1# Copyright 2019 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Shared helper functions for connecting BigQuery and pandas.
16
17NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package,
18instead. See: go/pandas-gbq-and-bigframes-redundancy and
19https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pandas_to_bigquery.py
20"""
21
22import concurrent.futures
23from datetime import datetime
24import functools
25from itertools import islice
26import logging
27import queue
28import threading
29import time
30import warnings
31from typing import Any, Union, Optional, Callable, Generator, List
32
33
34from google.cloud.bigquery import _pyarrow_helpers
35from google.cloud.bigquery import _versions_helpers
36from google.cloud.bigquery import schema
37
38
39try:
40 import pandas # type: ignore
41
42 pandas_import_exception = None
43except ImportError as exc:
44 pandas = None
45 pandas_import_exception = exc
46else:
47 import numpy
48
49
50try:
51 import pandas_gbq.schema.pandas_to_bigquery # type: ignore
52
53 pandas_gbq_import_exception = None
54except ImportError as exc:
55 pandas_gbq = None
56 pandas_gbq_import_exception = exc
57
58
59try:
60 import db_dtypes # type: ignore
61
62 date_dtype_name = db_dtypes.DateDtype.name
63 time_dtype_name = db_dtypes.TimeDtype.name
64 db_dtypes_import_exception = None
65except ImportError as exc:
66 db_dtypes = None
67 db_dtypes_import_exception = exc
    # Use '' rather than None so the _PANDAS_DTYPE_TO_BQ keys below remain
    # plain strings (this keeps pytype happy).
    date_dtype_name = time_dtype_name = ""
69
70pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()
71
72try:
    # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`
74 from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore
75except ImportError:
76 # No shapely, use NoneType for _BaseGeometry as a placeholder.
77 _BaseGeometry = type(None)
78else:
79 # We don't have any unit test sessions that install shapely but not pandas.
80 if pandas is not None: # pragma: NO COVER
81
82 def _to_wkb():
83 from shapely import wkb # type: ignore
84
85 write = wkb.dumps
86 notnull = pandas.notnull
87
88 def _to_wkb(v):
89 return write(v) if notnull(v) else v
90
91 return _to_wkb
92
93 _to_wkb = _to_wkb()
94
95try:
96 from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions
97except ImportError:
98 _ARROW_COMPRESSION_SUPPORT = False
99else:
100 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
101 _ARROW_COMPRESSION_SUPPORT = True
102
103_LOGGER = logging.getLogger(__name__)
104
105_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds.
106
107_MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads
108
109_NO_PANDAS_ERROR = "Please install the 'pandas' package to use this function."
110_NO_DB_TYPES_ERROR = "Please install the 'db-dtypes' package to use this function."
111
112_PANDAS_DTYPE_TO_BQ = {
113 "bool": "BOOLEAN",
114 "datetime64[ns, UTC]": "TIMESTAMP",
115 "datetime64[ns]": "DATETIME",
116 "float32": "FLOAT",
117 "float64": "FLOAT",
118 "int8": "INTEGER",
119 "int16": "INTEGER",
120 "int32": "INTEGER",
121 "int64": "INTEGER",
122 "uint8": "INTEGER",
123 "uint16": "INTEGER",
124 "uint32": "INTEGER",
125 "geometry": "GEOGRAPHY",
126 date_dtype_name: "DATE",
127 time_dtype_name: "TIME",
128}
129
130
131class _DownloadState(object):
132 """Flag to indicate that a thread should exit early."""
133
134 def __init__(self):
        # No need for a lock: reading or replacing an instance attribute is
        # effectively atomic in CPython, where the operation is a single
        # bytecode step protected by the global interpreter lock.
138 self.done = False
139 # To assist with testing and understanding the behavior of the
140 # download, use this object as shared state to track how many worker
141 # threads have started and have gracefully shutdown.
142 self._started_workers_lock = threading.Lock()
143 self.started_workers = 0
144 self._finished_workers_lock = threading.Lock()
145 self.finished_workers = 0
146
147 def start(self):
148 with self._started_workers_lock:
149 self.started_workers += 1
150
151 def finish(self):
152 with self._finished_workers_lock:
153 self.finished_workers += 1
154
155
156BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = {
157 "GEOGRAPHY": {
158 b"ARROW:extension:name": b"google:sqlType:geography",
159 b"ARROW:extension:metadata": b'{"encoding": "WKT"}',
160 },
161 "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"},
162 "JSON": {b"ARROW:extension:name": b"google:sqlType:json"},
163}
164
165
166def bq_to_arrow_struct_data_type(field):
167 arrow_fields = []
168 for subfield in field.fields:
169 arrow_subfield = bq_to_arrow_field(subfield)
170 if arrow_subfield:
171 arrow_fields.append(arrow_subfield)
172 else:
173 # Could not determine a subfield type. Fallback to type
174 # inference.
175 return None
176 return pyarrow.struct(arrow_fields)
177
178
179def bq_to_arrow_range_data_type(field):
180 if field is None:
181 raise ValueError(
182 "Range element type cannot be None, must be one of "
183 "DATE, DATETIME, or TIMESTAMP"
184 )
185 element_type = field.element_type.upper()
186 arrow_element_type = _pyarrow_helpers.bq_to_arrow_scalars(element_type)()
187 return pyarrow.struct([("start", arrow_element_type), ("end", arrow_element_type)])
188
189
190def bq_to_arrow_data_type(field):
191 """Return the Arrow data type, corresponding to a given BigQuery column.
192
193 Returns:
194 None: if default Arrow type inspection should be used.
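
    Example (illustrative sketch; assumes ``pyarrow`` is installed and that
    the helper mapping translates INTEGER to ``pyarrow.int64`` and STRING to
    ``pyarrow.string``)::

        from google.cloud.bigquery import schema

        bq_to_arrow_data_type(schema.SchemaField("age", "INTEGER"))
        # -> pyarrow.int64()
        bq_to_arrow_data_type(
            schema.SchemaField("tags", "STRING", mode="REPEATED")
        )
        # -> pyarrow.list_(pyarrow.string())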
195 """
196 if field.mode is not None and field.mode.upper() == "REPEATED":
197 inner_type = bq_to_arrow_data_type(
198 schema.SchemaField(field.name, field.field_type, fields=field.fields)
199 )
200 if inner_type:
201 return pyarrow.list_(inner_type)
202 return None
203
204 field_type_upper = field.field_type.upper() if field.field_type else ""
205 if field_type_upper in schema._STRUCT_TYPES:
206 return bq_to_arrow_struct_data_type(field)
207
208 if field_type_upper == "RANGE":
209 return bq_to_arrow_range_data_type(field.range_element_type)
210
211 data_type_constructor = _pyarrow_helpers.bq_to_arrow_scalars(field_type_upper)
212 if data_type_constructor is None:
213 return None
214 return data_type_constructor()
215
216
217def bq_to_arrow_field(bq_field, array_type=None):
218 """Return the Arrow field, corresponding to a given BigQuery column.
219
220 Returns:
221 None: if the Arrow type cannot be determined.
222 """
223 arrow_type = bq_to_arrow_data_type(bq_field)
224 if arrow_type is not None:
225 if array_type is not None:
226 arrow_type = array_type # For GEOGRAPHY, at least initially
227 metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get(
228 bq_field.field_type.upper() if bq_field.field_type else ""
229 )
230 return pyarrow.field(
231 bq_field.name,
232 arrow_type,
233 # Even if the remote schema is REQUIRED, there's a chance there's
234 # local NULL values. Arrow will gladly interpret these NULL values
235 # as non-NULL and give you an arbitrary value. See:
236 # https://github.com/googleapis/python-bigquery/issues/1692
237 nullable=False if bq_field.mode.upper() == "REPEATED" else True,
238 metadata=metadata,
239 )
240
241 warnings.warn(
242 "Unable to determine Arrow type for field '{}'.".format(bq_field.name)
243 )
244 return None
245
246
247def bq_to_arrow_schema(bq_schema):
248 """Return the Arrow schema, corresponding to a given BigQuery schema.
249
250 Returns:
251 None: if any Arrow type cannot be determined.
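
    Example (illustrative sketch, assuming ``pyarrow`` is installed)::

        from google.cloud.bigquery import schema

        bq_schema = [
            schema.SchemaField("name", "STRING"),
            schema.SchemaField("age", "INTEGER"),
        ]
        bq_to_arrow_schema(bq_schema)
        # -> a pyarrow.Schema with a string field "name" and an int64
        #    field "age"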
252 """
253 arrow_fields = []
254 for bq_field in bq_schema:
255 arrow_field = bq_to_arrow_field(bq_field)
256 if arrow_field is None:
257 # Auto-detect the schema if there is an unknown field type.
258 return None
259 arrow_fields.append(arrow_field)
260 return pyarrow.schema(arrow_fields)
261
262
263def default_types_mapper(
264 date_as_object: bool = False,
265 bool_dtype: Union[Any, None] = None,
266 int_dtype: Union[Any, None] = None,
267 float_dtype: Union[Any, None] = None,
268 string_dtype: Union[Any, None] = None,
269 date_dtype: Union[Any, None] = None,
270 datetime_dtype: Union[Any, None] = None,
271 time_dtype: Union[Any, None] = None,
272 timestamp_dtype: Union[Any, None] = None,
273 range_date_dtype: Union[Any, None] = None,
274 range_datetime_dtype: Union[Any, None] = None,
275 range_timestamp_dtype: Union[Any, None] = None,
276):
277 """Create a mapping from pyarrow types to pandas types.
278
279 This overrides the pandas defaults to use null-safe extension types where
280 available.
281
282 See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of
283 data types. See:
284 tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for
285 BigQuery to Arrow type mapping.
286
287 Note to google-cloud-bigquery developers: If you update the default dtypes,
288 also update the docs at docs/usage/pandas.rst.
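
    Example (illustrative sketch; assumes ``pandas`` and ``pyarrow`` are
    installed and ``arrow_table`` is an existing ``pyarrow.Table``)::

        import pandas

        types_mapper = default_types_mapper(
            bool_dtype=pandas.BooleanDtype(),
            int_dtype=pandas.Int64Dtype(),
        )
        dataframe = arrow_table.to_pandas(types_mapper=types_mapper)
        # Boolean and integer columns now use the null-safe pandas
        # extension dtypes instead of the default numpy dtypes.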
289 """
290
291 def types_mapper(arrow_data_type):
292 if bool_dtype is not None and pyarrow.types.is_boolean(arrow_data_type):
293 return bool_dtype
294
295 elif int_dtype is not None and pyarrow.types.is_integer(arrow_data_type):
296 return int_dtype
297
298 elif float_dtype is not None and pyarrow.types.is_floating(arrow_data_type):
299 return float_dtype
300
301 elif string_dtype is not None and pyarrow.types.is_string(arrow_data_type):
302 return string_dtype
303
304 elif (
305 # If date_as_object is True, we know some DATE columns are
306 # out-of-bounds of what is supported by pandas.
307 date_dtype is not None
308 and not date_as_object
309 and pyarrow.types.is_date(arrow_data_type)
310 ):
311 return date_dtype
312
313 elif (
314 datetime_dtype is not None
315 and pyarrow.types.is_timestamp(arrow_data_type)
316 and arrow_data_type.tz is None
317 ):
318 return datetime_dtype
319
320 elif (
321 timestamp_dtype is not None
322 and pyarrow.types.is_timestamp(arrow_data_type)
323 and arrow_data_type.tz is not None
324 ):
325 return timestamp_dtype
326
327 elif time_dtype is not None and pyarrow.types.is_time(arrow_data_type):
328 return time_dtype
329
330 elif pyarrow.types.is_struct(arrow_data_type):
331 if range_datetime_dtype is not None and arrow_data_type.equals(
332 range_datetime_dtype.pyarrow_dtype
333 ):
334 return range_datetime_dtype
335
336 elif range_date_dtype is not None and arrow_data_type.equals(
337 range_date_dtype.pyarrow_dtype
338 ):
339 return range_date_dtype
340
341 elif range_timestamp_dtype is not None and arrow_data_type.equals(
342 range_timestamp_dtype.pyarrow_dtype
343 ):
344 return range_timestamp_dtype
345
346 return types_mapper
347
348
349def bq_to_arrow_array(series, bq_field):
350 if bq_field.field_type.upper() == "GEOGRAPHY":
351 arrow_type = None
352 first = _first_valid(series)
353 if first is not None:
354 if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry):
355 arrow_type = pyarrow.binary()
                # Convert shapely geometry to WKB binary format:
357 series = series.apply(_to_wkb)
358 elif isinstance(first, bytes):
359 arrow_type = pyarrow.binary()
360 elif series.dtype.name == "geometry":
            # We have a GeoSeries containing all nulls; convert it to a plain pandas Series.
362 series = pandas.Series(numpy.array(series))
363
364 if arrow_type is None:
365 arrow_type = bq_to_arrow_data_type(bq_field)
366 else:
367 arrow_type = bq_to_arrow_data_type(bq_field)
368
369 field_type_upper = bq_field.field_type.upper() if bq_field.field_type else ""
370
371 try:
372 if bq_field.mode.upper() == "REPEATED":
373 return pyarrow.ListArray.from_pandas(series, type=arrow_type)
374 if field_type_upper in schema._STRUCT_TYPES:
375 return pyarrow.StructArray.from_pandas(series, type=arrow_type)
376 return pyarrow.Array.from_pandas(series, type=arrow_type)
377 except pyarrow.ArrowTypeError:
378 msg = f"""Error converting Pandas column with name: "{series.name}" and datatype: "{series.dtype}" to an appropriate pyarrow datatype: Array, ListArray, or StructArray"""
379 _LOGGER.error(msg)
380 raise pyarrow.ArrowTypeError(msg)
381
382
383def get_column_or_index(dataframe, name):
384 """Return a column or index as a pandas series."""
385 if name in dataframe.columns:
386 return dataframe[name].reset_index(drop=True)
387
388 if isinstance(dataframe.index, pandas.MultiIndex):
389 if name in dataframe.index.names:
390 return (
391 dataframe.index.get_level_values(name)
392 .to_series()
393 .reset_index(drop=True)
394 )
395 else:
396 if name == dataframe.index.name:
397 return dataframe.index.to_series().reset_index(drop=True)
398
399 raise ValueError("column or index '{}' not found.".format(name))
400
401
402def list_columns_and_indexes(dataframe):
403 """Return all index and column names with dtypes.
404
405 Returns:
406 Sequence[Tuple[str, dtype]]:
            Returns a list of the named indexes followed by the column names,
            with corresponding dtypes. If an index is missing a name or has
            the same name as a column, the index is omitted.
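
    Example (illustrative sketch, assuming ``pandas`` is installed)::

        import pandas

        df = pandas.DataFrame(
            {"x": [1, 2]}, index=pandas.Index([10, 20], name="id")
        )
        list_columns_and_indexes(df)
        # -> [('id', dtype('int64')), ('x', dtype('int64'))]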
410 """
411 column_names = frozenset(dataframe.columns)
412 columns_and_indexes = []
413 if isinstance(dataframe.index, pandas.MultiIndex):
414 for name in dataframe.index.names:
415 if name and name not in column_names:
416 values = dataframe.index.get_level_values(name)
417 columns_and_indexes.append((name, values.dtype))
418 else:
419 if dataframe.index.name and dataframe.index.name not in column_names:
420 columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))
421
422 columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
423 return columns_and_indexes
424
425
426def _first_valid(series):
427 first_valid_index = series.first_valid_index()
428 if first_valid_index is not None:
429 return series.at[first_valid_index]
430
431
432def _first_array_valid(series):
433 """Return the first "meaningful" element from the array series.
434
435 Here, "meaningful" means the first non-None element in one of the arrays that can
    be used for type detection.
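
    Example (illustrative sketch, assuming ``pandas`` is installed)::

        import pandas

        series = pandas.Series([None, [None, None], [None, 42]])
        _first_array_valid(series)
        # -> 42; the first non-null array holds only nulls, so the search
        #    continues with the arrays that follow it.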
437 """
438 first_valid_index = series.first_valid_index()
439 if first_valid_index is None:
440 return None
441
442 valid_array = series.at[first_valid_index]
443 valid_item = next((item for item in valid_array if not pandas.isna(item)), None)
444
445 if valid_item is not None:
446 return valid_item
447
448 # Valid item is None because all items in the "valid" array are invalid. Try
449 # to find a true valid array manually.
450 for array in islice(series, first_valid_index + 1, None):
451 try:
452 array_iter = iter(array)
453 except TypeError:
454 continue # Not an array, apparently, e.g. None, thus skip.
455 valid_item = next((item for item in array_iter if not pandas.isna(item)), None)
456 if valid_item is not None:
457 break
458
459 return valid_item
460
461
462def dataframe_to_bq_schema(dataframe, bq_schema):
463 """Convert a pandas DataFrame schema to a BigQuery schema.
464
465 DEPRECATED: Use
466 pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(),
467 instead. See: go/pandas-gbq-and-bigframes-redundancy.
468
469 Args:
470 dataframe (pandas.DataFrame):
471 DataFrame for which the client determines the BigQuery schema.
472 bq_schema (Sequence[Union[ \
473 :class:`~google.cloud.bigquery.schema.SchemaField`, \
474 Mapping[str, Any] \
475 ]]):
476 A BigQuery schema. Use this argument to override the autodetected
477 type for some or all of the DataFrame columns.
478
479 Returns:
480 Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
481 The automatically determined schema. Returns None if the type of
482 any column cannot be determined.
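
    Example (illustrative sketch, assuming ``pandas`` is installed; the exact
    result depends on whether ``pandas-gbq`` is installed and on the library
    versions in use)::

        import pandas

        df = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]})
        dataframe_to_bq_schema(df, bq_schema=None)
        # -> a tuple of SchemaFields, e.g. ("id", "INTEGER") and
        #    ("name", "STRING"); the index may be included as well.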
483 """
484 if pandas_gbq is None:
485 warnings.warn(
486 "Loading pandas DataFrame into BigQuery will require pandas-gbq "
487 "package version 0.26.1 or greater in the future. "
488 f"Tried to import pandas-gbq and got: {pandas_gbq_import_exception}",
489 category=FutureWarning,
490 )
491 else:
492 return pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
493 dataframe,
494 override_bigquery_fields=bq_schema,
495 index=True,
496 )
497
498 if bq_schema:
499 bq_schema = schema._to_schema_fields(bq_schema)
500 bq_schema_index = {field.name: field for field in bq_schema}
501 bq_schema_unused = set(bq_schema_index.keys())
502 else:
503 bq_schema_index = {}
504 bq_schema_unused = set()
505
506 bq_schema_out = []
507 unknown_type_columns = []
508 dataframe_reset_index = dataframe.reset_index()
509 for column, dtype in list_columns_and_indexes(dataframe):
510 # Step 1: use provided type from schema, if present.
511 bq_field = bq_schema_index.get(column)
512 if bq_field:
513 bq_schema_out.append(bq_field)
514 bq_schema_unused.discard(bq_field.name)
515 continue
516
517 # Step 2: try to automatically determine the type based on the
518 # pandas dtype.
519 bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
520 if bq_type is None:
521 sample_data = _first_valid(dataframe_reset_index[column])
522 if (
523 isinstance(sample_data, _BaseGeometry)
524 and sample_data is not None # Paranoia
525 ):
526 bq_type = "GEOGRAPHY"
527 if bq_type is not None:
528 bq_schema_out.append(schema.SchemaField(column, bq_type))
529 continue
530
531 # Step 3: try with pyarrow if available
532 bq_field = _get_schema_by_pyarrow(column, dataframe_reset_index[column])
533 if bq_field is not None:
534 bq_schema_out.append(bq_field)
535 continue
536
537 unknown_type_columns.append(column)
538
539 # Catch any schema mismatch. The developer explicitly asked to serialize a
540 # column, but it was not found.
541 if bq_schema_unused:
542 raise ValueError(
543 "bq_schema contains fields not present in dataframe: {}".format(
544 bq_schema_unused
545 )
546 )
547
    if unknown_type_columns:
549 msg = "Could not determine the type of columns: {}".format(
550 ", ".join(unknown_type_columns)
551 )
552 warnings.warn(msg)
553 return None # We cannot detect the schema in full.
554
555 return tuple(bq_schema_out)
556
557
558def _get_schema_by_pyarrow(name, series):
559 """Attempt to detect the type of the given series by leveraging PyArrow's
560 type detection capabilities.
561
562 This function requires the ``pyarrow`` library to be installed and
563 available. If the series type cannot be determined or ``pyarrow`` is not
564 available, ``None`` is returned.
565
566 Args:
567 name (str):
568 the column name of the SchemaField.
569 series (pandas.Series):
570 The Series data for which to detect the data type.
571 Returns:
572 Optional[google.cloud.bigquery.schema.SchemaField]:
            A SchemaField with the detected BigQuery-compatible type (e.g.,
            "STRING", "INTEGER", "TIMESTAMP", "DATETIME", "NUMERIC",
            "BIGNUMERIC") and mode ("NULLABLE" or "REPEATED").
            Returns ``None`` if the type cannot be determined or ``pyarrow``
            is not available.
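
    Example (illustrative sketch; assumes ``pandas`` and ``pyarrow`` are
    installed; the exact type strings come from the pyarrow-to-BigQuery
    mapping in ``_pyarrow_helpers``)::

        import pandas

        _get_schema_by_pyarrow("ints", pandas.Series([1, 2, 3]))
        # -> a SchemaField with an integer type and mode "NULLABLE"
        _get_schema_by_pyarrow("tags", pandas.Series([["a"], ["b", "c"]]))
        # -> a SchemaField with a string type and mode "REPEATED"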
578 """
579
580 if not pyarrow:
581 return None
582
583 arrow_table = pyarrow.array(series)
584 if pyarrow.types.is_list(arrow_table.type):
585 # `pyarrow.ListType`
586 mode = "REPEATED"
587 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.values.type.id)
588
589 # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
590 # it to such datetimes, causing them to be recognized as TIMESTAMP type.
591 # We thus additionally check the actual data to see if we need to overrule
592 # that and choose DATETIME instead.
593 # Note that this should only be needed for datetime values inside a list,
594 # since scalar datetime values have a proper Pandas dtype that allows
595 # distinguishing between timezone-naive and timezone-aware values before
596 # even requiring the additional schema augment logic in this method.
597 if type == "TIMESTAMP":
598 valid_item = _first_array_valid(series)
599 if isinstance(valid_item, datetime) and valid_item.tzinfo is None:
600 type = "DATETIME"
601 else:
602 mode = "NULLABLE" # default mode
603 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id)
604 if type == "NUMERIC" and arrow_table.type.scale > 9:
605 type = "BIGNUMERIC"
606
607 if type is not None:
608 return schema.SchemaField(name, type, mode)
609 else:
610 return None
611
612
613def dataframe_to_arrow(dataframe, bq_schema):
614 """Convert pandas dataframe to Arrow table, using BigQuery schema.
615
616 Args:
617 dataframe (pandas.DataFrame):
618 DataFrame to convert to Arrow table.
619 bq_schema (Sequence[Union[ \
620 :class:`~google.cloud.bigquery.schema.SchemaField`, \
621 Mapping[str, Any] \
622 ]]):
623 Desired BigQuery schema. The number of columns must match the
624 number of columns in the DataFrame.
625
626 Returns:
627 pyarrow.Table:
628 Table containing dataframe data, with schema derived from
629 BigQuery schema.
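
    Example (illustrative sketch, assuming ``pandas`` and ``pyarrow`` are
    installed)::

        import pandas

        from google.cloud.bigquery import schema

        df = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]})
        bq_schema = [
            schema.SchemaField("id", "INTEGER"),
            schema.SchemaField("name", "STRING"),
        ]
        dataframe_to_arrow(df, bq_schema)
        # -> a pyarrow.Table with an int64 "id" column and a string
        #    "name" column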
630 """
631 column_names = set(dataframe.columns)
632 column_and_index_names = set(
633 name for name, _ in list_columns_and_indexes(dataframe)
634 )
635
636 bq_schema = schema._to_schema_fields(bq_schema)
637 bq_field_names = set(field.name for field in bq_schema)
638
639 extra_fields = bq_field_names - column_and_index_names
640 if extra_fields:
641 raise ValueError(
642 "bq_schema contains fields not present in dataframe: {}".format(
643 extra_fields
644 )
645 )
646
647 # It's okay for indexes to be missing from bq_schema, but it's not okay to
648 # be missing columns.
649 missing_fields = column_names - bq_field_names
650 if missing_fields:
651 raise ValueError(
652 "bq_schema is missing fields from dataframe: {}".format(missing_fields)
653 )
654
655 arrow_arrays = []
656 arrow_names = []
657 arrow_fields = []
658 for bq_field in bq_schema:
659 arrow_names.append(bq_field.name)
660 arrow_arrays.append(
661 bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field)
662 )
663 arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type))
664
665 if all((field is not None for field in arrow_fields)):
666 return pyarrow.Table.from_arrays(
667 arrow_arrays, schema=pyarrow.schema(arrow_fields)
668 )
669 return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
670
671
672def dataframe_to_parquet(
673 dataframe,
674 bq_schema,
675 filepath,
676 parquet_compression="SNAPPY",
677 parquet_use_compliant_nested_type=True,
678):
679 """Write dataframe as a Parquet file, according to the desired BQ schema.
680
681 This function requires the :mod:`pyarrow` package. Arrow is used as an
682 intermediate format.
683
684 Args:
685 dataframe (pandas.DataFrame):
686 DataFrame to convert to Parquet file.
687 bq_schema (Sequence[Union[ \
688 :class:`~google.cloud.bigquery.schema.SchemaField`, \
689 Mapping[str, Any] \
690 ]]):
691 Desired BigQuery schema. Number of columns must match number of
692 columns in the DataFrame.
693 filepath (str):
694 Path to write Parquet file to.
695 parquet_compression (Optional[str]):
            The compression codec used by the ``pyarrow.parquet.write_table``
697 serializing method. Defaults to "SNAPPY".
698 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
699 parquet_use_compliant_nested_type (bool):
700 Whether the ``pyarrow.parquet.write_table`` serializing method should write
701 compliant Parquet nested type (lists). Defaults to ``True``.
702 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types
703 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
704
705 This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
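
    Example (illustrative sketch, assuming ``pandas`` and ``pyarrow`` are
    installed; the output path is hypothetical)::

        import pandas

        from google.cloud.bigquery import schema

        df = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]})
        bq_schema = [
            schema.SchemaField("id", "INTEGER"),
            schema.SchemaField("name", "STRING"),
        ]
        dataframe_to_parquet(df, bq_schema, "/tmp/example.parquet")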
706 """
707 pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
708
709 import pyarrow.parquet # type: ignore
710
711 kwargs = (
712 {"use_compliant_nested_type": parquet_use_compliant_nested_type}
713 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type
714 else {}
715 )
716
717 bq_schema = schema._to_schema_fields(bq_schema)
718 arrow_table = dataframe_to_arrow(dataframe, bq_schema)
719 pyarrow.parquet.write_table(
720 arrow_table,
721 filepath,
722 compression=parquet_compression,
723 **kwargs,
724 )
725
726
727def _row_iterator_page_to_arrow(page, column_names, arrow_types):
728 # Iterate over the page to force the API request to get the page data.
729 try:
730 next(iter(page))
731 except StopIteration:
732 pass
733
734 arrays = []
735 for column_index, arrow_type in enumerate(arrow_types):
736 arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type))
737
738 if isinstance(column_names, pyarrow.Schema):
739 return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names)
740 return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)
741
742
743def download_arrow_row_iterator(pages, bq_schema):
744 """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.
745
746 Args:
747 pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
748 An iterator over the result pages.
749 bq_schema (Sequence[Union[ \
750 :class:`~google.cloud.bigquery.schema.SchemaField`, \
751 Mapping[str, Any] \
752 ]]):
            A description of the fields in result pages.
754 Yields:
755 :class:`pyarrow.RecordBatch`
756 The next page of records as a ``pyarrow`` record batch.
757 """
758 bq_schema = schema._to_schema_fields(bq_schema)
759 column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
760 arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]
761
762 for page in pages:
763 yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
764
765
766def _row_iterator_page_to_dataframe(page, column_names, dtypes):
767 # Iterate over the page to force the API request to get the page data.
768 try:
769 next(iter(page))
770 except StopIteration:
771 pass
772
773 columns = {}
774 for column_index, column_name in enumerate(column_names):
775 dtype = dtypes.get(column_name)
776 columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype)
777
778 return pandas.DataFrame(columns, columns=column_names)
779
780
781def download_dataframe_row_iterator(pages, bq_schema, dtypes):
782 """Use HTTP JSON RowIterator to construct a DataFrame.
783
784 Args:
785 pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
786 An iterator over the result pages.
787 bq_schema (Sequence[Union[ \
788 :class:`~google.cloud.bigquery.schema.SchemaField`, \
789 Mapping[str, Any] \
790 ]]):
            A description of the fields in result pages.
792 dtypes(Mapping[str, numpy.dtype]):
793 The types of columns in result data to hint construction of the
794 resulting DataFrame. Not all column types have to be specified.
795 Yields:
796 :class:`pandas.DataFrame`
            The next page of records as a ``pandas.DataFrame``.
798 """
799 bq_schema = schema._to_schema_fields(bq_schema)
800 column_names = [field.name for field in bq_schema]
801 for page in pages:
802 yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
803
804
805def _bqstorage_page_to_arrow(page):
806 return page.to_arrow()
807
808
809def _bqstorage_page_to_dataframe(column_names, dtypes, page):
810 # page.to_dataframe() does not preserve column order in some versions
811 # of google-cloud-bigquery-storage. Access by column name to rearrange.
812 return page.to_dataframe(dtypes=dtypes)[column_names]
813
814
815def _download_table_bqstorage_stream(
816 download_state, bqstorage_client, session, stream, worker_queue, page_to_item
817):
818 download_state.start()
819 try:
820 reader = bqstorage_client.read_rows(stream.name)
821
822 # Avoid deprecation warnings for passing in unnecessary read session.
823 # https://github.com/googleapis/python-bigquery-storage/issues/229
824 if _versions_helpers.BQ_STORAGE_VERSIONS.is_read_session_optional:
825 rowstream = reader.rows()
826 else:
827 rowstream = reader.rows(session)
828
829 for page in rowstream.pages:
830 item = page_to_item(page)
831
832 # Make sure we set a timeout on put() so that we give the worker
833 # thread opportunities to shutdown gracefully, for example if the
834 # parent thread shuts down or the parent generator object which
835 # collects rows from all workers goes out of scope. See:
836 # https://github.com/googleapis/python-bigquery/issues/2032
837 while True:
838 if download_state.done:
839 return
840 try:
841 worker_queue.put(item, timeout=_PROGRESS_INTERVAL)
842 break
843 except queue.Full:
844 continue
845 finally:
846 download_state.finish()
847
848
849def _nowait(futures):
    """Separate finished and unfinished futures, much like
851 :func:`concurrent.futures.wait`, but don't wait.
852 """
853 done = []
854 not_done = []
855 for future in futures:
856 if future.done():
857 done.append(future)
858 else:
859 not_done.append(future)
860 return done, not_done
861
862
863def _download_table_bqstorage(
864 project_id: str,
865 table: Any,
866 bqstorage_client: Any,
867 preserve_order: bool = False,
868 selected_fields: Optional[List[Any]] = None,
869 page_to_item: Optional[Callable] = None,
870 max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
871 max_stream_count: Optional[int] = None,
872 download_state: Optional[_DownloadState] = None,
873 timeout: Optional[float] = None,
874) -> Generator[Any, None, None]:
875 """Downloads a BigQuery table using the BigQuery Storage API.
876
877 This method uses the faster, but potentially more expensive, BigQuery
878 Storage API to download a table as a Pandas DataFrame. It supports
879 parallel downloads and optional data transformations.
880
881 Args:
882 project_id (str): The ID of the Google Cloud project containing
883 the table.
884 table (Any): The BigQuery table to download.
885 bqstorage_client (Any): An
886 authenticated BigQuery Storage API client.
887 preserve_order (bool, optional): Whether to preserve the order
888 of the rows as they are read from BigQuery. If True this limits
889 the number of streams to one and overrides `max_stream_count`.
890 Defaults to False.
891 selected_fields (Optional[List[SchemaField]]):
892 A list of BigQuery schema fields to select for download. If None,
893 all fields are downloaded. Defaults to None.
        page_to_item (Optional[Callable]): An optional callable that takes a
            page of data from the BigQuery Storage API and converts it into
            the items yielded by this generator (for example, an Arrow
            record batch or a pandas DataFrame).
        max_queue_size (Optional[int]): The maximum number of items the
            worker queue may hold before producers block. Defaults to the
            number of download streams; ``None`` means the queue is unbounded.
896 max_stream_count (Optional[int]): The maximum number of
897 concurrent streams to use for downloading data. If `preserve_order`
898 is True, the requested streams are limited to 1 regardless of the
899 `max_stream_count` value. If 0 or None, then the number of
900 requested streams will be unbounded. Defaults to None.
901 download_state (Optional[_DownloadState]):
902 A threadsafe state object which can be used to observe the
903 behavior of the worker threads created by this method.
904 timeout (Optional[float]):
905 The number of seconds to wait for the download to complete.
906 If None, wait indefinitely.
907
908 Yields:
        Any: The items produced by ``page_to_item`` (typically pyarrow record
            batches or pandas DataFrames), one for each chunk of data
            downloaded from BigQuery.
911
912 Raises:
913 ValueError: If attempting to read from a specific partition or snapshot.
914 concurrent.futures.TimeoutError:
915 If the download does not complete within the specified timeout.
916
917 Note:
918 This method requires the `google-cloud-bigquery-storage` library
919 to be installed.
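
    Example (illustrative sketch; assumes ``bqstorage_client`` is an existing
    ``BigQueryReadClient``, ``table`` is an existing table reference, and
    "my-project" is a placeholder project ID)::

        batches = _download_table_bqstorage(
            "my-project",
            table,
            bqstorage_client,
            page_to_item=_bqstorage_page_to_arrow,
            max_stream_count=4,
        )
        for record_batch in batches:
            ...  # Process each chunk as it is downloaded.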
920 """
921
922 from google.cloud import bigquery_storage
923
924 if "$" in table.table_id:
925 raise ValueError(
926 "Reading from a specific partition is not currently supported."
927 )
928 if "@" in table.table_id:
929 raise ValueError("Reading from a specific snapshot is not currently supported.")
930
931 requested_streams = determine_requested_streams(preserve_order, max_stream_count)
932
933 requested_session = bigquery_storage.types.stream.ReadSession(
934 table=table.to_bqstorage(),
935 data_format=bigquery_storage.types.stream.DataFormat.ARROW,
936 )
937 if selected_fields is not None:
938 for field in selected_fields:
939 requested_session.read_options.selected_fields.append(field.name)
940
941 if _ARROW_COMPRESSION_SUPPORT:
942 requested_session.read_options.arrow_serialization_options.buffer_compression = (
943 # CompressionCodec(1) -> LZ4_FRAME
944 ArrowSerializationOptions.CompressionCodec(1)
945 )
946
947 session = bqstorage_client.create_read_session(
948 parent="projects/{}".format(project_id),
949 read_session=requested_session,
950 max_stream_count=requested_streams,
951 )
952
953 _LOGGER.debug(
954 "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
955 table.project, table.dataset_id, table.table_id, session.name
956 )
957 )
958
959 # Avoid reading rows from an empty table.
960 if not session.streams:
961 return
962
963 total_streams = len(session.streams)
964
965 # Use _DownloadState to notify worker threads when to quit.
966 # See: https://stackoverflow.com/a/29237343/101923
967 if download_state is None:
968 download_state = _DownloadState()
969
970 # Create a queue to collect frames as they are created in each thread.
971 #
972 # The queue needs to be bounded by default, because if the user code processes the
973 # fetched result pages too slowly, while at the same time new pages are rapidly being
974 # fetched from the server, the queue can grow to the point where the process runs
975 # out of memory.
976 if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT:
977 max_queue_size = total_streams
978 elif max_queue_size is None:
979 max_queue_size = 0 # unbounded
980
    worker_queue: queue.Queue[Any] = queue.Queue(maxsize=max_queue_size)
982
983 # Manually manage the pool to control shutdown behavior on timeout.
984 pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
985 wait_on_shutdown = True
986 start_time = time.time()
987
988 try:
989 # Manually submit jobs and wait for download to complete rather
990 # than using pool.map because pool.map continues running in the
991 # background even if there is an exception on the main thread.
992 # See: https://github.com/googleapis/google-cloud-python/pull/7698
993 not_done = [
994 pool.submit(
995 _download_table_bqstorage_stream,
996 download_state,
997 bqstorage_client,
998 session,
999 stream,
1000 worker_queue,
1001 page_to_item,
1002 )
1003 for stream in session.streams
1004 ]
1005
1006 while not_done:
1007 # Check for timeout
1008 if timeout is not None:
1009 elapsed = time.time() - start_time
1010 if elapsed > timeout:
1011 wait_on_shutdown = False
1012 raise concurrent.futures.TimeoutError(
1013 f"Download timed out after {timeout} seconds."
1014 )
1015
1016 # Don't block on the worker threads. For performance reasons,
1017 # we want to block on the queue's get method, instead. This
1018 # prevents the queue from filling up, because the main thread
1019 # has smaller gaps in time between calls to the queue's get
1020 # method. For a detailed explanation, see:
1021 # https://friendliness.dev/2019/06/18/python-nowait/
1022 done, not_done = _nowait(not_done)
1023 for future in done:
1024 # Call result() on any finished threads to raise any
1025 # exceptions encountered.
1026 future.result()
1027
1028 try:
1029 frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
1030 yield frame
1031 except queue.Empty: # pragma: NO COVER
1032 continue
1033
1034 # Return any remaining values after the workers finished.
1035 while True: # pragma: NO COVER
1036 try:
1037 frame = worker_queue.get_nowait()
1038 yield frame
1039 except queue.Empty: # pragma: NO COVER
1040 break
1041 finally:
        # No need for a lock: reading or replacing an attribute is effectively
        # atomic in CPython, where the operation is a single bytecode step
        # protected by the global interpreter lock.
1045 download_state.done = True
1046
1047 # Shutdown all background threads, now that they should know to
1048 # exit early.
1049 pool.shutdown(wait=wait_on_shutdown)
1050
1051
1052def download_arrow_bqstorage(
1053 project_id,
1054 table,
1055 bqstorage_client,
1056 preserve_order=False,
1057 selected_fields=None,
1058 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
1059 max_stream_count=None,
1060 timeout=None,
1061):
1062 return _download_table_bqstorage(
1063 project_id,
1064 table,
1065 bqstorage_client,
1066 preserve_order=preserve_order,
1067 selected_fields=selected_fields,
1068 page_to_item=_bqstorage_page_to_arrow,
1069 max_queue_size=max_queue_size,
1070 max_stream_count=max_stream_count,
1071 timeout=timeout,
1072 )
1073
1074
1075def download_dataframe_bqstorage(
1076 project_id,
1077 table,
1078 bqstorage_client,
1079 column_names,
1080 dtypes,
1081 preserve_order=False,
1082 selected_fields=None,
1083 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
1084 max_stream_count=None,
1085 timeout=None,
1086):
1087 page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
1088 return _download_table_bqstorage(
1089 project_id,
1090 table,
1091 bqstorage_client,
1092 preserve_order=preserve_order,
1093 selected_fields=selected_fields,
1094 page_to_item=page_to_item,
1095 max_queue_size=max_queue_size,
1096 max_stream_count=max_stream_count,
1097 timeout=timeout,
1098 )
1099
1100
1101def dataframe_to_json_generator(dataframe):
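    """Convert each DataFrame row into a JSON-serializable dict, omitting
    NaN values and converting numpy booleans and integers to plain Python
    types.

    Example (illustrative sketch, assuming ``pandas`` is installed)::

        import pandas

        df = pandas.DataFrame({"x": [1, None], "y": ["a", "b"]})
        list(dataframe_to_json_generator(df))
        # -> [{'x': 1.0, 'y': 'a'}, {'y': 'b'}]; the NaN cell in the
        #    second row is omitted.
    """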
1102 for row in dataframe.itertuples(index=False, name=None):
1103 output = {}
1104 for column, value in zip(dataframe.columns, row):
1105 # Omit NaN values.
1106 is_nan = pandas.isna(value)
1107
1108 # isna() can also return an array-like of bools, but the latter's boolean
1109 # value is ambiguous, hence an extra check. An array-like value is *not*
1110 # considered a NaN, however.
1111 if isinstance(is_nan, bool) and is_nan:
1112 continue
1113
1114 # Convert numpy types to corresponding Python types.
1115 # https://stackoverflow.com/a/60441783/101923
1116 if isinstance(value, numpy.bool_):
1117 value = bool(value)
1118 elif isinstance(
1119 value,
1120 (
1121 numpy.int64,
1122 numpy.int32,
1123 numpy.int16,
1124 numpy.int8,
1125 numpy.uint64,
1126 numpy.uint32,
1127 numpy.uint16,
1128 numpy.uint8,
1129 ),
1130 ):
1131 value = int(value)
1132 output[column] = value
1133
1134 yield output
1135
1136
1137def verify_pandas_imports():
1138 if pandas is None:
1139 raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception
1140 if db_dtypes is None:
1141 raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception
1142
1143
1144def determine_requested_streams(
1145 preserve_order: bool,
1146 max_stream_count: Union[int, None],
1147) -> int:
1148 """Determines the value of requested_streams based on the values of
1149 `preserve_order` and `max_stream_count`.
1150
1151 Args:
1152 preserve_order (bool): Whether to preserve the order of streams. If True,
1153 this limits the number of streams to one. `preserve_order` takes
1154 precedence over `max_stream_count`.
        max_stream_count (Union[int, None]): The maximum number of streams
            allowed. Must be a non-negative number or None, where None indicates
            the value is unset. NOTE: `preserve_order` takes precedence over
            `max_stream_count`, so to ensure that `max_stream_count` is used,
            make sure that `preserve_order` is False.
1160
1161 Returns:
1162 (int) The appropriate value for requested_streams.
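
    Example (illustrative)::

        determine_requested_streams(preserve_order=True, max_stream_count=10)
        # -> 1 (preserve_order takes precedence)
        determine_requested_streams(preserve_order=False, max_stream_count=10)
        # -> 10
        determine_requested_streams(preserve_order=False, max_stream_count=None)
        # -> 0 (unbounded)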
1163 """
1164
1165 if preserve_order:
        # If preserve_order is set, it takes precedence.
        # Limit the requested streams to 1 to ensure that order
        # is preserved.
1169 return 1
1170
1171 elif max_stream_count is not None:
1172 # If preserve_order is not set, only then do we consider max_stream_count
1173 if max_stream_count <= -1:
1174 raise ValueError("max_stream_count must be non-negative OR None")
1175 return max_stream_count
1176
1177 # Default to zero requested streams (unbounded).
1178 return 0