Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/_pandas_helpers.py: 18%


1# Copyright 2019 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Shared helper functions for connecting BigQuery and pandas. 

16 

17NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package, 

18instead. See: go/pandas-gbq-and-bigframes-redundancy and 

19https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pandas_to_bigquery.py 

20""" 

21 

22import concurrent.futures 

23from datetime import datetime 

24import functools 

25from itertools import islice 

26import logging 

27import queue 

28import threading 

29import time 

30import warnings 

31from typing import Any, Union, Optional, Callable, Generator, List 

32 

33 

34from google.cloud.bigquery import _pyarrow_helpers 

35from google.cloud.bigquery import _versions_helpers 

36from google.cloud.bigquery import retry as bq_retry 

37from google.cloud.bigquery import schema 

38 

39 

40try: 

41 import pandas # type: ignore 

42 

43 pandas_import_exception = None 

44except ImportError as exc: 

45 pandas = None 

46 pandas_import_exception = exc 

47else: 

48 import numpy 

49 

50 

51try: 

52 import pandas_gbq.schema.pandas_to_bigquery # type: ignore 

53 

54 pandas_gbq_import_exception = None 

55except ImportError as exc: 

56 pandas_gbq = None 

57 pandas_gbq_import_exception = exc 

58 

59 

60try: 

61 import db_dtypes # type: ignore 

62 

63 date_dtype_name = db_dtypes.DateDtype.name 

64 time_dtype_name = db_dtypes.TimeDtype.name 

65 db_dtypes_import_exception = None 

66except ImportError as exc: 

67 db_dtypes = None 

68 db_dtypes_import_exception = exc 

69 date_dtype_name = time_dtype_name = "" # Use '' rather than None so pytype keeps the str type.

70 

71pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import() 

72 

73try: 

74 # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`

75 from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore 

76except ImportError: 

77 # No shapely, use NoneType for _BaseGeometry as a placeholder. 

78 _BaseGeometry = type(None) 

79else: 

80 # We don't have any unit test sessions that install shapely but not pandas. 

81 if pandas is not None: # pragma: NO COVER 

82 

83 def _to_wkb(): 

84 from shapely import wkb # type: ignore 

85 

86 write = wkb.dumps 

87 notnull = pandas.notnull 

88 

89 def _to_wkb(v): 

90 return write(v) if notnull(v) else v 

91 

92 return _to_wkb 

93 

94 _to_wkb = _to_wkb() 

95 

96try: 

97 from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions 

98except ImportError: 

99 _ARROW_COMPRESSION_SUPPORT = False 

100else: 

101 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too. 

102 _ARROW_COMPRESSION_SUPPORT = True 

103 

104_LOGGER = logging.getLogger(__name__) 

105 

106_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds. 

107 

108_MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads 

109 

110_NO_PANDAS_ERROR = "Please install the 'pandas' package to use this function." 

111_NO_DB_TYPES_ERROR = "Please install the 'db-dtypes' package to use this function." 

112 

113_PANDAS_DTYPE_TO_BQ = { 

114 "bool": "BOOLEAN", 

115 "datetime64[ns, UTC]": "TIMESTAMP", 

116 "datetime64[ns]": "DATETIME", 

117 "float32": "FLOAT", 

118 "float64": "FLOAT", 

119 "int8": "INTEGER", 

120 "int16": "INTEGER", 

121 "int32": "INTEGER", 

122 "int64": "INTEGER", 

123 "uint8": "INTEGER", 

124 "uint16": "INTEGER", 

125 "uint32": "INTEGER", 

126 "geometry": "GEOGRAPHY", 

127 date_dtype_name: "DATE", 

128 time_dtype_name: "TIME", 

129} 

130 

131 

132class _DownloadState(object): 

133 """Flag to indicate that a thread should exit early.""" 

134 

135 def __init__(self): 

136 # No need for a lock because reading/replacing a variable is defined to 

137 # be an atomic operation in the Python language definition (enforced by 

138 # the global interpreter lock). 

139 self.done = False 

140 # To assist with testing and understanding the behavior of the 

141 # download, use this object as shared state to track how many worker 

142 # threads have started and have gracefully shutdown. 

143 self._started_workers_lock = threading.Lock() 

144 self.started_workers = 0 

145 self._finished_workers_lock = threading.Lock() 

146 self.finished_workers = 0 

147 

148 def start(self): 

149 with self._started_workers_lock: 

150 self.started_workers += 1 

151 

152 def finish(self): 

153 with self._finished_workers_lock: 

154 self.finished_workers += 1 

155 
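# Editorial example (not part of the original module): a minimal sketch of how a
# download worker is expected to use ``_DownloadState`` -- record start/finish and
# poll ``done`` so the parent generator can ask workers to exit early.
def _example_download_state_worker(state: _DownloadState) -> None:  # hypothetical helper
    state.start()
    try:
        while not state.done:
            # ... fetch one page and enqueue it here ...
            break  # single pass, for illustration only
    finally:
        state.finish()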

156 

157BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = { 

158 "GEOGRAPHY": { 

159 b"ARROW:extension:name": b"google:sqlType:geography", 

160 b"ARROW:extension:metadata": b'{"encoding": "WKT"}', 

161 }, 

162 "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"}, 

163 "JSON": {b"ARROW:extension:name": b"google:sqlType:json"}, 

164} 

165 

166 

167def bq_to_arrow_struct_data_type(field): 

168 arrow_fields = [] 

169 for subfield in field.fields: 

170 arrow_subfield = bq_to_arrow_field(subfield) 

171 if arrow_subfield: 

172 arrow_fields.append(arrow_subfield) 

173 else: 

174 # Could not determine a subfield type. Fallback to type 

175 # inference. 

176 return None 

177 return pyarrow.struct(arrow_fields) 

178 

179 

180def bq_to_arrow_range_data_type(field): 

181 if field is None: 

182 raise ValueError( 

183 "Range element type cannot be None, must be one of " 

184 "DATE, DATETIME, or TIMESTAMP" 

185 ) 

186 element_type = field.element_type.upper() 

187 arrow_element_type = _pyarrow_helpers.bq_to_arrow_scalars(element_type)() 

188 return pyarrow.struct([("start", arrow_element_type), ("end", arrow_element_type)]) 

189 

190 

191def bq_to_arrow_data_type(field): 

192 """Return the Arrow data type, corresponding to a given BigQuery column. 

193 

194 Returns: 

195 None: if default Arrow type inspection should be used. 

196 """ 

197 if field.mode is not None and field.mode.upper() == "REPEATED": 

198 inner_type = bq_to_arrow_data_type( 

199 schema.SchemaField(field.name, field.field_type, fields=field.fields) 

200 ) 

201 if inner_type: 

202 return pyarrow.list_(inner_type) 

203 return None 

204 

205 field_type_upper = field.field_type.upper() if field.field_type else "" 

206 if field_type_upper in schema._STRUCT_TYPES: 

207 return bq_to_arrow_struct_data_type(field) 

208 

209 if field_type_upper == "RANGE": 

210 return bq_to_arrow_range_data_type(field.range_element_type) 

211 

212 data_type_constructor = _pyarrow_helpers.bq_to_arrow_scalars(field_type_upper) 

213 if data_type_constructor is None: 

214 return None 

215 return data_type_constructor() 

216 
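# Editorial example (not part of the original module): a minimal sketch of the
# BigQuery-to-Arrow type mapping above, assuming ``pyarrow`` is installed and the
# default scalar mappings in ``_pyarrow_helpers`` apply.
def _example_bq_to_arrow_data_type() -> None:  # hypothetical helper
    scalar_field = schema.SchemaField("age", "INTEGER")
    repeated_field = schema.SchemaField("tags", "STRING", mode="REPEATED")
    assert bq_to_arrow_data_type(scalar_field) == pyarrow.int64()
    assert bq_to_arrow_data_type(repeated_field) == pyarrow.list_(pyarrow.string())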

217 

218def bq_to_arrow_field(bq_field, array_type=None): 

219 """Return the Arrow field, corresponding to a given BigQuery column. 

220 

221 Returns: 

222 None: if the Arrow type cannot be determined. 

223 """ 

224 arrow_type = bq_to_arrow_data_type(bq_field) 

225 if arrow_type is not None: 

226 if array_type is not None: 

227 arrow_type = array_type # For GEOGRAPHY, at least initially 

228 metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get( 

229 bq_field.field_type.upper() if bq_field.field_type else "" 

230 ) 

231 return pyarrow.field( 

232 bq_field.name, 

233 arrow_type, 

234 # Even if the remote schema is REQUIRED, there's a chance there's 

235 # local NULL values. Arrow will gladly interpret these NULL values 

236 # as non-NULL and give you an arbitrary value. See: 

237 # https://github.com/googleapis/python-bigquery/issues/1692 

238 nullable=False if bq_field.mode.upper() == "REPEATED" else True, 

239 metadata=metadata, 

240 ) 

241 

242 warnings.warn( 

243 "Unable to determine Arrow type for field '{}'.".format(bq_field.name) 

244 ) 

245 return None 

246 

247 

248def bq_to_arrow_schema(bq_schema): 

249 """Return the Arrow schema, corresponding to a given BigQuery schema. 

250 

251 Returns: 

252 None: if any Arrow type cannot be determined. 

253 """ 

254 arrow_fields = [] 

255 for bq_field in bq_schema: 

256 arrow_field = bq_to_arrow_field(bq_field) 

257 if arrow_field is None: 

258 # Auto-detect the schema if there is an unknown field type. 

259 return None 

260 arrow_fields.append(arrow_field) 

261 return pyarrow.schema(arrow_fields) 

262 

263 

264def default_types_mapper( 

265 date_as_object: bool = False, 

266 bool_dtype: Union[Any, None] = None, 

267 int_dtype: Union[Any, None] = None, 

268 float_dtype: Union[Any, None] = None, 

269 string_dtype: Union[Any, None] = None, 

270 date_dtype: Union[Any, None] = None, 

271 datetime_dtype: Union[Any, None] = None, 

272 time_dtype: Union[Any, None] = None, 

273 timestamp_dtype: Union[Any, None] = None, 

274 range_date_dtype: Union[Any, None] = None, 

275 range_datetime_dtype: Union[Any, None] = None, 

276 range_timestamp_dtype: Union[Any, None] = None, 

277): 

278 """Create a mapping from pyarrow types to pandas types. 

279 

280 This overrides the pandas defaults to use null-safe extension types where 

281 available. 

282 

283 See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of 

284 data types. See: 

285 tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for 

286 BigQuery to Arrow type mapping. 

287 

288 Note to google-cloud-bigquery developers: If you update the default dtypes, 

289 also update the docs at docs/usage/pandas.rst. 

290 """ 

291 

292 def types_mapper(arrow_data_type): 

293 if bool_dtype is not None and pyarrow.types.is_boolean(arrow_data_type): 

294 return bool_dtype 

295 

296 elif int_dtype is not None and pyarrow.types.is_integer(arrow_data_type): 

297 return int_dtype 

298 

299 elif float_dtype is not None and pyarrow.types.is_floating(arrow_data_type): 

300 return float_dtype 

301 

302 elif string_dtype is not None and pyarrow.types.is_string(arrow_data_type): 

303 return string_dtype 

304 

305 elif ( 

306 # If date_as_object is True, we know some DATE columns are 

307 # out-of-bounds of what is supported by pandas. 

308 date_dtype is not None 

309 and not date_as_object 

310 and pyarrow.types.is_date(arrow_data_type) 

311 ): 

312 return date_dtype 

313 

314 elif ( 

315 datetime_dtype is not None 

316 and pyarrow.types.is_timestamp(arrow_data_type) 

317 and arrow_data_type.tz is None 

318 ): 

319 return datetime_dtype 

320 

321 elif ( 

322 timestamp_dtype is not None 

323 and pyarrow.types.is_timestamp(arrow_data_type) 

324 and arrow_data_type.tz is not None 

325 ): 

326 return timestamp_dtype 

327 

328 elif time_dtype is not None and pyarrow.types.is_time(arrow_data_type): 

329 return time_dtype 

330 

331 elif pyarrow.types.is_struct(arrow_data_type): 

332 if range_datetime_dtype is not None and arrow_data_type.equals( 

333 range_datetime_dtype.pyarrow_dtype 

334 ): 

335 return range_datetime_dtype 

336 

337 elif range_date_dtype is not None and arrow_data_type.equals( 

338 range_date_dtype.pyarrow_dtype 

339 ): 

340 return range_date_dtype 

341 

342 elif range_timestamp_dtype is not None and arrow_data_type.equals( 

343 range_timestamp_dtype.pyarrow_dtype 

344 ): 

345 return range_timestamp_dtype 

346 

347 return types_mapper 

348 
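# Editorial example (not part of the original module): a minimal sketch of plugging
# the returned ``types_mapper`` into ``pyarrow.Table.to_pandas`` so BOOL and INT64
# columns land in pandas' nullable extension dtypes. Assumes pandas and pyarrow.
def _example_default_types_mapper():  # hypothetical helper
    table = pyarrow.table({"flag": [True, None], "n": [1, None]})
    mapper = default_types_mapper(
        bool_dtype=pandas.BooleanDtype(),
        int_dtype=pandas.Int64Dtype(),
    )
    # Missing values become pandas.NA instead of being cast to object/float.
    return table.to_pandas(types_mapper=mapper)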

349 

350def bq_to_arrow_array(series, bq_field): 

351 if bq_field.field_type.upper() == "GEOGRAPHY": 

352 arrow_type = None 

353 first = _first_valid(series) 

354 if first is not None: 

355 if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry): 

356 arrow_type = pyarrow.binary() 

357 # Convert shapely geometry to WKB binary format:

358 series = series.apply(_to_wkb) 

359 elif isinstance(first, bytes): 

360 arrow_type = pyarrow.binary() 

361 elif series.dtype.name == "geometry": 

362 # We have a GeoSeries containing all nulls, convert it to a pandas series 

363 series = pandas.Series(numpy.array(series)) 

364 

365 if arrow_type is None: 

366 arrow_type = bq_to_arrow_data_type(bq_field) 

367 else: 

368 arrow_type = bq_to_arrow_data_type(bq_field) 

369 

370 field_type_upper = bq_field.field_type.upper() if bq_field.field_type else "" 

371 

372 try: 

373 if bq_field.mode.upper() == "REPEATED": 

374 return pyarrow.ListArray.from_pandas(series, type=arrow_type) 

375 if field_type_upper in schema._STRUCT_TYPES: 

376 return pyarrow.StructArray.from_pandas(series, type=arrow_type) 

377 return pyarrow.Array.from_pandas(series, type=arrow_type) 

378 except pyarrow.ArrowTypeError: 

379 msg = f"""Error converting Pandas column with name: "{series.name}" and datatype: "{series.dtype}" to an appropriate pyarrow datatype: Array, ListArray, or StructArray""" 

380 _LOGGER.error(msg) 

381 raise pyarrow.ArrowTypeError(msg) 

382 
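# Editorial example (not part of the original module): a minimal sketch converting a
# REPEATED FLOAT column; ``bq_to_arrow_array`` picks ``pyarrow.ListArray`` based on
# the field mode. Assumes pandas and pyarrow are installed.
def _example_bq_to_arrow_array():  # hypothetical helper
    field = schema.SchemaField("scores", "FLOAT", mode="REPEATED")
    series = pandas.Series([[1.0, 2.0], None, [3.5]], name="scores")
    return bq_to_arrow_array(series, field)  # -> ListArray of float64 with one null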

383 

384def get_column_or_index(dataframe, name): 

385 """Return a column or index as a pandas series.""" 

386 if name in dataframe.columns: 

387 return dataframe[name].reset_index(drop=True) 

388 

389 if isinstance(dataframe.index, pandas.MultiIndex): 

390 if name in dataframe.index.names: 

391 return ( 

392 dataframe.index.get_level_values(name) 

393 .to_series() 

394 .reset_index(drop=True) 

395 ) 

396 else: 

397 if name == dataframe.index.name: 

398 return dataframe.index.to_series().reset_index(drop=True) 

399 

400 raise ValueError("column or index '{}' not found.".format(name)) 

401 

402 

403def list_columns_and_indexes(dataframe): 

404 """Return all index and column names with dtypes. 

405 

406 Returns: 

407 Sequence[Tuple[str, dtype]]: 

408 Returns a list of index and column names (indexes first, then columns in DataFrame order) with

409 corresponding dtypes. If an index is missing a name or has the 

410 same name as a column, the index is omitted. 

411 """ 

412 column_names = frozenset(dataframe.columns) 

413 columns_and_indexes = [] 

414 if isinstance(dataframe.index, pandas.MultiIndex): 

415 for name in dataframe.index.names: 

416 if name and name not in column_names: 

417 values = dataframe.index.get_level_values(name) 

418 columns_and_indexes.append((name, values.dtype)) 

419 else: 

420 if dataframe.index.name and dataframe.index.name not in column_names: 

421 columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype)) 

422 

423 columns_and_indexes += zip(dataframe.columns, dataframe.dtypes) 

424 return columns_and_indexes 

425 
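# Editorial example (not part of the original module): a minimal sketch showing that
# a named index is reported before the columns and can be fetched like a column.
# Assumes pandas is installed.
def _example_columns_and_indexes() -> None:  # hypothetical helper
    df = pandas.DataFrame({"x": [1, 2]}, index=pandas.Index(["a", "b"], name="id"))
    names = [name for name, _ in list_columns_and_indexes(df)]
    assert names == ["id", "x"]
    assert get_column_or_index(df, "id").tolist() == ["a", "b"]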

426 

427def _first_valid(series): 

428 first_valid_index = series.first_valid_index() 

429 if first_valid_index is not None: 

430 return series.at[first_valid_index] 

431 

432 

433def _first_array_valid(series): 

434 """Return the first "meaningful" element from the array series. 

435 

436 Here, "meaningful" means the first non-None element in one of the arrays that can 

437 be used for type detection.

438 """ 

439 first_valid_index = series.first_valid_index() 

440 if first_valid_index is None: 

441 return None 

442 

443 valid_array = series.at[first_valid_index] 

444 valid_item = next((item for item in valid_array if not pandas.isna(item)), None) 

445 

446 if valid_item is not None: 

447 return valid_item 

448 

449 # Valid item is None because all items in the "valid" array are invalid. Try 

450 # to find a true valid array manually. 

451 for array in islice(series, first_valid_index + 1, None): 

452 try: 

453 array_iter = iter(array) 

454 except TypeError: 

455 continue # Not an array, apparently, e.g. None, thus skip. 

456 valid_item = next((item for item in array_iter if not pandas.isna(item)), None) 

457 if valid_item is not None: 

458 break 

459 

460 return valid_item 

461 
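# Editorial example (not part of the original module): a minimal sketch of the
# "first meaningful element" lookup -- the first non-null array may itself contain
# only nulls, in which case later arrays are inspected. Assumes pandas is installed.
def _example_first_array_valid() -> None:  # hypothetical helper
    series = pandas.Series([None, [None], [None, 42, 7]])
    assert _first_array_valid(series) == 42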

462 

463def dataframe_to_bq_schema(dataframe, bq_schema): 

464 """Convert a pandas DataFrame schema to a BigQuery schema. 

465 

466 DEPRECATED: Use 

467 pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(), 

468 instead. See: go/pandas-gbq-and-bigframes-redundancy. 

469 

470 Args: 

471 dataframe (pandas.DataFrame): 

472 DataFrame for which the client determines the BigQuery schema. 

473 bq_schema (Sequence[Union[ \ 

474 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

475 Mapping[str, Any] \ 

476 ]]): 

477 A BigQuery schema. Use this argument to override the autodetected 

478 type for some or all of the DataFrame columns. 

479 

480 Returns: 

481 Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]: 

482 The automatically determined schema. Returns None if the type of 

483 any column cannot be determined. 

484 """ 

485 if pandas_gbq is None: 

486 warnings.warn( 

487 "Loading pandas DataFrame into BigQuery will require pandas-gbq " 

488 "package version 0.26.1 or greater in the future. " 

489 f"Tried to import pandas-gbq and got: {pandas_gbq_import_exception}", 

490 category=FutureWarning, 

491 ) 

492 else: 

493 return pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( 

494 dataframe, 

495 override_bigquery_fields=bq_schema, 

496 index=True, 

497 ) 

498 

499 if bq_schema: 

500 bq_schema = schema._to_schema_fields(bq_schema) 

501 bq_schema_index = {field.name: field for field in bq_schema} 

502 bq_schema_unused = set(bq_schema_index.keys()) 

503 else: 

504 bq_schema_index = {} 

505 bq_schema_unused = set() 

506 

507 bq_schema_out = [] 

508 unknown_type_columns = [] 

509 dataframe_reset_index = dataframe.reset_index() 

510 for column, dtype in list_columns_and_indexes(dataframe): 

511 # Step 1: use provided type from schema, if present. 

512 bq_field = bq_schema_index.get(column) 

513 if bq_field: 

514 bq_schema_out.append(bq_field) 

515 bq_schema_unused.discard(bq_field.name) 

516 continue 

517 

518 # Step 2: try to automatically determine the type based on the 

519 # pandas dtype. 

520 bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name) 

521 if bq_type is None: 

522 sample_data = _first_valid(dataframe_reset_index[column]) 

523 if ( 

524 isinstance(sample_data, _BaseGeometry) 

525 and sample_data is not None # Paranoia 

526 ): 

527 bq_type = "GEOGRAPHY" 

528 if bq_type is not None: 

529 bq_schema_out.append(schema.SchemaField(column, bq_type)) 

530 continue 

531 

532 # Step 3: try with pyarrow if available 

533 bq_field = _get_schema_by_pyarrow(column, dataframe_reset_index[column]) 

534 if bq_field is not None: 

535 bq_schema_out.append(bq_field) 

536 continue 

537 

538 unknown_type_columns.append(column) 

539 

540 # Catch any schema mismatch. The developer explicitly asked to serialize a 

541 # column, but it was not found. 

542 if bq_schema_unused: 

543 raise ValueError( 

544 "bq_schema contains fields not present in dataframe: {}".format( 

545 bq_schema_unused 

546 ) 

547 ) 

548 

549 if unknown_type_columns != []: 

550 msg = "Could not determine the type of columns: {}".format( 

551 ", ".join(unknown_type_columns) 

552 ) 

553 warnings.warn(msg) 

554 return None # We cannot detect the schema in full. 

555 

556 return tuple(bq_schema_out) 

557 
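# Editorial example (not part of the original module): a hedged sketch of schema
# detection with a per-column override. The exact result shown assumes the legacy
# in-module code path (pandas-gbq not installed) and the default dtype mappings.
def _example_dataframe_to_bq_schema():  # hypothetical helper
    df = pandas.DataFrame({"name": ["a"], "score": [1.5]})
    overrides = [schema.SchemaField("name", "STRING")]
    # e.g. (SchemaField('name', 'STRING', ...), SchemaField('score', 'FLOAT', ...))
    return dataframe_to_bq_schema(df, overrides)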

558 

559def _get_schema_by_pyarrow(name, series): 

560 """Attempt to detect the type of the given series by leveraging PyArrow's 

561 type detection capabilities. 

562 

563 This function requires the ``pyarrow`` library to be installed and 

564 available. If the series type cannot be determined or ``pyarrow`` is not 

565 available, ``None`` is returned. 

566 

567 Args: 

568 name (str): 

569 the column name of the SchemaField. 

570 series (pandas.Series): 

571 The Series data for which to detect the data type. 

572 Returns: 

573 Optional[google.cloud.bigquery.schema.SchemaField]: 

574 A ``SchemaField`` carrying the detected BigQuery-compatible type string (e.g.,

575 "STRING", "INTEGER", "TIMESTAMP", "DATETIME", "NUMERIC", "BIGNUMERIC")

576 and mode ("NULLABLE" or "REPEATED").

577 Returns ``None`` if the type cannot be determined or ``pyarrow`` 

578 is not imported. 

579 """ 

580 

581 if not pyarrow: 

582 return None 

583 

584 arrow_table = pyarrow.array(series) 

585 if pyarrow.types.is_list(arrow_table.type): 

586 # `pyarrow.ListType` 

587 mode = "REPEATED" 

588 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.values.type.id) 

589 

590 # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds 

591 # it to such datetimes, causing them to be recognized as TIMESTAMP type. 

592 # We thus additionally check the actual data to see if we need to overrule 

593 # that and choose DATETIME instead. 

594 # Note that this should only be needed for datetime values inside a list, 

595 # since scalar datetime values have a proper Pandas dtype that allows 

596 # distinguishing between timezone-naive and timezone-aware values before 

597 # even requiring the additional schema augment logic in this method. 

598 if type == "TIMESTAMP": 

599 valid_item = _first_array_valid(series) 

600 if isinstance(valid_item, datetime) and valid_item.tzinfo is None: 

601 type = "DATETIME" 

602 else: 

603 mode = "NULLABLE" # default mode 

604 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id) 

605 if type == "NUMERIC" and arrow_table.type.scale > 9: 

606 type = "BIGNUMERIC" 

607 

608 if type is not None: 

609 return schema.SchemaField(name, type, mode) 

610 else: 

611 return None 

612 
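# Editorial example (not part of the original module): a minimal sketch of
# pyarrow-based detection, including the NUMERIC -> BIGNUMERIC promotion when the
# decimal scale exceeds 9. Assumes pandas and pyarrow are installed.
def _example_get_schema_by_pyarrow() -> None:  # hypothetical helper
    import decimal

    list_field = _get_schema_by_pyarrow("tags", pandas.Series([["a", "b"], ["c"]]))
    assert list_field is not None and list_field.mode == "REPEATED"

    big = pandas.Series([decimal.Decimal("1.0000000001")])  # scale > 9
    assert _get_schema_by_pyarrow("d", big).field_type == "BIGNUMERIC"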

613 

614def dataframe_to_arrow(dataframe, bq_schema): 

615 """Convert pandas dataframe to Arrow table, using BigQuery schema. 

616 

617 Args: 

618 dataframe (pandas.DataFrame): 

619 DataFrame to convert to Arrow table. 

620 bq_schema (Sequence[Union[ \ 

621 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

622 Mapping[str, Any] \ 

623 ]]): 

624 Desired BigQuery schema. The number of columns must match the 

625 number of columns in the DataFrame. 

626 

627 Returns: 

628 pyarrow.Table: 

629 Table containing dataframe data, with schema derived from 

630 BigQuery schema. 

631 """ 

632 column_names = set(dataframe.columns) 

633 column_and_index_names = set( 

634 name for name, _ in list_columns_and_indexes(dataframe) 

635 ) 

636 

637 bq_schema = schema._to_schema_fields(bq_schema) 

638 bq_field_names = set(field.name for field in bq_schema) 

639 

640 extra_fields = bq_field_names - column_and_index_names 

641 if extra_fields: 

642 raise ValueError( 

643 "bq_schema contains fields not present in dataframe: {}".format( 

644 extra_fields 

645 ) 

646 ) 

647 

648 # It's okay for indexes to be missing from bq_schema, but it's not okay to 

649 # be missing columns. 

650 missing_fields = column_names - bq_field_names 

651 if missing_fields: 

652 raise ValueError( 

653 "bq_schema is missing fields from dataframe: {}".format(missing_fields) 

654 ) 

655 

656 arrow_arrays = [] 

657 arrow_names = [] 

658 arrow_fields = [] 

659 for bq_field in bq_schema: 

660 arrow_names.append(bq_field.name) 

661 arrow_arrays.append( 

662 bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field) 

663 ) 

664 arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type)) 

665 

666 if all((field is not None for field in arrow_fields)): 

667 return pyarrow.Table.from_arrays( 

668 arrow_arrays, schema=pyarrow.schema(arrow_fields) 

669 ) 

670 return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names) 

671 
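# Editorial example (not part of the original module): a minimal sketch converting a
# DataFrame with an explicit BigQuery schema; every field resolves to an Arrow type,
# so the resulting table carries the derived Arrow schema. Assumes pandas/pyarrow.
def _example_dataframe_to_arrow():  # hypothetical helper
    df = pandas.DataFrame({"name": ["a", None], "score": [1.5, 2.0]})
    bq_schema = [
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("score", "FLOAT"),
    ]
    return dataframe_to_arrow(df, bq_schema)  # -> pyarrow.Table with 2 columns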

672 

673def dataframe_to_parquet( 

674 dataframe, 

675 bq_schema, 

676 filepath, 

677 parquet_compression="SNAPPY", 

678 parquet_use_compliant_nested_type=True, 

679): 

680 """Write dataframe as a Parquet file, according to the desired BQ schema. 

681 

682 This function requires the :mod:`pyarrow` package. Arrow is used as an 

683 intermediate format. 

684 

685 Args: 

686 dataframe (pandas.DataFrame): 

687 DataFrame to convert to Parquet file. 

688 bq_schema (Sequence[Union[ \ 

689 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

690 Mapping[str, Any] \ 

691 ]]): 

692 Desired BigQuery schema. Number of columns must match number of 

693 columns in the DataFrame. 

694 filepath (str): 

695 Path to write Parquet file to. 

696 parquet_compression (Optional[str]): 

697 The compression codec to use by the ``pyarrow.parquet.write_table``

698 serializing method. Defaults to "SNAPPY". 

699 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table 

700 parquet_use_compliant_nested_type (bool): 

701 Whether the ``pyarrow.parquet.write_table`` serializing method should write 

702 compliant Parquet nested type (lists). Defaults to ``True``. 

703 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types 

704 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table 

705 

706 This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``. 

707 """ 

708 pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True) 

709 

710 import pyarrow.parquet # type: ignore 

711 

712 kwargs = ( 

713 {"use_compliant_nested_type": parquet_use_compliant_nested_type} 

714 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type 

715 else {} 

716 ) 

717 

718 bq_schema = schema._to_schema_fields(bq_schema) 

719 arrow_table = dataframe_to_arrow(dataframe, bq_schema) 

720 pyarrow.parquet.write_table( 

721 arrow_table, 

722 filepath, 

723 compression=parquet_compression, 

724 **kwargs, 

725 ) 

726 
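# Editorial example (not part of the original module): a minimal sketch writing a
# DataFrame to Parquet under a declared schema; the path is a temporary file and is
# purely illustrative. Assumes pyarrow (with pyarrow.parquet) is installed.
def _example_dataframe_to_parquet() -> None:  # hypothetical helper
    import os
    import tempfile

    df = pandas.DataFrame({"x": [1, 2, 3]})
    path = os.path.join(tempfile.mkdtemp(), "data.parquet")
    dataframe_to_parquet(df, [schema.SchemaField("x", "INTEGER")], path)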

727 

728def _row_iterator_page_to_arrow(page, column_names, arrow_types): 

729 # Iterate over the page to force the API request to get the page data. 

730 try: 

731 next(iter(page)) 

732 except StopIteration: 

733 pass 

734 

735 arrays = [] 

736 for column_index, arrow_type in enumerate(arrow_types): 

737 arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type)) 

738 

739 if isinstance(column_names, pyarrow.Schema): 

740 return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names) 

741 return pyarrow.RecordBatch.from_arrays(arrays, names=column_names) 

742 

743 

744def download_arrow_row_iterator(pages, bq_schema, timeout=None): 

745 """Use HTTP JSON RowIterator to construct an iterable of RecordBatches. 

746 

747 Args: 

748 pages (Iterator[:class:`google.api_core.page_iterator.Page`]): 

749 An iterator over the result pages. 

750 bq_schema (Sequence[Union[ \ 

751 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

752 Mapping[str, Any] \ 

753 ]]): 

754 A description of the fields in result pages.

755 timeout (Optional[float]): 

756 The number of seconds to wait for the underlying download to complete. 

757 If ``None``, wait indefinitely. 

758 

759 Yields: 

760 :class:`pyarrow.RecordBatch` 

761 The next page of records as a ``pyarrow`` record batch. 

762 """ 

763 bq_schema = schema._to_schema_fields(bq_schema) 

764 column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema] 

765 arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema] 

766 

767 if timeout is None: 

768 for page in pages: 

769 yield _row_iterator_page_to_arrow(page, column_names, arrow_types) 

770 else: 

771 start_time = time.monotonic() 

772 for page in pages: 

773 if time.monotonic() - start_time > timeout: 

774 raise concurrent.futures.TimeoutError() 

775 

776 yield _row_iterator_page_to_arrow(page, column_names, arrow_types) 

777 

778 

779def _row_iterator_page_to_dataframe(page, column_names, dtypes): 

780 # Iterate over the page to force the API request to get the page data. 

781 try: 

782 next(iter(page)) 

783 except StopIteration: 

784 pass 

785 

786 columns = {} 

787 for column_index, column_name in enumerate(column_names): 

788 dtype = dtypes.get(column_name) 

789 columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype) 

790 

791 return pandas.DataFrame(columns, columns=column_names) 

792 

793 

794def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None): 

795 """Use HTTP JSON RowIterator to construct a DataFrame. 

796 

797 Args: 

798 pages (Iterator[:class:`google.api_core.page_iterator.Page`]): 

799 An iterator over the result pages. 

800 bq_schema (Sequence[Union[ \ 

801 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

802 Mapping[str, Any] \ 

803 ]]): 

804 A description of the fields in result pages.

805 dtypes(Mapping[str, numpy.dtype]): 

806 The types of columns in result data to hint construction of the 

807 resulting DataFrame. Not all column types have to be specified. 

808 timeout (Optional[float]): 

809 The number of seconds to wait for the underlying download to complete. 

810 If ``None``, wait indefinitely. 

811 

812 Yields: 

813 :class:`pandas.DataFrame` 

814 The next page of records as a ``pandas.DataFrame`` record batch. 

815 """ 

816 bq_schema = schema._to_schema_fields(bq_schema) 

817 column_names = [field.name for field in bq_schema] 

818 

819 if timeout is None: 

820 for page in pages: 

821 yield _row_iterator_page_to_dataframe(page, column_names, dtypes) 

822 else: 

823 start_time = time.monotonic() 

824 for page in pages: 

825 if time.monotonic() - start_time > timeout: 

826 raise concurrent.futures.TimeoutError() 

827 

828 yield _row_iterator_page_to_dataframe(page, column_names, dtypes) 

829 

830 

831def _bqstorage_page_to_arrow(page): 

832 return page.to_arrow() 

833 

834 

835def _bqstorage_page_to_dataframe(column_names, dtypes, page): 

836 # page.to_dataframe() does not preserve column order in some versions 

837 # of google-cloud-bigquery-storage. Access by column name to rearrange. 

838 return page.to_dataframe(dtypes=dtypes)[column_names] 

839 

840 

841def _download_table_bqstorage_stream( 

842 download_state, bqstorage_client, session, stream, worker_queue, page_to_item 

843): 

844 download_state.start() 

845 try: 

846 reader = bqstorage_client.read_rows(stream.name) 

847 

848 # Avoid deprecation warnings for passing in unnecessary read session. 

849 # https://github.com/googleapis/python-bigquery-storage/issues/229 

850 if _versions_helpers.BQ_STORAGE_VERSIONS.is_read_session_optional: 

851 rowstream = reader.rows() 

852 else: 

853 rowstream = reader.rows(session) 

854 

855 for page in rowstream.pages: 

856 item = page_to_item(page) 

857 

858 # Make sure we set a timeout on put() so that we give the worker 

859 # thread opportunities to shutdown gracefully, for example if the 

860 # parent thread shuts down or the parent generator object which 

861 # collects rows from all workers goes out of scope. See: 

862 # https://github.com/googleapis/python-bigquery/issues/2032 

863 while True: 

864 if download_state.done: 

865 return 

866 try: 

867 worker_queue.put(item, timeout=_PROGRESS_INTERVAL) 

868 break 

869 except queue.Full: 

870 continue 

871 finally: 

872 download_state.finish() 

873 

874 

875def _nowait(futures): 

876 """Separate finished and unfinished threads, much like 

877 :func:`concurrent.futures.wait`, but don't wait. 

878 """ 

879 done = [] 

880 not_done = [] 

881 for future in futures: 

882 if future.done(): 

883 done.append(future) 

884 else: 

885 not_done.append(future) 

886 return done, not_done 

887 

888 

889def _download_table_bqstorage( 

890 project_id: str, 

891 table: Any, 

892 bqstorage_client: Any, 

893 preserve_order: bool = False, 

894 selected_fields: Optional[List[Any]] = None, 

895 page_to_item: Optional[Callable] = None, 

896 max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, 

897 max_stream_count: Optional[int] = None, 

898 download_state: Optional[_DownloadState] = None, 

899 timeout: Optional[float] = None, 

900) -> Generator[Any, None, None]: 

901 """Downloads a BigQuery table using the BigQuery Storage API. 

902 

903 This method uses the faster, but potentially more expensive, BigQuery 

904 Storage API to download a table as a Pandas DataFrame. It supports 

905 parallel downloads and optional data transformations. 

906 

907 Args: 

908 project_id (str): The ID of the Google Cloud project containing 

909 the table. 

910 table (Any): The BigQuery table to download. 

911 bqstorage_client (Any): An 

912 authenticated BigQuery Storage API client. 

913 preserve_order (bool, optional): Whether to preserve the order 

914 of the rows as they are read from BigQuery. If True this limits 

915 the number of streams to one and overrides `max_stream_count`. 

916 Defaults to False. 

917 selected_fields (Optional[List[SchemaField]]): 

918 A list of BigQuery schema fields to select for download. If None, 

919 all fields are downloaded. Defaults to None. 

920 page_to_item (Optional[Callable]): An optional callable 

921 function that takes a page of data from the BigQuery Storage API and converts it into the item yielded by this generator (e.g. a ``pyarrow.RecordBatch`` or ``pandas.DataFrame``).

922 max_stream_count (Optional[int]): The maximum number of 

923 concurrent streams to use for downloading data. If `preserve_order` 

924 is True, the requested streams are limited to 1 regardless of the 

925 `max_stream_count` value. If 0 or None, then the number of 

926 requested streams will be unbounded. Defaults to None. 

927 download_state (Optional[_DownloadState]): 

928 A threadsafe state object which can be used to observe the 

929 behavior of the worker threads created by this method. 

930 timeout (Optional[float]): 

931 The number of seconds to wait for the download to complete. 

932 If None, wait indefinitely. 

933 

934 Yields: 

935 pandas.DataFrame: Pandas DataFrames, one for each chunk of data 

936 downloaded from BigQuery. 

937 

938 Raises: 

939 ValueError: If attempting to read from a specific partition or snapshot. 

940 concurrent.futures.TimeoutError: 

941 If the download does not complete within the specified timeout. 

942 

943 Note: 

944 This method requires the `google-cloud-bigquery-storage` library 

945 to be installed. 

946 """ 

947 

948 from google.cloud import bigquery_storage 

949 

950 if "$" in table.table_id: 

951 raise ValueError( 

952 "Reading from a specific partition is not currently supported." 

953 ) 

954 if "@" in table.table_id: 

955 raise ValueError("Reading from a specific snapshot is not currently supported.") 

956 

957 start_time = time.monotonic() 

958 requested_streams = determine_requested_streams(preserve_order, max_stream_count) 

959 

960 requested_session = bigquery_storage.types.stream.ReadSession( 

961 table=table.to_bqstorage(), 

962 data_format=bigquery_storage.types.stream.DataFormat.ARROW, 

963 ) 

964 if selected_fields is not None: 

965 for field in selected_fields: 

966 requested_session.read_options.selected_fields.append(field.name) 

967 

968 if _ARROW_COMPRESSION_SUPPORT: 

969 requested_session.read_options.arrow_serialization_options.buffer_compression = ( 

970 # CompressionCodec(1) -> LZ4_FRAME 

971 ArrowSerializationOptions.CompressionCodec(1) 

972 ) 

973 

974 retry_policy = ( 

975 bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None 

976 ) 

977 

978 session = bqstorage_client.create_read_session( 

979 parent="projects/{}".format(project_id), 

980 read_session=requested_session, 

981 max_stream_count=requested_streams, 

982 retry=retry_policy, 

983 timeout=timeout, 

984 ) 

985 

986 _LOGGER.debug( 

987 "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format( 

988 table.project, table.dataset_id, table.table_id, session.name 

989 ) 

990 ) 

991 

992 # Avoid reading rows from an empty table. 

993 if not session.streams: 

994 return 

995 

996 total_streams = len(session.streams) 

997 

998 # Use _DownloadState to notify worker threads when to quit. 

999 # See: https://stackoverflow.com/a/29237343/101923 

1000 if download_state is None: 

1001 download_state = _DownloadState() 

1002 

1003 # Create a queue to collect frames as they are created in each thread. 

1004 # 

1005 # The queue needs to be bounded by default, because if the user code processes the 

1006 # fetched result pages too slowly, while at the same time new pages are rapidly being 

1007 # fetched from the server, the queue can grow to the point where the process runs 

1008 # out of memory. 

1009 if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT: 

1010 max_queue_size = total_streams 

1011 elif max_queue_size is None: 

1012 max_queue_size = 0 # unbounded 

1013 

1014 worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size) 

1015 

1016 # Manually manage the pool to control shutdown behavior on timeout. 

1017 pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams)) 

1018 wait_on_shutdown = True 

1019 try: 

1020 # Manually submit jobs and wait for download to complete rather 

1021 # than using pool.map because pool.map continues running in the 

1022 # background even if there is an exception on the main thread. 

1023 # See: https://github.com/googleapis/google-cloud-python/pull/7698 

1024 not_done = [ 

1025 pool.submit( 

1026 _download_table_bqstorage_stream, 

1027 download_state, 

1028 bqstorage_client, 

1029 session, 

1030 stream, 

1031 worker_queue, 

1032 page_to_item, 

1033 ) 

1034 for stream in session.streams 

1035 ] 

1036 

1037 while not_done: 

1038 # Check for timeout 

1039 if timeout is not None: 

1040 elapsed = time.monotonic() - start_time 

1041 if elapsed > timeout: 

1042 wait_on_shutdown = False 

1043 raise concurrent.futures.TimeoutError( 

1044 f"Download timed out after {timeout} seconds." 

1045 ) 

1046 

1047 # Don't block on the worker threads. For performance reasons, 

1048 # we want to block on the queue's get method, instead. This 

1049 # prevents the queue from filling up, because the main thread 

1050 # has smaller gaps in time between calls to the queue's get 

1051 # method. For a detailed explanation, see: 

1052 # https://friendliness.dev/2019/06/18/python-nowait/ 

1053 done, not_done = _nowait(not_done) 

1054 for future in done: 

1055 # Call result() on any finished threads to raise any 

1056 # exceptions encountered. 

1057 future.result() 

1058 

1059 try: 

1060 frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) 

1061 yield frame 

1062 except queue.Empty: # pragma: NO COVER 

1063 continue 

1064 

1065 # Return any remaining values after the workers finished. 

1066 while True: # pragma: NO COVER 

1067 try: 

1068 frame = worker_queue.get_nowait() 

1069 yield frame 

1070 except queue.Empty: # pragma: NO COVER 

1071 break 

1072 finally: 

1073 # No need for a lock because reading/replacing a variable is 

1074 # defined to be an atomic operation in the Python language 

1075 # definition (enforced by the global interpreter lock). 

1076 download_state.done = True 

1077 

1078 # Shutdown all background threads, now that they should know to 

1079 # exit early. 

1080 pool.shutdown(wait=wait_on_shutdown) 

1081 

1082 

1083def download_arrow_bqstorage( 

1084 project_id, 

1085 table, 

1086 bqstorage_client, 

1087 preserve_order=False, 

1088 selected_fields=None, 

1089 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

1090 max_stream_count=None, 

1091 timeout=None, 

1092): 

1093 return _download_table_bqstorage( 

1094 project_id, 

1095 table, 

1096 bqstorage_client, 

1097 preserve_order=preserve_order, 

1098 selected_fields=selected_fields, 

1099 page_to_item=_bqstorage_page_to_arrow, 

1100 max_queue_size=max_queue_size, 

1101 max_stream_count=max_stream_count, 

1102 timeout=timeout, 

1103 ) 

1104 

1105 

1106def download_dataframe_bqstorage( 

1107 project_id, 

1108 table, 

1109 bqstorage_client, 

1110 column_names, 

1111 dtypes, 

1112 preserve_order=False, 

1113 selected_fields=None, 

1114 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

1115 max_stream_count=None, 

1116 timeout=None, 

1117): 

1118 page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) 

1119 return _download_table_bqstorage( 

1120 project_id, 

1121 table, 

1122 bqstorage_client, 

1123 preserve_order=preserve_order, 

1124 selected_fields=selected_fields, 

1125 page_to_item=page_to_item, 

1126 max_queue_size=max_queue_size, 

1127 max_stream_count=max_stream_count, 

1128 timeout=timeout, 

1129 ) 

1130 
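# Editorial example (not part of the original module): a hedged sketch of consuming
# the DataFrame generator above. ``table`` and ``bqstorage_client`` are hypothetical,
# pre-built objects; requires the google-cloud-bigquery-storage extra.
def _example_consume_bqstorage_download(project_id, table, bqstorage_client, column_names):  # hypothetical helper
    frames = download_dataframe_bqstorage(
        project_id,
        table,
        bqstorage_client,
        column_names,
        dtypes={},
        timeout=60.0,
    )
    chunks = list(frames)  # may raise concurrent.futures.TimeoutError
    if not chunks:
        return pandas.DataFrame(columns=column_names)
    return pandas.concat(chunks, ignore_index=True)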

1131 

1132def dataframe_to_json_generator(dataframe): 

1133 for row in dataframe.itertuples(index=False, name=None): 

1134 output = {} 

1135 for column, value in zip(dataframe.columns, row): 

1136 # Omit NaN values. 

1137 is_nan = pandas.isna(value) 

1138 

1139 # isna() can also return an array-like of bools, but the latter's boolean 

1140 # value is ambiguous, hence an extra check. An array-like value is *not* 

1141 # considered a NaN, however. 

1142 if isinstance(is_nan, bool) and is_nan: 

1143 continue 

1144 

1145 # Convert numpy types to corresponding Python types. 

1146 # https://stackoverflow.com/a/60441783/101923 

1147 if isinstance(value, numpy.bool_): 

1148 value = bool(value) 

1149 elif isinstance( 

1150 value, 

1151 ( 

1152 numpy.int64, 

1153 numpy.int32, 

1154 numpy.int16, 

1155 numpy.int8, 

1156 numpy.uint64, 

1157 numpy.uint32, 

1158 numpy.uint16, 

1159 numpy.uint8, 

1160 ), 

1161 ): 

1162 value = int(value) 

1163 output[column] = value 

1164 

1165 yield output 

1166 
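# Editorial example (not part of the original module): a minimal sketch showing that
# NaN values are omitted and numpy scalars are converted to plain Python types.
# Assumes pandas and numpy are installed.
def _example_dataframe_to_json_generator() -> None:  # hypothetical helper
    df = pandas.DataFrame({"x": [1, 2], "y": [3.0, float("nan")]})
    rows = list(dataframe_to_json_generator(df))
    assert rows == [{"x": 1, "y": 3.0}, {"x": 2}]  # NaN in the second row is dropped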

1167 

1168def verify_pandas_imports(): 

1169 if pandas is None: 

1170 raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception 

1171 if db_dtypes is None: 

1172 raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception 

1173 

1174 

1175def determine_requested_streams( 

1176 preserve_order: bool, 

1177 max_stream_count: Union[int, None], 

1178) -> int: 

1179 """Determines the value of requested_streams based on the values of 

1180 `preserve_order` and `max_stream_count`. 

1181 

1182 Args: 

1183 preserve_order (bool): Whether to preserve the order of streams. If True, 

1184 this limits the number of streams to one. `preserve_order` takes 

1185 precedence over `max_stream_count`. 

1186 max_stream_count (Union[int, None]): The maximum number of streams

1187 allowed. Must be a non-negative number or None, where None indicates

1188 the value is unset. NOTE: if `preserve_order` is also set, it takes

1189 precedence over `max_stream_count`, thus to ensure that `max_stream_count`

1190 is used, ensure that `preserve_order` is False.

1191 

1192 Returns: 

1193 (int) The appropriate value for requested_streams. 

1194 """ 

1195 

1196 if preserve_order: 

1197 # If preserve order is set, it takes precedence. 

1198 # Limit the requested streams to 1, to ensure that order 

1199 # is preserved.

1200 return 1 

1201 

1202 elif max_stream_count is not None: 

1203 # If preserve_order is not set, only then do we consider max_stream_count 

1204 if max_stream_count <= -1: 

1205 raise ValueError("max_stream_count must be non-negative OR None") 

1206 return max_stream_count 

1207 

1208 # Default to zero requested streams (unbounded). 

1209 return 0
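# Editorial example (not part of the original module): a minimal sketch of the
# precedence rules documented above.
def _example_determine_requested_streams() -> None:  # hypothetical helper
    assert determine_requested_streams(preserve_order=True, max_stream_count=8) == 1
    assert determine_requested_streams(preserve_order=False, max_stream_count=8) == 8
    assert determine_requested_streams(preserve_order=False, max_stream_count=None) == 0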