Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/_pandas_helpers.py: 19%

421 statements  

1# Copyright 2019 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Shared helper functions for connecting BigQuery and pandas. 

16 

17NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package, 

18instead. See: go/pandas-gbq-and-bigframes-redundancy and 

19https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pandas_to_bigquery.py 

20""" 

21 

22import concurrent.futures 

23from datetime import datetime 

24import functools 

25from itertools import islice 

26import logging 

27import queue 

28import threading 

29import warnings 

30from typing import Any, Union, Optional, Callable, Generator, List 

31 

32 

33from google.cloud.bigquery import _pyarrow_helpers 

34from google.cloud.bigquery import _versions_helpers 

35from google.cloud.bigquery import schema 

36 

37 

38try: 

39 import pandas # type: ignore 

40 

41 pandas_import_exception = None 

42except ImportError as exc: 

43 pandas = None 

44 pandas_import_exception = exc 

45else: 

46 import numpy 

47 

48 

49try: 

50 import pandas_gbq.schema.pandas_to_bigquery # type: ignore 

51 

52 pandas_gbq_import_exception = None 

53except ImportError as exc: 

54 pandas_gbq = None 

55 pandas_gbq_import_exception = exc 

56 

57 

58try: 

59 import db_dtypes # type: ignore 

60 

61 date_dtype_name = db_dtypes.DateDtype.name 

62 time_dtype_name = db_dtypes.TimeDtype.name 

63 db_dtypes_import_exception = None 

64except ImportError as exc: 

65 db_dtypes = None 

66 db_dtypes_import_exception = exc 

67 date_dtype_name = time_dtype_name = "" # Use '' rather than None because pytype 

68 

69pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import() 

70 

71try: 

72 # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`

73 from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore 

74except ImportError: 

75 # No shapely, use NoneType for _BaseGeometry as a placeholder. 

76 _BaseGeometry = type(None) 

77else: 

78 # We don't have any unit test sessions that install shapely but not pandas. 

79 if pandas is not None: # pragma: NO COVER 

80 

81 def _to_wkb(): 

82 from shapely import wkb # type: ignore 

83 

84 write = wkb.dumps 

85 notnull = pandas.notnull 

86 

87 def _to_wkb(v): 

88 return write(v) if notnull(v) else v 

89 

90 return _to_wkb 

91 

92 _to_wkb = _to_wkb() 

93 

94try: 

95 from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions 

96except ImportError: 

97 _ARROW_COMPRESSION_SUPPORT = False 

98else: 

99 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too. 

100 _ARROW_COMPRESSION_SUPPORT = True 

101 

102_LOGGER = logging.getLogger(__name__) 

103 

104_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds. 

105 

106_MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads 

107 

108_NO_PANDAS_ERROR = "Please install the 'pandas' package to use this function." 

109_NO_DB_TYPES_ERROR = "Please install the 'db-dtypes' package to use this function." 

110 

111_PANDAS_DTYPE_TO_BQ = { 

112 "bool": "BOOLEAN", 

113 "datetime64[ns, UTC]": "TIMESTAMP", 

114 "datetime64[ns]": "DATETIME", 

115 "float32": "FLOAT", 

116 "float64": "FLOAT", 

117 "int8": "INTEGER", 

118 "int16": "INTEGER", 

119 "int32": "INTEGER", 

120 "int64": "INTEGER", 

121 "uint8": "INTEGER", 

122 "uint16": "INTEGER", 

123 "uint32": "INTEGER", 

124 "geometry": "GEOGRAPHY", 

125 date_dtype_name: "DATE", 

126 time_dtype_name: "TIME", 

127} 

128 

129 

130class _DownloadState(object): 

131 """Flag to indicate that a thread should exit early.""" 

132 

133 def __init__(self): 

134 # No need for a lock because reading/replacing a variable is defined to 

135 # be an atomic operation in the Python language definition (enforced by 

136 # the global interpreter lock). 

137 self.done = False 

138 # To assist with testing and understanding the behavior of the 

139 # download, use this object as shared state to track how many worker 

140 # threads have started and have gracefully shutdown. 

141 self._started_workers_lock = threading.Lock() 

142 self.started_workers = 0 

143 self._finished_workers_lock = threading.Lock() 

144 self.finished_workers = 0 

145 

146 def start(self): 

147 with self._started_workers_lock: 

148 self.started_workers += 1 

149 

150 def finish(self): 

151 with self._finished_workers_lock: 

152 self.finished_workers += 1 

153 

154 
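A minimal sketch (not part of the module) of the bookkeeping a worker thread performs against this shared state; the loop body is a stand-in for fetching one page of results:

state = _DownloadState()
state.start()
try:
    while not state.done:
        # A real worker would fetch a page here and push it onto a queue.
        break
finally:
    state.finish()
assert state.started_workers == 1 and state.finished_workers == 1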

155BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = { 

156 "GEOGRAPHY": { 

157 b"ARROW:extension:name": b"google:sqlType:geography", 

158 b"ARROW:extension:metadata": b'{"encoding": "WKT"}', 

159 }, 

160 "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"}, 

161 "JSON": {b"ARROW:extension:name": b"google:sqlType:json"}, 

162} 

163 

164 

165def bq_to_arrow_struct_data_type(field): 

166 arrow_fields = [] 

167 for subfield in field.fields: 

168 arrow_subfield = bq_to_arrow_field(subfield) 

169 if arrow_subfield: 

170 arrow_fields.append(arrow_subfield) 

171 else: 

172 # Could not determine a subfield type. Fallback to type 

173 # inference. 

174 return None 

175 return pyarrow.struct(arrow_fields) 

176 

177 

178def bq_to_arrow_range_data_type(field): 

179 if field is None: 

180 raise ValueError( 

181 "Range element type cannot be None, must be one of " 

182 "DATE, DATETIME, or TIMESTAMP" 

183 ) 

184 element_type = field.element_type.upper() 

185 arrow_element_type = _pyarrow_helpers.bq_to_arrow_scalars(element_type)() 

186 return pyarrow.struct([("start", arrow_element_type), ("end", arrow_element_type)]) 

187 

188 

189def bq_to_arrow_data_type(field): 

190 """Return the Arrow data type, corresponding to a given BigQuery column. 

191 

192 Returns: 

193 None: if default Arrow type inspection should be used. 

194 """ 

195 if field.mode is not None and field.mode.upper() == "REPEATED": 

196 inner_type = bq_to_arrow_data_type( 

197 schema.SchemaField(field.name, field.field_type, fields=field.fields) 

198 ) 

199 if inner_type: 

200 return pyarrow.list_(inner_type) 

201 return None 

202 

203 field_type_upper = field.field_type.upper() if field.field_type else "" 

204 if field_type_upper in schema._STRUCT_TYPES: 

205 return bq_to_arrow_struct_data_type(field) 

206 

207 if field_type_upper == "RANGE": 

208 return bq_to_arrow_range_data_type(field.range_element_type) 

209 

210 data_type_constructor = _pyarrow_helpers.bq_to_arrow_scalars(field_type_upper) 

211 if data_type_constructor is None: 

212 return None 

213 return data_type_constructor() 

214 

215 

216def bq_to_arrow_field(bq_field, array_type=None): 

217 """Return the Arrow field, corresponding to a given BigQuery column. 

218 

219 Returns: 

220 None: if the Arrow type cannot be determined. 

221 """ 

222 arrow_type = bq_to_arrow_data_type(bq_field) 

223 if arrow_type is not None: 

224 if array_type is not None: 

225 arrow_type = array_type # For GEOGRAPHY, at least initially 

226 metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get( 

227 bq_field.field_type.upper() if bq_field.field_type else "" 

228 ) 

229 return pyarrow.field( 

230 bq_field.name, 

231 arrow_type, 

232 # Even if the remote schema is REQUIRED, there's a chance there's 

233 # local NULL values. Arrow will gladly interpret these NULL values 

234 # as non-NULL and give you an arbitrary value. See: 

235 # https://github.com/googleapis/python-bigquery/issues/1692 

236 nullable=False if bq_field.mode.upper() == "REPEATED" else True, 

237 metadata=metadata, 

238 ) 

239 

240 warnings.warn( 

241 "Unable to determine Arrow type for field '{}'.".format(bq_field.name) 

242 ) 

243 return None 

244 

245 

246def bq_to_arrow_schema(bq_schema): 

247 """Return the Arrow schema, corresponding to a given BigQuery schema. 

248 

249 Returns: 

250 None: if any Arrow type cannot be determined. 

251 """ 

252 arrow_fields = [] 

253 for bq_field in bq_schema: 

254 arrow_field = bq_to_arrow_field(bq_field) 

255 if arrow_field is None: 

256 # Auto-detect the schema if there is an unknown field type. 

257 return None 

258 arrow_fields.append(arrow_field) 

259 return pyarrow.schema(arrow_fields) 

260 
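An illustrative sketch, assuming pyarrow is installed, of how a small BigQuery schema maps to an Arrow schema via the helpers above; the field names are made up:

example_bq_schema = [
    schema.SchemaField("name", "STRING"),
    schema.SchemaField("scores", "INTEGER", mode="REPEATED"),
]
arrow_schema = bq_to_arrow_schema(example_bq_schema)
# arrow_schema.field("name").type   -> string
# arrow_schema.field("scores").type -> list<item: int64>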

261 

262def default_types_mapper( 

263 date_as_object: bool = False, 

264 bool_dtype: Union[Any, None] = None, 

265 int_dtype: Union[Any, None] = None, 

266 float_dtype: Union[Any, None] = None, 

267 string_dtype: Union[Any, None] = None, 

268 date_dtype: Union[Any, None] = None, 

269 datetime_dtype: Union[Any, None] = None, 

270 time_dtype: Union[Any, None] = None, 

271 timestamp_dtype: Union[Any, None] = None, 

272 range_date_dtype: Union[Any, None] = None, 

273 range_datetime_dtype: Union[Any, None] = None, 

274 range_timestamp_dtype: Union[Any, None] = None, 

275): 

276 """Create a mapping from pyarrow types to pandas types. 

277 

278 This overrides the pandas defaults to use null-safe extension types where 

279 available. 

280 

281 See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of 

282 data types. See: 

283 tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for 

284 BigQuery to Arrow type mapping. 

285 

286 Note to google-cloud-bigquery developers: If you update the default dtypes, 

287 also update the docs at docs/usage/pandas.rst. 

288 """ 

289 

290 def types_mapper(arrow_data_type): 

291 if bool_dtype is not None and pyarrow.types.is_boolean(arrow_data_type): 

292 return bool_dtype 

293 

294 elif int_dtype is not None and pyarrow.types.is_integer(arrow_data_type): 

295 return int_dtype 

296 

297 elif float_dtype is not None and pyarrow.types.is_floating(arrow_data_type): 

298 return float_dtype 

299 

300 elif string_dtype is not None and pyarrow.types.is_string(arrow_data_type): 

301 return string_dtype 

302 

303 elif ( 

304 # If date_as_object is True, we know some DATE columns are 

305 # out-of-bounds of what is supported by pandas. 

306 date_dtype is not None 

307 and not date_as_object 

308 and pyarrow.types.is_date(arrow_data_type) 

309 ): 

310 return date_dtype 

311 

312 elif ( 

313 datetime_dtype is not None 

314 and pyarrow.types.is_timestamp(arrow_data_type) 

315 and arrow_data_type.tz is None 

316 ): 

317 return datetime_dtype 

318 

319 elif ( 

320 timestamp_dtype is not None 

321 and pyarrow.types.is_timestamp(arrow_data_type) 

322 and arrow_data_type.tz is not None 

323 ): 

324 return timestamp_dtype 

325 

326 elif time_dtype is not None and pyarrow.types.is_time(arrow_data_type): 

327 return time_dtype 

328 

329 elif pyarrow.types.is_struct(arrow_data_type): 

330 if range_datetime_dtype is not None and arrow_data_type.equals( 

331 range_datetime_dtype.pyarrow_dtype 

332 ): 

333 return range_datetime_dtype 

334 

335 elif range_date_dtype is not None and arrow_data_type.equals( 

336 range_date_dtype.pyarrow_dtype 

337 ): 

338 return range_date_dtype 

339 

340 elif range_timestamp_dtype is not None and arrow_data_type.equals( 

341 range_timestamp_dtype.pyarrow_dtype 

342 ): 

343 return range_timestamp_dtype 

344 

345 return types_mapper 

346 
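A sketch of how the returned mapper plugs into ``pyarrow.Table.to_pandas`` so that boolean and integer columns land in pandas' nullable extension dtypes; assumes pandas and pyarrow are installed, and the column names are illustrative:

mapper = default_types_mapper(
    bool_dtype=pandas.BooleanDtype(),
    int_dtype=pandas.Int64Dtype(),
)
table = pyarrow.table({"flag": [True, None], "count": [1, None]})
df = table.to_pandas(types_mapper=mapper)
# df.dtypes -> flag: boolean, count: Int64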

347 

348def bq_to_arrow_array(series, bq_field): 

349 if bq_field.field_type.upper() == "GEOGRAPHY": 

350 arrow_type = None 

351 first = _first_valid(series) 

352 if first is not None: 

353 if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry): 

354 arrow_type = pyarrow.binary() 

355 # Convert shapely geometry to WKB binary format:

356 series = series.apply(_to_wkb) 

357 elif isinstance(first, bytes): 

358 arrow_type = pyarrow.binary() 

359 elif series.dtype.name == "geometry": 

360 # We have a GeoSeries containing all nulls, convert it to a pandas series 

361 series = pandas.Series(numpy.array(series)) 

362 

363 if arrow_type is None: 

364 arrow_type = bq_to_arrow_data_type(bq_field) 

365 else: 

366 arrow_type = bq_to_arrow_data_type(bq_field) 

367 

368 field_type_upper = bq_field.field_type.upper() if bq_field.field_type else "" 

369 

370 try: 

371 if bq_field.mode.upper() == "REPEATED": 

372 return pyarrow.ListArray.from_pandas(series, type=arrow_type) 

373 if field_type_upper in schema._STRUCT_TYPES: 

374 return pyarrow.StructArray.from_pandas(series, type=arrow_type) 

375 return pyarrow.Array.from_pandas(series, type=arrow_type) 

376 except pyarrow.ArrowTypeError: 

377 msg = f"""Error converting Pandas column with name: "{series.name}" and datatype: "{series.dtype}" to an appropriate pyarrow datatype: Array, ListArray, or StructArray""" 

378 _LOGGER.error(msg) 

379 raise pyarrow.ArrowTypeError(msg) 

380 
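A sketch of the REPEATED branch, assuming pandas and pyarrow are available; the column name is illustrative:

nums = pandas.Series([[1, 2], [3]], name="nums")
arr = bq_to_arrow_array(nums, schema.SchemaField("nums", "INTEGER", mode="REPEATED"))
# arr is a pyarrow.ListArray with type list<item: int64>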

381 

382def get_column_or_index(dataframe, name): 

383 """Return a column or index as a pandas series.""" 

384 if name in dataframe.columns: 

385 return dataframe[name].reset_index(drop=True) 

386 

387 if isinstance(dataframe.index, pandas.MultiIndex): 

388 if name in dataframe.index.names: 

389 return ( 

390 dataframe.index.get_level_values(name) 

391 .to_series() 

392 .reset_index(drop=True) 

393 ) 

394 else: 

395 if name == dataframe.index.name: 

396 return dataframe.index.to_series().reset_index(drop=True) 

397 

398 raise ValueError("column or index '{}' not found.".format(name)) 

399 

400 

401def list_columns_and_indexes(dataframe): 

402 """Return all index and column names with dtypes. 

403 

404 Returns: 

405 Sequence[Tuple[str, dtype]]: 

406 Returns a sorted list of indexes and column names with 

407 corresponding dtypes. If an index is missing a name or has the 

408 same name as a column, the index is omitted. 

409 """ 

410 column_names = frozenset(dataframe.columns) 

411 columns_and_indexes = [] 

412 if isinstance(dataframe.index, pandas.MultiIndex): 

413 for name in dataframe.index.names: 

414 if name and name not in column_names: 

415 values = dataframe.index.get_level_values(name) 

416 columns_and_indexes.append((name, values.dtype)) 

417 else: 

418 if dataframe.index.name and dataframe.index.name not in column_names: 

419 columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype)) 

420 

421 columns_and_indexes += zip(dataframe.columns, dataframe.dtypes) 

422 return columns_and_indexes 

423 
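A sketch showing that a named index is reported alongside the columns and can be fetched like a column; assumes pandas is installed, and the names are illustrative:

df = pandas.DataFrame({"x": [1, 2]}, index=pandas.Index(["a", "b"], name="key"))
list_columns_and_indexes(df)
# -> [('key', dtype('O')), ('x', dtype('int64'))]
get_column_or_index(df, "key")
# -> pandas.Series(['a', 'b'])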

424 

425def _first_valid(series): 

426 first_valid_index = series.first_valid_index() 

427 if first_valid_index is not None: 

428 return series.at[first_valid_index] 

429 

430 

431def _first_array_valid(series): 

432 """Return the first "meaningful" element from the array series. 

433 

434 Here, "meaningful" means the first non-None element in one of the arrays that can 

435 be used for type detection.

436 """ 

437 first_valid_index = series.first_valid_index() 

438 if first_valid_index is None: 

439 return None 

440 

441 valid_array = series.at[first_valid_index] 

442 valid_item = next((item for item in valid_array if not pandas.isna(item)), None) 

443 

444 if valid_item is not None: 

445 return valid_item 

446 

447 # Valid item is None because all items in the "valid" array are invalid. Try 

448 # to find a true valid array manually. 

449 for array in islice(series, first_valid_index + 1, None): 

450 try: 

451 array_iter = iter(array) 

452 except TypeError: 

453 continue # Not an array, apparently, e.g. None, thus skip. 

454 valid_item = next((item for item in array_iter if not pandas.isna(item)), None) 

455 if valid_item is not None: 

456 break 

457 

458 return valid_item 

459 
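A sketch of the fallback path: the first "valid" array holds only missing values, so the helper keeps scanning until it finds a usable element (assumes pandas is installed; the values are illustrative):

arrays = pandas.Series([None, [None, float("nan")], [None, 42]])
_first_array_valid(arrays)
# -> 42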

460 

461def dataframe_to_bq_schema(dataframe, bq_schema): 

462 """Convert a pandas DataFrame schema to a BigQuery schema. 

463 

464 DEPRECATED: Use 

465 pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(), 

466 instead. See: go/pandas-gbq-and-bigframes-redundancy. 

467 

468 Args: 

469 dataframe (pandas.DataFrame): 

470 DataFrame for which the client determines the BigQuery schema. 

471 bq_schema (Sequence[Union[ \ 

472 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

473 Mapping[str, Any] \ 

474 ]]): 

475 A BigQuery schema. Use this argument to override the autodetected 

476 type for some or all of the DataFrame columns. 

477 

478 Returns: 

479 Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]: 

480 The automatically determined schema. Returns None if the type of 

481 any column cannot be determined. 

482 """ 

483 if pandas_gbq is None: 

484 warnings.warn( 

485 "Loading pandas DataFrame into BigQuery will require pandas-gbq " 

486 "package version 0.26.1 or greater in the future. " 

487 f"Tried to import pandas-gbq and got: {pandas_gbq_import_exception}", 

488 category=FutureWarning, 

489 ) 

490 else: 

491 return pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( 

492 dataframe, 

493 override_bigquery_fields=bq_schema, 

494 index=True, 

495 ) 

496 

497 if bq_schema: 

498 bq_schema = schema._to_schema_fields(bq_schema) 

499 bq_schema_index = {field.name: field for field in bq_schema} 

500 bq_schema_unused = set(bq_schema_index.keys()) 

501 else: 

502 bq_schema_index = {} 

503 bq_schema_unused = set() 

504 

505 bq_schema_out = [] 

506 unknown_type_columns = [] 

507 dataframe_reset_index = dataframe.reset_index() 

508 for column, dtype in list_columns_and_indexes(dataframe): 

509 # Step 1: use provided type from schema, if present. 

510 bq_field = bq_schema_index.get(column) 

511 if bq_field: 

512 bq_schema_out.append(bq_field) 

513 bq_schema_unused.discard(bq_field.name) 

514 continue 

515 

516 # Step 2: try to automatically determine the type based on the 

517 # pandas dtype. 

518 bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name) 

519 if bq_type is None: 

520 sample_data = _first_valid(dataframe_reset_index[column]) 

521 if ( 

522 isinstance(sample_data, _BaseGeometry) 

523 and sample_data is not None # Paranoia 

524 ): 

525 bq_type = "GEOGRAPHY" 

526 if bq_type is not None: 

527 bq_schema_out.append(schema.SchemaField(column, bq_type)) 

528 continue 

529 

530 # Step 3: try with pyarrow if available 

531 bq_field = _get_schema_by_pyarrow(column, dataframe_reset_index[column]) 

532 if bq_field is not None: 

533 bq_schema_out.append(bq_field) 

534 continue 

535 

536 unknown_type_columns.append(column) 

537 

538 # Catch any schema mismatch. The developer explicitly asked to serialize a 

539 # column, but it was not found. 

540 if bq_schema_unused: 

541 raise ValueError( 

542 "bq_schema contains fields not present in dataframe: {}".format( 

543 bq_schema_unused 

544 ) 

545 ) 

546 

547 if unknown_type_columns != []: 

548 msg = "Could not determine the type of columns: {}".format( 

549 ", ".join(unknown_type_columns) 

550 ) 

551 warnings.warn(msg) 

552 return None # We cannot detect the schema in full. 

553 

554 return tuple(bq_schema_out) 

555 
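A sketch of schema detection with one explicit override; the exact fields returned depend on whether the pandas-gbq path or the legacy fallback below is taken, and the column names are illustrative:

df = pandas.DataFrame({"name": ["a"], "score": [1.5]})
detected = dataframe_to_bq_schema(df, bq_schema=[schema.SchemaField("name", "STRING")])
# e.g. (SchemaField('name', 'STRING', ...), SchemaField('score', 'FLOAT', ...))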

556 

557def _get_schema_by_pyarrow(name, series): 

558 """Attempt to detect the type of the given series by leveraging PyArrow's 

559 type detection capabilities. 

560 

561 This function requires the ``pyarrow`` library to be installed and 

562 available. If the series type cannot be determined or ``pyarrow`` is not 

563 available, ``None`` is returned. 

564 

565 Args: 

566 name (str): 

567 the column name of the SchemaField. 

568 series (pandas.Series): 

569 The Series data for which to detect the data type. 

570 Returns: 

571 Optional[google.cloud.bigquery.schema.SchemaField]: 

572 A SchemaField with the detected BigQuery-compatible type string (e.g.,

573 "STRING", "INTEGER", "TIMESTAMP", "DATETIME", "NUMERIC", "BIGNUMERIC") 

574 and the mode string ("NULLABLE", "REPEATED"). 

575 Returns ``None`` if the type cannot be determined or ``pyarrow`` 

576 is not imported. 

577 """ 

578 

579 if not pyarrow: 

580 return None 

581 

582 arrow_table = pyarrow.array(series) 

583 if pyarrow.types.is_list(arrow_table.type): 

584 # `pyarrow.ListType` 

585 mode = "REPEATED" 

586 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.values.type.id) 

587 

588 # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds 

589 # it to such datetimes, causing them to be recognized as TIMESTAMP type. 

590 # We thus additionally check the actual data to see if we need to overrule 

591 # that and choose DATETIME instead. 

592 # Note that this should only be needed for datetime values inside a list, 

593 # since scalar datetime values have a proper Pandas dtype that allows 

594 # distinguishing between timezone-naive and timezone-aware values before 

595 # even requiring the additional schema augment logic in this method. 

596 if type == "TIMESTAMP": 

597 valid_item = _first_array_valid(series) 

598 if isinstance(valid_item, datetime) and valid_item.tzinfo is None: 

599 type = "DATETIME" 

600 else: 

601 mode = "NULLABLE" # default mode 

602 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id) 

603 if type == "NUMERIC" and arrow_table.type.scale > 9: 

604 type = "BIGNUMERIC" 

605 

606 if type is not None: 

607 return schema.SchemaField(name, type, mode) 

608 else: 

609 return None 

610 
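A sketch of pyarrow-based detection for a list column and for a timezone-naive datetime inside a list; assumes pandas and pyarrow are installed, and the column names are illustrative:

_get_schema_by_pyarrow("tags", pandas.Series([["a", "b"], ["c"]]))
# -> SchemaField('tags', 'STRING', 'REPEATED')
_get_schema_by_pyarrow("created", pandas.Series([[datetime(2024, 1, 1)]]))
# -> SchemaField('created', 'DATETIME', 'REPEATED')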

611 

612def dataframe_to_arrow(dataframe, bq_schema): 

613 """Convert pandas dataframe to Arrow table, using BigQuery schema. 

614 

615 Args: 

616 dataframe (pandas.DataFrame): 

617 DataFrame to convert to Arrow table. 

618 bq_schema (Sequence[Union[ \ 

619 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

620 Mapping[str, Any] \ 

621 ]]): 

622 Desired BigQuery schema. The number of columns must match the 

623 number of columns in the DataFrame. 

624 

625 Returns: 

626 pyarrow.Table: 

627 Table containing dataframe data, with schema derived from 

628 BigQuery schema. 

629 """ 

630 column_names = set(dataframe.columns) 

631 column_and_index_names = set( 

632 name for name, _ in list_columns_and_indexes(dataframe) 

633 ) 

634 

635 bq_schema = schema._to_schema_fields(bq_schema) 

636 bq_field_names = set(field.name for field in bq_schema) 

637 

638 extra_fields = bq_field_names - column_and_index_names 

639 if extra_fields: 

640 raise ValueError( 

641 "bq_schema contains fields not present in dataframe: {}".format( 

642 extra_fields 

643 ) 

644 ) 

645 

646 # It's okay for indexes to be missing from bq_schema, but it's not okay to 

647 # be missing columns. 

648 missing_fields = column_names - bq_field_names 

649 if missing_fields: 

650 raise ValueError( 

651 "bq_schema is missing fields from dataframe: {}".format(missing_fields) 

652 ) 

653 

654 arrow_arrays = [] 

655 arrow_names = [] 

656 arrow_fields = [] 

657 for bq_field in bq_schema: 

658 arrow_names.append(bq_field.name) 

659 arrow_arrays.append( 

660 bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field) 

661 ) 

662 arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type)) 

663 

664 if all((field is not None for field in arrow_fields)): 

665 return pyarrow.Table.from_arrays( 

666 arrow_arrays, schema=pyarrow.schema(arrow_fields) 

667 ) 

668 return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names) 

669 
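A sketch of the conversion, assuming pandas and pyarrow are installed; column names and values are illustrative:

df = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]})
arrow_table = dataframe_to_arrow(
    df,
    [schema.SchemaField("id", "INTEGER"), schema.SchemaField("name", "STRING")],
)
# arrow_table.schema -> id: int64, name: string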

670 

671def dataframe_to_parquet( 

672 dataframe, 

673 bq_schema, 

674 filepath, 

675 parquet_compression="SNAPPY", 

676 parquet_use_compliant_nested_type=True, 

677): 

678 """Write dataframe as a Parquet file, according to the desired BQ schema. 

679 

680 This function requires the :mod:`pyarrow` package. Arrow is used as an 

681 intermediate format. 

682 

683 Args: 

684 dataframe (pandas.DataFrame): 

685 DataFrame to convert to Parquet file. 

686 bq_schema (Sequence[Union[ \ 

687 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

688 Mapping[str, Any] \ 

689 ]]): 

690 Desired BigQuery schema. Number of columns must match number of 

691 columns in the DataFrame. 

692 filepath (str): 

693 Path to write Parquet file to. 

694 parquet_compression (Optional[str]): 

695 The compression codec to use by the ``pyarrow.parquet.write_table``

696 serializing method. Defaults to "SNAPPY". 

697 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table 

698 parquet_use_compliant_nested_type (bool): 

699 Whether the ``pyarrow.parquet.write_table`` serializing method should write 

700 compliant Parquet nested type (lists). Defaults to ``True``. 

701 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types 

702 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table 

703 

704 This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``. 

705 """ 

706 pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True) 

707 

708 import pyarrow.parquet # type: ignore 

709 

710 kwargs = ( 

711 {"use_compliant_nested_type": parquet_use_compliant_nested_type} 

712 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type 

713 else {} 

714 ) 

715 

716 bq_schema = schema._to_schema_fields(bq_schema) 

717 arrow_table = dataframe_to_arrow(dataframe, bq_schema) 

718 pyarrow.parquet.write_table( 

719 arrow_table, 

720 filepath, 

721 compression=parquet_compression, 

722 **kwargs, 

723 ) 

724 
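A sketch of writing a small frame to Parquet with an explicit schema; the output path and column names are just examples:

df = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]})
dataframe_to_parquet(
    df,
    [schema.SchemaField("id", "INTEGER"), schema.SchemaField("name", "STRING")],
    "/tmp/example.parquet",
    parquet_compression="GZIP",
)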

725 

726def _row_iterator_page_to_arrow(page, column_names, arrow_types): 

727 # Iterate over the page to force the API request to get the page data. 

728 try: 

729 next(iter(page)) 

730 except StopIteration: 

731 pass 

732 

733 arrays = [] 

734 for column_index, arrow_type in enumerate(arrow_types): 

735 arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type)) 

736 

737 if isinstance(column_names, pyarrow.Schema): 

738 return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names) 

739 return pyarrow.RecordBatch.from_arrays(arrays, names=column_names) 

740 

741 

742def download_arrow_row_iterator(pages, bq_schema): 

743 """Use HTTP JSON RowIterator to construct an iterable of RecordBatches. 

744 

745 Args: 

746 pages (Iterator[:class:`google.api_core.page_iterator.Page`]): 

747 An iterator over the result pages. 

748 bq_schema (Sequence[Union[ \ 

749 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

750 Mapping[str, Any] \ 

751 ]]): 

752 A description of the fields in result pages.

753 Yields: 

754 :class:`pyarrow.RecordBatch` 

755 The next page of records as a ``pyarrow`` record batch. 

756 """ 

757 bq_schema = schema._to_schema_fields(bq_schema) 

758 column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema] 

759 arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema] 

760 

761 for page in pages: 

762 yield _row_iterator_page_to_arrow(page, column_names, arrow_types) 

763 

764 

765def _row_iterator_page_to_dataframe(page, column_names, dtypes): 

766 # Iterate over the page to force the API request to get the page data. 

767 try: 

768 next(iter(page)) 

769 except StopIteration: 

770 pass 

771 

772 columns = {} 

773 for column_index, column_name in enumerate(column_names): 

774 dtype = dtypes.get(column_name) 

775 columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype) 

776 

777 return pandas.DataFrame(columns, columns=column_names) 

778 

779 

780def download_dataframe_row_iterator(pages, bq_schema, dtypes): 

781 """Use HTTP JSON RowIterator to construct a DataFrame. 

782 

783 Args: 

784 pages (Iterator[:class:`google.api_core.page_iterator.Page`]): 

785 An iterator over the result pages. 

786 bq_schema (Sequence[Union[ \ 

787 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

788 Mapping[str, Any] \ 

789 ]]): 

790 A description of the fields in result pages.

791 dtypes(Mapping[str, numpy.dtype]): 

792 The types of columns in result data to hint construction of the 

793 resulting DataFrame. Not all column types have to be specified. 

794 Yields: 

795 :class:`pandas.DataFrame` 

796 The next page of records as a ``pandas.DataFrame``.

797 """ 

798 bq_schema = schema._to_schema_fields(bq_schema) 

799 column_names = [field.name for field in bq_schema] 

800 for page in pages: 

801 yield _row_iterator_page_to_dataframe(page, column_names, dtypes) 

802 

803 

804def _bqstorage_page_to_arrow(page): 

805 return page.to_arrow() 

806 

807 

808def _bqstorage_page_to_dataframe(column_names, dtypes, page): 

809 # page.to_dataframe() does not preserve column order in some versions 

810 # of google-cloud-bigquery-storage. Access by column name to rearrange. 

811 return page.to_dataframe(dtypes=dtypes)[column_names] 

812 

813 

814def _download_table_bqstorage_stream( 

815 download_state, bqstorage_client, session, stream, worker_queue, page_to_item 

816): 

817 download_state.start() 

818 try: 

819 reader = bqstorage_client.read_rows(stream.name) 

820 

821 # Avoid deprecation warnings for passing in unnecessary read session. 

822 # https://github.com/googleapis/python-bigquery-storage/issues/229 

823 if _versions_helpers.BQ_STORAGE_VERSIONS.is_read_session_optional: 

824 rowstream = reader.rows() 

825 else: 

826 rowstream = reader.rows(session) 

827 

828 for page in rowstream.pages: 

829 item = page_to_item(page) 

830 

831 # Make sure we set a timeout on put() so that we give the worker 

832 # thread opportunities to shutdown gracefully, for example if the 

833 # parent thread shuts down or the parent generator object which 

834 # collects rows from all workers goes out of scope. See: 

835 # https://github.com/googleapis/python-bigquery/issues/2032 

836 while True: 

837 if download_state.done: 

838 return 

839 try: 

840 worker_queue.put(item, timeout=_PROGRESS_INTERVAL) 

841 break 

842 except queue.Full: 

843 continue 

844 finally: 

845 download_state.finish() 

846 

847 

848def _nowait(futures): 

849 """Separate finished and unfinished threads, much like 

850 :func:`concurrent.futures.wait`, but don't wait. 

851 """ 

852 done = [] 

853 not_done = [] 

854 for future in futures: 

855 if future.done(): 

856 done.append(future) 

857 else: 

858 not_done.append(future) 

859 return done, not_done 

860 

861 

862def _download_table_bqstorage( 

863 project_id: str, 

864 table: Any, 

865 bqstorage_client: Any, 

866 preserve_order: bool = False, 

867 selected_fields: Optional[List[Any]] = None, 

868 page_to_item: Optional[Callable] = None, 

869 max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, 

870 max_stream_count: Optional[int] = None, 

871 download_state: Optional[_DownloadState] = None, 

872) -> Generator[Any, None, None]: 

873 """Downloads a BigQuery table using the BigQuery Storage API. 

874 

875 This method uses the faster, but potentially more expensive, BigQuery 

876 Storage API to download a table as a Pandas DataFrame. It supports 

877 parallel downloads and optional data transformations. 

878 

879 Args: 

880 project_id (str): The ID of the Google Cloud project containing 

881 the table. 

882 table (Any): The BigQuery table to download. 

883 bqstorage_client (Any): An 

884 authenticated BigQuery Storage API client. 

885 preserve_order (bool, optional): Whether to preserve the order 

886 of the rows as they are read from BigQuery. If True, this limits

887 the number of streams to one and overrides `max_stream_count`. 

888 Defaults to False. 

889 selected_fields (Optional[List[SchemaField]]): 

890 A list of BigQuery schema fields to select for download. If None, 

891 all fields are downloaded. Defaults to None. 

892 page_to_item (Optional[Callable]): An optional callable 

893 function that takes a page of data from the BigQuery Storage API and converts it into the item placed on the worker queue and yielded to the caller.

894 max_stream_count (Optional[int]): The maximum number of 

895 concurrent streams to use for downloading data. If `preserve_order` 

896 is True, the requested streams are limited to 1 regardless of the 

897 `max_stream_count` value. If 0 or None, then the number of 

898 requested streams will be unbounded. Defaults to None. 

899 download_state (Optional[_DownloadState]): 

900 A threadsafe state object which can be used to observe the 

901 behavior of the worker threads created by this method. 

902 

903 Yields: 

904 pandas.DataFrame: Pandas DataFrames, one for each chunk of data 

905 downloaded from BigQuery. 

906 

907 Raises: 

908 ValueError: If attempting to read from a specific partition or snapshot. 

909 

910 Note: 

911 This method requires the `google-cloud-bigquery-storage` library 

912 to be installed. 

913 """ 

914 

915 from google.cloud import bigquery_storage 

916 

917 if "$" in table.table_id: 

918 raise ValueError( 

919 "Reading from a specific partition is not currently supported." 

920 ) 

921 if "@" in table.table_id: 

922 raise ValueError("Reading from a specific snapshot is not currently supported.") 

923 

924 requested_streams = determine_requested_streams(preserve_order, max_stream_count) 

925 

926 requested_session = bigquery_storage.types.stream.ReadSession( 

927 table=table.to_bqstorage(), 

928 data_format=bigquery_storage.types.stream.DataFormat.ARROW, 

929 ) 

930 if selected_fields is not None: 

931 for field in selected_fields: 

932 requested_session.read_options.selected_fields.append(field.name) 

933 

934 if _ARROW_COMPRESSION_SUPPORT: 

935 requested_session.read_options.arrow_serialization_options.buffer_compression = ( 

936 # CompressionCodec(1) -> LZ4_FRAME 

937 ArrowSerializationOptions.CompressionCodec(1) 

938 ) 

939 

940 session = bqstorage_client.create_read_session( 

941 parent="projects/{}".format(project_id), 

942 read_session=requested_session, 

943 max_stream_count=requested_streams, 

944 ) 

945 

946 _LOGGER.debug( 

947 "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format( 

948 table.project, table.dataset_id, table.table_id, session.name 

949 ) 

950 ) 

951 

952 # Avoid reading rows from an empty table. 

953 if not session.streams: 

954 return 

955 

956 total_streams = len(session.streams) 

957 

958 # Use _DownloadState to notify worker threads when to quit. 

959 # See: https://stackoverflow.com/a/29237343/101923 

960 if download_state is None: 

961 download_state = _DownloadState() 

962 

963 # Create a queue to collect frames as they are created in each thread. 

964 # 

965 # The queue needs to be bounded by default, because if the user code processes the 

966 # fetched result pages too slowly, while at the same time new pages are rapidly being 

967 # fetched from the server, the queue can grow to the point where the process runs 

968 # out of memory. 

969 if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT: 

970 max_queue_size = total_streams 

971 elif max_queue_size is None: 

972 max_queue_size = 0 # unbounded 

973 

974 worker_queue: queue.Queue[Any] = queue.Queue(maxsize=max_queue_size)

975 

976 with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: 

977 try: 

978 # Manually submit jobs and wait for download to complete rather 

979 # than using pool.map because pool.map continues running in the 

980 # background even if there is an exception on the main thread. 

981 # See: https://github.com/googleapis/google-cloud-python/pull/7698 

982 not_done = [ 

983 pool.submit( 

984 _download_table_bqstorage_stream, 

985 download_state, 

986 bqstorage_client, 

987 session, 

988 stream, 

989 worker_queue, 

990 page_to_item, 

991 ) 

992 for stream in session.streams 

993 ] 

994 

995 while not_done: 

996 # Don't block on the worker threads. For performance reasons, 

997 # we want to block on the queue's get method, instead. This 

998 # prevents the queue from filling up, because the main thread 

999 # has smaller gaps in time between calls to the queue's get 

1000 # method. For a detailed explanation, see: 

1001 # https://friendliness.dev/2019/06/18/python-nowait/ 

1002 done, not_done = _nowait(not_done) 

1003 for future in done: 

1004 # Call result() on any finished threads to raise any 

1005 # exceptions encountered. 

1006 future.result() 

1007 

1008 try: 

1009 frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) 

1010 yield frame 

1011 except queue.Empty: # pragma: NO COVER 

1012 continue 

1013 

1014 # Return any remaining values after the workers finished. 

1015 while True: # pragma: NO COVER 

1016 try: 

1017 frame = worker_queue.get_nowait() 

1018 yield frame 

1019 except queue.Empty: # pragma: NO COVER 

1020 break 

1021 finally: 

1022 # No need for a lock because reading/replacing a variable is 

1023 # defined to be an atomic operation in the Python language 

1024 # definition (enforced by the global interpreter lock). 

1025 download_state.done = True 

1026 

1027 # Shutdown all background threads, now that they should know to 

1028 # exit early. 

1029 pool.shutdown(wait=True) 

1030 

1031 

1032def download_arrow_bqstorage( 

1033 project_id, 

1034 table, 

1035 bqstorage_client, 

1036 preserve_order=False, 

1037 selected_fields=None, 

1038 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

1039 max_stream_count=None, 

1040): 

1041 return _download_table_bqstorage( 

1042 project_id, 

1043 table, 

1044 bqstorage_client, 

1045 preserve_order=preserve_order, 

1046 selected_fields=selected_fields, 

1047 page_to_item=_bqstorage_page_to_arrow, 

1048 max_queue_size=max_queue_size, 

1049 max_stream_count=max_stream_count, 

1050 ) 

1051 

1052 

1053def download_dataframe_bqstorage( 

1054 project_id, 

1055 table, 

1056 bqstorage_client, 

1057 column_names, 

1058 dtypes, 

1059 preserve_order=False, 

1060 selected_fields=None, 

1061 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

1062 max_stream_count=None, 

1063): 

1064 page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) 

1065 return _download_table_bqstorage( 

1066 project_id, 

1067 table, 

1068 bqstorage_client, 

1069 preserve_order=preserve_order, 

1070 selected_fields=selected_fields, 

1071 page_to_item=page_to_item, 

1072 max_queue_size=max_queue_size, 

1073 max_stream_count=max_stream_count, 

1074 ) 

1075 

1076 

1077def dataframe_to_json_generator(dataframe): 

1078 for row in dataframe.itertuples(index=False, name=None): 

1079 output = {} 

1080 for column, value in zip(dataframe.columns, row): 

1081 # Omit NaN values. 

1082 is_nan = pandas.isna(value) 

1083 

1084 # isna() can also return an array-like of bools, but the latter's boolean 

1085 # value is ambiguous, hence an extra check. An array-like value is *not* 

1086 # considered a NaN, however. 

1087 if isinstance(is_nan, bool) and is_nan: 

1088 continue 

1089 

1090 # Convert numpy types to corresponding Python types. 

1091 # https://stackoverflow.com/a/60441783/101923 

1092 if isinstance(value, numpy.bool_): 

1093 value = bool(value) 

1094 elif isinstance( 

1095 value, 

1096 ( 

1097 numpy.int64, 

1098 numpy.int32, 

1099 numpy.int16, 

1100 numpy.int8, 

1101 numpy.uint64, 

1102 numpy.uint32, 

1103 numpy.uint16, 

1104 numpy.uint8, 

1105 ), 

1106 ): 

1107 value = int(value) 

1108 output[column] = value 

1109 

1110 yield output 

1111 
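A sketch of the NaN-dropping behavior described above; missing cells are omitted from the emitted rows (assumes pandas is installed; the column names are illustrative):

df = pandas.DataFrame({"x": [1, None], "y": ["a", "b"]})
list(dataframe_to_json_generator(df))
# -> [{'x': 1.0, 'y': 'a'}, {'y': 'b'}]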

1112 

1113def verify_pandas_imports(): 

1114 if pandas is None: 

1115 raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception 

1116 if db_dtypes is None: 

1117 raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception 

1118 

1119 

1120def determine_requested_streams( 

1121 preserve_order: bool, 

1122 max_stream_count: Union[int, None], 

1123) -> int: 

1124 """Determines the value of requested_streams based on the values of 

1125 `preserve_order` and `max_stream_count`. 

1126 

1127 Args: 

1128 preserve_order (bool): Whether to preserve the order of streams. If True, 

1129 this limits the number of streams to one. `preserve_order` takes 

1130 precedence over `max_stream_count`. 

1131 max_stream_count (Union[int, None]): The maximum number of streams

1132 allowed. Must be a non-negative number or None, where None indicates 

1133 the value is unset. NOTE: if `preserve_order` is also set, it takes 

1134 precedence over `max_stream_count`, thus to ensure that `max_stream_count` 

1135 is used, ensure that `preserve_order` is False.

1136 

1137 Returns: 

1138 (int) The appropriate value for requested_streams. 

1139 """ 

1140 

1141 if preserve_order: 

1142 # If preserve order is set, it takes precedence. 

1143 # Limit the requested streams to 1, to ensure that order 

1144 # is preserved.

1145 return 1 

1146 

1147 elif max_stream_count is not None: 

1148 # If preserve_order is not set, only then do we consider max_stream_count 

1149 if max_stream_count <= -1: 

1150 raise ValueError("max_stream_count must be non-negative OR None") 

1151 return max_stream_count 

1152 

1153 # Default to zero requested streams (unbounded). 

1154 return 0
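
A sketch of the precedence rules described in the docstring:

determine_requested_streams(preserve_order=True, max_stream_count=10)     # -> 1
determine_requested_streams(preserve_order=False, max_stream_count=10)    # -> 10
determine_requested_streams(preserve_order=False, max_stream_count=None)  # -> 0 (unbounded)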