Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/_pandas_helpers.py: 18%

429 statements  

1# Copyright 2019 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Shared helper functions for connecting BigQuery and pandas. 

16 

17NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package, 

18instead. See: go/pandas-gbq-and-bigframes-redundancy and 

19https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pandas_to_bigquery.py 

20""" 

21 

22import concurrent.futures 

23from datetime import datetime 

24import functools 

25from itertools import islice 

26import logging 

27import queue 

28import threading 

29import time 

30import warnings 

31from typing import Any, Union, Optional, Callable, Generator, List 

32 

33 

34from google.cloud.bigquery import _pyarrow_helpers 

35from google.cloud.bigquery import _versions_helpers 

36from google.cloud.bigquery import schema 

37 

38 

39try: 

40 import pandas # type: ignore 

41 

42 pandas_import_exception = None 

43except ImportError as exc: 

44 pandas = None 

45 pandas_import_exception = exc 

46else: 

47 import numpy 

48 

49 

50try: 

51 import pandas_gbq.schema.pandas_to_bigquery # type: ignore 

52 

53 pandas_gbq_import_exception = None 

54except ImportError as exc: 

55 pandas_gbq = None 

56 pandas_gbq_import_exception = exc 

57 

58 

59try: 

60 import db_dtypes # type: ignore 

61 

62 date_dtype_name = db_dtypes.DateDtype.name 

63 time_dtype_name = db_dtypes.TimeDtype.name 

64 db_dtypes_import_exception = None 

65except ImportError as exc: 

66 db_dtypes = None 

67 db_dtypes_import_exception = exc 

68 date_dtype_name = time_dtype_name = "" # Use '' rather than None so pytype still infers these names as str. 

69 

70pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import() 

71 

72try: 

73 # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array` 

74 from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore 

75except ImportError: 

76 # No shapely, use NoneType for _BaseGeometry as a placeholder. 

77 _BaseGeometry = type(None) 

78else: 

79 # We don't have any unit test sessions that install shapely but not pandas. 

80 if pandas is not None: # pragma: NO COVER 

81 

82 def _to_wkb(): 

83 from shapely import wkb # type: ignore 

84 

85 write = wkb.dumps 

86 notnull = pandas.notnull 

87 

88 def _to_wkb(v): 

89 return write(v) if notnull(v) else v 

90 

91 return _to_wkb 

92 

93 _to_wkb = _to_wkb() 

94 

95try: 

96 from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions 

97except ImportError: 

98 _ARROW_COMPRESSION_SUPPORT = False 

99else: 

100 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too. 

101 _ARROW_COMPRESSION_SUPPORT = True 

102 

103_LOGGER = logging.getLogger(__name__) 

104 

105_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds. 

106 

107_MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads 

108 

109_NO_PANDAS_ERROR = "Please install the 'pandas' package to use this function." 

110_NO_DB_TYPES_ERROR = "Please install the 'db-dtypes' package to use this function." 

111 

112_PANDAS_DTYPE_TO_BQ = { 

113 "bool": "BOOLEAN", 

114 "datetime64[ns, UTC]": "TIMESTAMP", 

115 "datetime64[ns]": "DATETIME", 

116 "float32": "FLOAT", 

117 "float64": "FLOAT", 

118 "int8": "INTEGER", 

119 "int16": "INTEGER", 

120 "int32": "INTEGER", 

121 "int64": "INTEGER", 

122 "uint8": "INTEGER", 

123 "uint16": "INTEGER", 

124 "uint32": "INTEGER", 

125 "geometry": "GEOGRAPHY", 

126 date_dtype_name: "DATE", 

127 time_dtype_name: "TIME", 

128} 

129 

130 

131class _DownloadState(object): 

132 """Flag to indicate that a thread should exit early.""" 

133 

134 def __init__(self): 

135 # No need for a lock because reading/replacing a variable is defined to 

136 # be an atomic operation in the Python language definition (enforced by 

137 # the global interpreter lock). 

138 self.done = False 

139 # To assist with testing and understanding the behavior of the 

140 # download, use this object as shared state to track how many worker 

141 # threads have started and have gracefully shutdown. 

142 self._started_workers_lock = threading.Lock() 

143 self.started_workers = 0 

144 self._finished_workers_lock = threading.Lock() 

145 self.finished_workers = 0 

146 

147 def start(self): 

148 with self._started_workers_lock: 

149 self.started_workers += 1 

150 

151 def finish(self): 

152 with self._finished_workers_lock: 

153 self.finished_workers += 1 

154 

155 

156BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = { 

157 "GEOGRAPHY": { 

158 b"ARROW:extension:name": b"google:sqlType:geography", 

159 b"ARROW:extension:metadata": b'{"encoding": "WKT"}', 

160 }, 

161 "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"}, 

162 "JSON": {b"ARROW:extension:name": b"google:sqlType:json"}, 

163} 

164 

165 

166def bq_to_arrow_struct_data_type(field): 

167 arrow_fields = [] 

168 for subfield in field.fields: 

169 arrow_subfield = bq_to_arrow_field(subfield) 

170 if arrow_subfield: 

171 arrow_fields.append(arrow_subfield) 

172 else: 

173 # Could not determine a subfield type. Fallback to type 

174 # inference. 

175 return None 

176 return pyarrow.struct(arrow_fields) 

177 

178 

179def bq_to_arrow_range_data_type(field): 

180 if field is None: 

181 raise ValueError( 

182 "Range element type cannot be None, must be one of " 

183 "DATE, DATETIME, or TIMESTAMP" 

184 ) 

185 element_type = field.element_type.upper() 

186 arrow_element_type = _pyarrow_helpers.bq_to_arrow_scalars(element_type)() 

187 return pyarrow.struct([("start", arrow_element_type), ("end", arrow_element_type)]) 

188 

189 

190def bq_to_arrow_data_type(field): 

191 """Return the Arrow data type, corresponding to a given BigQuery column. 

192 

193 Returns: 

194 None: if default Arrow type inspection should be used. 

195 """ 

196 if field.mode is not None and field.mode.upper() == "REPEATED": 

197 inner_type = bq_to_arrow_data_type( 

198 schema.SchemaField(field.name, field.field_type, fields=field.fields) 

199 ) 

200 if inner_type: 

201 return pyarrow.list_(inner_type) 

202 return None 

203 

204 field_type_upper = field.field_type.upper() if field.field_type else "" 

205 if field_type_upper in schema._STRUCT_TYPES: 

206 return bq_to_arrow_struct_data_type(field) 

207 

208 if field_type_upper == "RANGE": 

209 return bq_to_arrow_range_data_type(field.range_element_type) 

210 

211 data_type_constructor = _pyarrow_helpers.bq_to_arrow_scalars(field_type_upper) 

212 if data_type_constructor is None: 

213 return None 

214 return data_type_constructor() 

215 

216 

217def bq_to_arrow_field(bq_field, array_type=None): 

218 """Return the Arrow field, corresponding to a given BigQuery column. 

219 

220 Returns: 

221 None: if the Arrow type cannot be determined. 

222 """ 

223 arrow_type = bq_to_arrow_data_type(bq_field) 

224 if arrow_type is not None: 

225 if array_type is not None: 

226 arrow_type = array_type # For GEOGRAPHY, at least initially 

227 metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get( 

228 bq_field.field_type.upper() if bq_field.field_type else "" 

229 ) 

230 return pyarrow.field( 

231 bq_field.name, 

232 arrow_type, 

233 # Even if the remote schema is REQUIRED, there's a chance there's 

234 # local NULL values. Arrow will gladly interpret these NULL values 

235 # as non-NULL and give you an arbitrary value. See: 

236 # https://github.com/googleapis/python-bigquery/issues/1692 

237 nullable=False if bq_field.mode.upper() == "REPEATED" else True, 

238 metadata=metadata, 

239 ) 

240 

241 warnings.warn( 

242 "Unable to determine Arrow type for field '{}'.".format(bq_field.name) 

243 ) 

244 return None 

245 

246 

247def bq_to_arrow_schema(bq_schema): 

248 """Return the Arrow schema, corresponding to a given BigQuery schema. 

249 

250 Returns: 

251 None: if any Arrow type cannot be determined. 

252 """ 

253 arrow_fields = [] 

254 for bq_field in bq_schema: 

255 arrow_field = bq_to_arrow_field(bq_field) 

256 if arrow_field is None: 

257 # Auto-detect the schema if there is an unknown field type. 

258 return None 

259 arrow_fields.append(arrow_field) 

260 return pyarrow.schema(arrow_fields) 

261 
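
# --- Illustrative sketch (editor's addition, not part of the original module):
# building an Arrow schema from a couple of BigQuery SchemaField objects with
# the helper above. The field names are invented for the example; pyarrow must
# be installed, and the helper returns None when any type cannot be mapped.
def _example_bq_to_arrow_schema():
    bq_fields = [
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("scores", "FLOAT", mode="REPEATED"),
    ]
    arrow_schema = bq_to_arrow_schema(bq_fields)
    if arrow_schema is not None:
        # Expected: a string field plus a non-nullable list<double> field.
        assert arrow_schema.names == ["name", "scores"]
    return arrow_schema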

262 

263def default_types_mapper( 

264 date_as_object: bool = False, 

265 bool_dtype: Union[Any, None] = None, 

266 int_dtype: Union[Any, None] = None, 

267 float_dtype: Union[Any, None] = None, 

268 string_dtype: Union[Any, None] = None, 

269 date_dtype: Union[Any, None] = None, 

270 datetime_dtype: Union[Any, None] = None, 

271 time_dtype: Union[Any, None] = None, 

272 timestamp_dtype: Union[Any, None] = None, 

273 range_date_dtype: Union[Any, None] = None, 

274 range_datetime_dtype: Union[Any, None] = None, 

275 range_timestamp_dtype: Union[Any, None] = None, 

276): 

277 """Create a mapping from pyarrow types to pandas types. 

278 

279 This overrides the pandas defaults to use null-safe extension types where 

280 available. 

281 

282 See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of 

283 data types. See: 

284 tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for 

285 BigQuery to Arrow type mapping. 

286 

287 Note to google-cloud-bigquery developers: If you update the default dtypes, 

288 also update the docs at docs/usage/pandas.rst. 

289 """ 

290 

291 def types_mapper(arrow_data_type): 

292 if bool_dtype is not None and pyarrow.types.is_boolean(arrow_data_type): 

293 return bool_dtype 

294 

295 elif int_dtype is not None and pyarrow.types.is_integer(arrow_data_type): 

296 return int_dtype 

297 

298 elif float_dtype is not None and pyarrow.types.is_floating(arrow_data_type): 

299 return float_dtype 

300 

301 elif string_dtype is not None and pyarrow.types.is_string(arrow_data_type): 

302 return string_dtype 

303 

304 elif ( 

305 # If date_as_object is True, we know some DATE columns are 

306 # out-of-bounds of what is supported by pandas. 

307 date_dtype is not None 

308 and not date_as_object 

309 and pyarrow.types.is_date(arrow_data_type) 

310 ): 

311 return date_dtype 

312 

313 elif ( 

314 datetime_dtype is not None 

315 and pyarrow.types.is_timestamp(arrow_data_type) 

316 and arrow_data_type.tz is None 

317 ): 

318 return datetime_dtype 

319 

320 elif ( 

321 timestamp_dtype is not None 

322 and pyarrow.types.is_timestamp(arrow_data_type) 

323 and arrow_data_type.tz is not None 

324 ): 

325 return timestamp_dtype 

326 

327 elif time_dtype is not None and pyarrow.types.is_time(arrow_data_type): 

328 return time_dtype 

329 

330 elif pyarrow.types.is_struct(arrow_data_type): 

331 if range_datetime_dtype is not None and arrow_data_type.equals( 

332 range_datetime_dtype.pyarrow_dtype 

333 ): 

334 return range_datetime_dtype 

335 

336 elif range_date_dtype is not None and arrow_data_type.equals( 

337 range_date_dtype.pyarrow_dtype 

338 ): 

339 return range_date_dtype 

340 

341 elif range_timestamp_dtype is not None and arrow_data_type.equals( 

342 range_timestamp_dtype.pyarrow_dtype 

343 ): 

344 return range_timestamp_dtype 

345 

346 return types_mapper 

347 
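
# --- Illustrative sketch (editor's addition, not part of the original module):
# a mapper built by default_types_mapper() can be handed to
# pyarrow.Table.to_pandas(types_mapper=...) so that BOOL and INT64 columns land
# in pandas' nullable extension dtypes. `arrow_table` is assumed to be any
# pyarrow.Table; columns the mapper does not handle fall back to the pandas
# defaults because the mapper returns None for them.
def _example_default_types_mapper(arrow_table):
    mapper = default_types_mapper(
        bool_dtype=pandas.BooleanDtype(),
        int_dtype=pandas.Int64Dtype(),
    )
    return arrow_table.to_pandas(types_mapper=mapper)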

348 

349def bq_to_arrow_array(series, bq_field): 

350 if bq_field.field_type.upper() == "GEOGRAPHY": 

351 arrow_type = None 

352 first = _first_valid(series) 

353 if first is not None: 

354 if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry): 

355 arrow_type = pyarrow.binary() 

356 # Convert shapely geometry to WKB binary format: 

357 series = series.apply(_to_wkb) 

358 elif isinstance(first, bytes): 

359 arrow_type = pyarrow.binary() 

360 elif series.dtype.name == "geometry": 

361 # We have a GeoSeries containing all nulls, convert it to a pandas series 

362 series = pandas.Series(numpy.array(series)) 

363 

364 if arrow_type is None: 

365 arrow_type = bq_to_arrow_data_type(bq_field) 

366 else: 

367 arrow_type = bq_to_arrow_data_type(bq_field) 

368 

369 field_type_upper = bq_field.field_type.upper() if bq_field.field_type else "" 

370 

371 try: 

372 if bq_field.mode.upper() == "REPEATED": 

373 return pyarrow.ListArray.from_pandas(series, type=arrow_type) 

374 if field_type_upper in schema._STRUCT_TYPES: 

375 return pyarrow.StructArray.from_pandas(series, type=arrow_type) 

376 return pyarrow.Array.from_pandas(series, type=arrow_type) 

377 except pyarrow.ArrowTypeError: 

378 msg = f"""Error converting Pandas column with name: "{series.name}" and datatype: "{series.dtype}" to an appropriate pyarrow datatype: Array, ListArray, or StructArray""" 

379 _LOGGER.error(msg) 

380 raise pyarrow.ArrowTypeError(msg) 

381 
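
# --- Illustrative sketch (editor's addition, not part of the original module):
# converting a pandas column to an Arrow array, using the BigQuery field type
# as the type hint. The column/field name "total" is made up for the example.
def _example_bq_to_arrow_array():
    series = pandas.Series([1, 2, 3], name="total")
    field = schema.SchemaField("total", "INTEGER")
    # Expected: a pyarrow int64 array containing [1, 2, 3].
    return bq_to_arrow_array(series, field)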

382 

383def get_column_or_index(dataframe, name): 

384 """Return a column or index as a pandas series.""" 

385 if name in dataframe.columns: 

386 return dataframe[name].reset_index(drop=True) 

387 

388 if isinstance(dataframe.index, pandas.MultiIndex): 

389 if name in dataframe.index.names: 

390 return ( 

391 dataframe.index.get_level_values(name) 

392 .to_series() 

393 .reset_index(drop=True) 

394 ) 

395 else: 

396 if name == dataframe.index.name: 

397 return dataframe.index.to_series().reset_index(drop=True) 

398 

399 raise ValueError("column or index '{}' not found.".format(name)) 

400 

401 

402def list_columns_and_indexes(dataframe): 

403 """Return all index and column names with dtypes. 

404 

405 Returns: 

406 Sequence[Tuple[str, dtype]]: 

407 Returns a list of index and column names (index levels first) with 

408 corresponding dtypes. If an index is missing a name or has the 

409 same name as a column, the index is omitted. 

410 """ 

411 column_names = frozenset(dataframe.columns) 

412 columns_and_indexes = [] 

413 if isinstance(dataframe.index, pandas.MultiIndex): 

414 for name in dataframe.index.names: 

415 if name and name not in column_names: 

416 values = dataframe.index.get_level_values(name) 

417 columns_and_indexes.append((name, values.dtype)) 

418 else: 

419 if dataframe.index.name and dataframe.index.name not in column_names: 

420 columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype)) 

421 

422 columns_and_indexes += zip(dataframe.columns, dataframe.dtypes) 

423 return columns_and_indexes 

424 
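
# --- Illustrative sketch (editor's addition, not part of the original module):
# a named index is reported alongside the columns, while an unnamed index (or
# one shadowed by a column of the same name) is skipped.
def _example_list_columns_and_indexes():
    df = pandas.DataFrame(
        {"value": [1.0, 2.0]},
        index=pandas.Index(["a", "b"], name="key"),
    )
    # Expected (roughly): [("key", dtype('O')), ("value", dtype('float64'))]
    return list_columns_and_indexes(df)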

425 

426def _first_valid(series): 

427 first_valid_index = series.first_valid_index() 

428 if first_valid_index is not None: 

429 return series.at[first_valid_index] 

430 

431 

432def _first_array_valid(series): 

433 """Return the first "meaningful" element from the array series. 

434 

435 Here, "meaningful" means the first non-None element in one of the arrays that can 

436 be used for type detection. 

437 """ 

438 first_valid_index = series.first_valid_index() 

439 if first_valid_index is None: 

440 return None 

441 

442 valid_array = series.at[first_valid_index] 

443 valid_item = next((item for item in valid_array if not pandas.isna(item)), None) 

444 

445 if valid_item is not None: 

446 return valid_item 

447 

448 # Valid item is None because all items in the "valid" array are invalid. Try 

449 # to find a true valid array manually. 

450 for array in islice(series, first_valid_index + 1, None): 

451 try: 

452 array_iter = iter(array) 

453 except TypeError: 

454 continue # Not an array, apparently, e.g. None, thus skip. 

455 valid_item = next((item for item in array_iter if not pandas.isna(item)), None) 

456 if valid_item is not None: 

457 break 

458 

459 return valid_item 

460 
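
# --- Illustrative sketch (editor's addition, not part of the original module):
# the first non-null element is pulled out of a series of arrays, even when the
# first non-null array itself contains only NaNs.
def _example_first_array_valid():
    series = pandas.Series([None, [numpy.nan], [numpy.nan, 42], [7]])
    # Expected: 42 -- the [nan] array is skipped because it has no valid item.
    return _first_array_valid(series)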

461 

462def dataframe_to_bq_schema(dataframe, bq_schema): 

463 """Convert a pandas DataFrame schema to a BigQuery schema. 

464 

465 DEPRECATED: Use 

466 pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(), 

467 instead. See: go/pandas-gbq-and-bigframes-redundancy. 

468 

469 Args: 

470 dataframe (pandas.DataFrame): 

471 DataFrame for which the client determines the BigQuery schema. 

472 bq_schema (Sequence[Union[ \ 

473 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

474 Mapping[str, Any] \ 

475 ]]): 

476 A BigQuery schema. Use this argument to override the autodetected 

477 type for some or all of the DataFrame columns. 

478 

479 Returns: 

480 Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]: 

481 The automatically determined schema. Returns None if the type of 

482 any column cannot be determined. 

483 """ 

484 if pandas_gbq is None: 

485 warnings.warn( 

486 "Loading pandas DataFrame into BigQuery will require pandas-gbq " 

487 "package version 0.26.1 or greater in the future. " 

488 f"Tried to import pandas-gbq and got: {pandas_gbq_import_exception}", 

489 category=FutureWarning, 

490 ) 

491 else: 

492 return pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( 

493 dataframe, 

494 override_bigquery_fields=bq_schema, 

495 index=True, 

496 ) 

497 

498 if bq_schema: 

499 bq_schema = schema._to_schema_fields(bq_schema) 

500 bq_schema_index = {field.name: field for field in bq_schema} 

501 bq_schema_unused = set(bq_schema_index.keys()) 

502 else: 

503 bq_schema_index = {} 

504 bq_schema_unused = set() 

505 

506 bq_schema_out = [] 

507 unknown_type_columns = [] 

508 dataframe_reset_index = dataframe.reset_index() 

509 for column, dtype in list_columns_and_indexes(dataframe): 

510 # Step 1: use provided type from schema, if present. 

511 bq_field = bq_schema_index.get(column) 

512 if bq_field: 

513 bq_schema_out.append(bq_field) 

514 bq_schema_unused.discard(bq_field.name) 

515 continue 

516 

517 # Step 2: try to automatically determine the type based on the 

518 # pandas dtype. 

519 bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name) 

520 if bq_type is None: 

521 sample_data = _first_valid(dataframe_reset_index[column]) 

522 if ( 

523 isinstance(sample_data, _BaseGeometry) 

524 and sample_data is not None # Paranoia 

525 ): 

526 bq_type = "GEOGRAPHY" 

527 if bq_type is not None: 

528 bq_schema_out.append(schema.SchemaField(column, bq_type)) 

529 continue 

530 

531 # Step 3: try with pyarrow if available 

532 bq_field = _get_schema_by_pyarrow(column, dataframe_reset_index[column]) 

533 if bq_field is not None: 

534 bq_schema_out.append(bq_field) 

535 continue 

536 

537 unknown_type_columns.append(column) 

538 

539 # Catch any schema mismatch. The developer explicitly asked to serialize a 

540 # column, but it was not found. 

541 if bq_schema_unused: 

542 raise ValueError( 

543 "bq_schema contains fields not present in dataframe: {}".format( 

544 bq_schema_unused 

545 ) 

546 ) 

547 

548 if unknown_type_columns != []: 

549 msg = "Could not determine the type of columns: {}".format( 

550 ", ".join(unknown_type_columns) 

551 ) 

552 warnings.warn(msg) 

553 return None # We cannot detect the schema in full. 

554 

555 return tuple(bq_schema_out) 

556 
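
# --- Illustrative sketch (editor's addition, not part of the original module):
# letting the helper detect a schema for a small DataFrame while overriding one
# column. Column names are invented; the exact result depends on whether the
# pandas-gbq package is installed (see the deprecation note above).
def _example_dataframe_to_bq_schema():
    df = pandas.DataFrame({"name": ["a", "b"], "total": [1, 2]})
    override = [schema.SchemaField("total", "NUMERIC")]
    # Expected: a tuple of SchemaField objects with "total" typed NUMERIC and
    # "name" autodetected, or None if any column type cannot be determined.
    return dataframe_to_bq_schema(df, override)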

557 

558def _get_schema_by_pyarrow(name, series): 

559 """Attempt to detect the type of the given series by leveraging PyArrow's 

560 type detection capabilities. 

561 

562 This function requires the ``pyarrow`` library to be installed and 

563 available. If the series type cannot be determined or ``pyarrow`` is not 

564 available, ``None`` is returned. 

565 

566 Args: 

567 name (str): 

568 the column name of the SchemaField. 

569 series (pandas.Series): 

570 The Series data for which to detect the data type. 

571 Returns: 

572 Optional[google.cloud.bigquery.schema.SchemaField]: 

573 A SchemaField whose field type is the detected BigQuery-compatible type 

574 string (e.g., "STRING", "INTEGER", "TIMESTAMP", "DATETIME", "NUMERIC", 

575 "BIGNUMERIC") and whose mode is "NULLABLE" or "REPEATED". 

576 Returns ``None`` if the type cannot be determined or ``pyarrow`` 

577 is not imported. 

578 """ 

579 

580 if not pyarrow: 

581 return None 

582 

583 arrow_table = pyarrow.array(series) 

584 if pyarrow.types.is_list(arrow_table.type): 

585 # `pyarrow.ListType` 

586 mode = "REPEATED" 

587 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.values.type.id) 

588 

589 # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds 

590 # it to such datetimes, causing them to be recognized as TIMESTAMP type. 

591 # We thus additionally check the actual data to see if we need to overrule 

592 # that and choose DATETIME instead. 

593 # Note that this should only be needed for datetime values inside a list, 

594 # since scalar datetime values have a proper Pandas dtype that allows 

595 # distinguishing between timezone-naive and timezone-aware values before 

596 # even requiring the additional schema augment logic in this method. 

597 if type == "TIMESTAMP": 

598 valid_item = _first_array_valid(series) 

599 if isinstance(valid_item, datetime) and valid_item.tzinfo is None: 

600 type = "DATETIME" 

601 else: 

602 mode = "NULLABLE" # default mode 

603 type = _pyarrow_helpers.arrow_scalar_ids_to_bq(arrow_table.type.id) 

604 if type == "NUMERIC" and arrow_table.type.scale > 9: 

605 type = "BIGNUMERIC" 

606 

607 if type is not None: 

608 return schema.SchemaField(name, type, mode) 

609 else: 

610 return None 

611 
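
# --- Illustrative sketch (editor's addition, not part of the original module):
# pyarrow-based detection of a list-valued column. The column name "sizes" is
# made up for the example.
def _example_get_schema_by_pyarrow():
    series = pandas.Series([[1, 2], [3]])
    # Expected: a SchemaField named "sizes" with an integer type and mode
    # "REPEATED", or None if pyarrow is unavailable / the type is unknown.
    return _get_schema_by_pyarrow("sizes", series)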

612 

613def dataframe_to_arrow(dataframe, bq_schema): 

614 """Convert pandas dataframe to Arrow table, using BigQuery schema. 

615 

616 Args: 

617 dataframe (pandas.DataFrame): 

618 DataFrame to convert to Arrow table. 

619 bq_schema (Sequence[Union[ \ 

620 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

621 Mapping[str, Any] \ 

622 ]]): 

623 Desired BigQuery schema. The number of columns must match the 

624 number of columns in the DataFrame. 

625 

626 Returns: 

627 pyarrow.Table: 

628 Table containing dataframe data, with schema derived from 

629 BigQuery schema. 

630 """ 

631 column_names = set(dataframe.columns) 

632 column_and_index_names = set( 

633 name for name, _ in list_columns_and_indexes(dataframe) 

634 ) 

635 

636 bq_schema = schema._to_schema_fields(bq_schema) 

637 bq_field_names = set(field.name for field in bq_schema) 

638 

639 extra_fields = bq_field_names - column_and_index_names 

640 if extra_fields: 

641 raise ValueError( 

642 "bq_schema contains fields not present in dataframe: {}".format( 

643 extra_fields 

644 ) 

645 ) 

646 

647 # It's okay for indexes to be missing from bq_schema, but it's not okay to 

648 # be missing columns. 

649 missing_fields = column_names - bq_field_names 

650 if missing_fields: 

651 raise ValueError( 

652 "bq_schema is missing fields from dataframe: {}".format(missing_fields) 

653 ) 

654 

655 arrow_arrays = [] 

656 arrow_names = [] 

657 arrow_fields = [] 

658 for bq_field in bq_schema: 

659 arrow_names.append(bq_field.name) 

660 arrow_arrays.append( 

661 bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field) 

662 ) 

663 arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type)) 

664 

665 if all((field is not None for field in arrow_fields)): 

666 return pyarrow.Table.from_arrays( 

667 arrow_arrays, schema=pyarrow.schema(arrow_fields) 

668 ) 

669 return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names) 

670 

671 

672def dataframe_to_parquet( 

673 dataframe, 

674 bq_schema, 

675 filepath, 

676 parquet_compression="SNAPPY", 

677 parquet_use_compliant_nested_type=True, 

678): 

679 """Write dataframe as a Parquet file, according to the desired BQ schema. 

680 

681 This function requires the :mod:`pyarrow` package. Arrow is used as an 

682 intermediate format. 

683 

684 Args: 

685 dataframe (pandas.DataFrame): 

686 DataFrame to convert to Parquet file. 

687 bq_schema (Sequence[Union[ \ 

688 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

689 Mapping[str, Any] \ 

690 ]]): 

691 Desired BigQuery schema. Number of columns must match number of 

692 columns in the DataFrame. 

693 filepath (str): 

694 Path to write Parquet file to. 

695 parquet_compression (Optional[str]): 

696 The compression codec to use by the ``pyarrow.parquet.write_table`` 

697 serializing method. Defaults to "SNAPPY". 

698 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table 

699 parquet_use_compliant_nested_type (bool): 

700 Whether the ``pyarrow.parquet.write_table`` serializing method should write 

701 compliant Parquet nested type (lists). Defaults to ``True``. 

702 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types 

703 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table 

704 

705 This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``. 

706 """ 

707 pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True) 

708 

709 import pyarrow.parquet # type: ignore 

710 

711 kwargs = ( 

712 {"use_compliant_nested_type": parquet_use_compliant_nested_type} 

713 if _versions_helpers.PYARROW_VERSIONS.use_compliant_nested_type 

714 else {} 

715 ) 

716 

717 bq_schema = schema._to_schema_fields(bq_schema) 

718 arrow_table = dataframe_to_arrow(dataframe, bq_schema) 

719 pyarrow.parquet.write_table( 

720 arrow_table, 

721 filepath, 

722 compression=parquet_compression, 

723 **kwargs, 

724 ) 

725 
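
# --- Illustrative sketch (editor's addition, not part of the original module):
# writing a small DataFrame to a Parquet file with an explicit BigQuery schema.
# The file path and column names are placeholders; pyarrow must be installed.
def _example_dataframe_to_parquet(filepath="/tmp/example.parquet"):
    df = pandas.DataFrame({"name": ["a"], "value": [1.5]})
    bq_schema = [
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("value", "FLOAT"),
    ]
    dataframe_to_parquet(df, bq_schema, filepath, parquet_compression="SNAPPY")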

726 

727def _row_iterator_page_to_arrow(page, column_names, arrow_types): 

728 # Iterate over the page to force the API request to get the page data. 

729 try: 

730 next(iter(page)) 

731 except StopIteration: 

732 pass 

733 

734 arrays = [] 

735 for column_index, arrow_type in enumerate(arrow_types): 

736 arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type)) 

737 

738 if isinstance(column_names, pyarrow.Schema): 

739 return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names) 

740 return pyarrow.RecordBatch.from_arrays(arrays, names=column_names) 

741 

742 

743def download_arrow_row_iterator(pages, bq_schema): 

744 """Use HTTP JSON RowIterator to construct an iterable of RecordBatches. 

745 

746 Args: 

747 pages (Iterator[:class:`google.api_core.page_iterator.Page`]): 

748 An iterator over the result pages. 

749 bq_schema (Sequence[Union[ \ 

750 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

751 Mapping[str, Any] \ 

752 ]]): 

753 A description of the fields in result pages. 

754 Yields: 

755 :class:`pyarrow.RecordBatch` 

756 The next page of records as a ``pyarrow`` record batch. 

757 """ 

758 bq_schema = schema._to_schema_fields(bq_schema) 

759 column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema] 

760 arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema] 

761 

762 for page in pages: 

763 yield _row_iterator_page_to_arrow(page, column_names, arrow_types) 

764 

765 

766def _row_iterator_page_to_dataframe(page, column_names, dtypes): 

767 # Iterate over the page to force the API request to get the page data. 

768 try: 

769 next(iter(page)) 

770 except StopIteration: 

771 pass 

772 

773 columns = {} 

774 for column_index, column_name in enumerate(column_names): 

775 dtype = dtypes.get(column_name) 

776 columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype) 

777 

778 return pandas.DataFrame(columns, columns=column_names) 

779 

780 

781def download_dataframe_row_iterator(pages, bq_schema, dtypes): 

782 """Use HTTP JSON RowIterator to construct a DataFrame. 

783 

784 Args: 

785 pages (Iterator[:class:`google.api_core.page_iterator.Page`]): 

786 An iterator over the result pages. 

787 bq_schema (Sequence[Union[ \ 

788 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

789 Mapping[str, Any] \ 

790 ]]): 

791 A description of the fields in result pages. 

792 dtypes(Mapping[str, numpy.dtype]): 

793 The types of columns in result data to hint construction of the 

794 resulting DataFrame. Not all column types have to be specified. 

795 Yields: 

796 :class:`pandas.DataFrame` 

797 The next page of records as a ``pandas.DataFrame`` record batch. 

798 """ 

799 bq_schema = schema._to_schema_fields(bq_schema) 

800 column_names = [field.name for field in bq_schema] 

801 for page in pages: 

802 yield _row_iterator_page_to_dataframe(page, column_names, dtypes) 

803 

804 

805def _bqstorage_page_to_arrow(page): 

806 return page.to_arrow() 

807 

808 

809def _bqstorage_page_to_dataframe(column_names, dtypes, page): 

810 # page.to_dataframe() does not preserve column order in some versions 

811 # of google-cloud-bigquery-storage. Access by column name to rearrange. 

812 return page.to_dataframe(dtypes=dtypes)[column_names] 

813 

814 

815def _download_table_bqstorage_stream( 

816 download_state, bqstorage_client, session, stream, worker_queue, page_to_item 

817): 

818 download_state.start() 

819 try: 

820 reader = bqstorage_client.read_rows(stream.name) 

821 

822 # Avoid deprecation warnings for passing in unnecessary read session. 

823 # https://github.com/googleapis/python-bigquery-storage/issues/229 

824 if _versions_helpers.BQ_STORAGE_VERSIONS.is_read_session_optional: 

825 rowstream = reader.rows() 

826 else: 

827 rowstream = reader.rows(session) 

828 

829 for page in rowstream.pages: 

830 item = page_to_item(page) 

831 

832 # Make sure we set a timeout on put() so that we give the worker 

833 # thread opportunities to shutdown gracefully, for example if the 

834 # parent thread shuts down or the parent generator object which 

835 # collects rows from all workers goes out of scope. See: 

836 # https://github.com/googleapis/python-bigquery/issues/2032 

837 while True: 

838 if download_state.done: 

839 return 

840 try: 

841 worker_queue.put(item, timeout=_PROGRESS_INTERVAL) 

842 break 

843 except queue.Full: 

844 continue 

845 finally: 

846 download_state.finish() 

847 

848 

849def _nowait(futures): 

850 """Separate finished and unfinished threads, much like 

851 :func:`concurrent.futures.wait`, but don't wait. 

852 """ 

853 done = [] 

854 not_done = [] 

855 for future in futures: 

856 if future.done(): 

857 done.append(future) 

858 else: 

859 not_done.append(future) 

860 return done, not_done 

861 

862 

863def _download_table_bqstorage( 

864 project_id: str, 

865 table: Any, 

866 bqstorage_client: Any, 

867 preserve_order: bool = False, 

868 selected_fields: Optional[List[Any]] = None, 

869 page_to_item: Optional[Callable] = None, 

870 max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, 

871 max_stream_count: Optional[int] = None, 

872 download_state: Optional[_DownloadState] = None, 

873 timeout: Optional[float] = None, 

874) -> Generator[Any, None, None]: 

875 """Downloads a BigQuery table using the BigQuery Storage API. 

876 

877 This method uses the faster, but potentially more expensive, BigQuery 

878 Storage API to download a table as a Pandas DataFrame. It supports 

879 parallel downloads and optional data transformations. 

880 

881 Args: 

882 project_id (str): The ID of the Google Cloud project containing 

883 the table. 

884 table (Any): The BigQuery table to download. 

885 bqstorage_client (Any): An 

886 authenticated BigQuery Storage API client. 

887 preserve_order (bool, optional): Whether to preserve the order 

888 of the rows as they are read from BigQuery. If True, this limits 

889 the number of streams to one and overrides `max_stream_count`. 

890 Defaults to False. 

891 selected_fields (Optional[List[SchemaField]]): 

892 A list of BigQuery schema fields to select for download. If None, 

893 all fields are downloaded. Defaults to None. 

894 page_to_item (Optional[Callable]): An optional callable 

895 function that takes a page of data from the BigQuery Storage API and converts it into the item (for example, a DataFrame or Arrow record batch) that this generator yields. 

896 max_stream_count (Optional[int]): The maximum number of 

897 concurrent streams to use for downloading data. If `preserve_order` 

898 is True, the requested streams are limited to 1 regardless of the 

899 `max_stream_count` value. If 0 or None, then the number of 

900 requested streams will be unbounded. Defaults to None. 

901 download_state (Optional[_DownloadState]): 

902 A threadsafe state object which can be used to observe the 

903 behavior of the worker threads created by this method. 

904 timeout (Optional[float]): 

905 The number of seconds to wait for the download to complete. 

906 If None, wait indefinitely. 

907 

908 Yields: 

909 Any: The items produced by ``page_to_item`` (such as pandas DataFrames 

910 or pyarrow RecordBatches), one for each chunk of data downloaded from BigQuery. 

911 

912 Raises: 

913 ValueError: If attempting to read from a specific partition or snapshot. 

914 concurrent.futures.TimeoutError: 

915 If the download does not complete within the specified timeout. 

916 

917 Note: 

918 This method requires the `google-cloud-bigquery-storage` library 

919 to be installed. 

920 """ 

921 

922 from google.cloud import bigquery_storage 

923 

924 if "$" in table.table_id: 

925 raise ValueError( 

926 "Reading from a specific partition is not currently supported." 

927 ) 

928 if "@" in table.table_id: 

929 raise ValueError("Reading from a specific snapshot is not currently supported.") 

930 

931 requested_streams = determine_requested_streams(preserve_order, max_stream_count) 

932 

933 requested_session = bigquery_storage.types.stream.ReadSession( 

934 table=table.to_bqstorage(), 

935 data_format=bigquery_storage.types.stream.DataFormat.ARROW, 

936 ) 

937 if selected_fields is not None: 

938 for field in selected_fields: 

939 requested_session.read_options.selected_fields.append(field.name) 

940 

941 if _ARROW_COMPRESSION_SUPPORT: 

942 requested_session.read_options.arrow_serialization_options.buffer_compression = ( 

943 # CompressionCodec(1) -> LZ4_FRAME 

944 ArrowSerializationOptions.CompressionCodec(1) 

945 ) 

946 

947 session = bqstorage_client.create_read_session( 

948 parent="projects/{}".format(project_id), 

949 read_session=requested_session, 

950 max_stream_count=requested_streams, 

951 ) 

952 

953 _LOGGER.debug( 

954 "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format( 

955 table.project, table.dataset_id, table.table_id, session.name 

956 ) 

957 ) 

958 

959 # Avoid reading rows from an empty table. 

960 if not session.streams: 

961 return 

962 

963 total_streams = len(session.streams) 

964 

965 # Use _DownloadState to notify worker threads when to quit. 

966 # See: https://stackoverflow.com/a/29237343/101923 

967 if download_state is None: 

968 download_state = _DownloadState() 

969 

970 # Create a queue to collect frames as they are created in each thread. 

971 # 

972 # The queue needs to be bounded by default, because if the user code processes the 

973 # fetched result pages too slowly, while at the same time new pages are rapidly being 

974 # fetched from the server, the queue can grow to the point where the process runs 

975 # out of memory. 

976 if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT: 

977 max_queue_size = total_streams 

978 elif max_queue_size is None: 

979 max_queue_size = 0 # unbounded 

980 

981 worker_queue: queue.Queue[Any] = queue.Queue(maxsize=max_queue_size) 

982 

983 # Manually manage the pool to control shutdown behavior on timeout. 

984 pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams)) 

985 wait_on_shutdown = True 

986 start_time = time.time() 

987 

988 try: 

989 # Manually submit jobs and wait for download to complete rather 

990 # than using pool.map because pool.map continues running in the 

991 # background even if there is an exception on the main thread. 

992 # See: https://github.com/googleapis/google-cloud-python/pull/7698 

993 not_done = [ 

994 pool.submit( 

995 _download_table_bqstorage_stream, 

996 download_state, 

997 bqstorage_client, 

998 session, 

999 stream, 

1000 worker_queue, 

1001 page_to_item, 

1002 ) 

1003 for stream in session.streams 

1004 ] 

1005 

1006 while not_done: 

1007 # Check for timeout 

1008 if timeout is not None: 

1009 elapsed = time.time() - start_time 

1010 if elapsed > timeout: 

1011 wait_on_shutdown = False 

1012 raise concurrent.futures.TimeoutError( 

1013 f"Download timed out after {timeout} seconds." 

1014 ) 

1015 

1016 # Don't block on the worker threads. For performance reasons, 

1017 # we want to block on the queue's get method, instead. This 

1018 # prevents the queue from filling up, because the main thread 

1019 # has smaller gaps in time between calls to the queue's get 

1020 # method. For a detailed explanation, see: 

1021 # https://friendliness.dev/2019/06/18/python-nowait/ 

1022 done, not_done = _nowait(not_done) 

1023 for future in done: 

1024 # Call result() on any finished threads to raise any 

1025 # exceptions encountered. 

1026 future.result() 

1027 

1028 try: 

1029 frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) 

1030 yield frame 

1031 except queue.Empty: # pragma: NO COVER 

1032 continue 

1033 

1034 # Return any remaining values after the workers finished. 

1035 while True: # pragma: NO COVER 

1036 try: 

1037 frame = worker_queue.get_nowait() 

1038 yield frame 

1039 except queue.Empty: # pragma: NO COVER 

1040 break 

1041 finally: 

1042 # No need for a lock because reading/replacing a variable is 

1043 # defined to be an atomic operation in the Python language 

1044 # definition (enforced by the global interpreter lock). 

1045 download_state.done = True 

1046 

1047 # Shutdown all background threads, now that they should know to 

1048 # exit early. 

1049 pool.shutdown(wait=wait_on_shutdown) 

1050 

1051 

1052def download_arrow_bqstorage( 

1053 project_id, 

1054 table, 

1055 bqstorage_client, 

1056 preserve_order=False, 

1057 selected_fields=None, 

1058 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

1059 max_stream_count=None, 

1060 timeout=None, 

1061): 

1062 return _download_table_bqstorage( 

1063 project_id, 

1064 table, 

1065 bqstorage_client, 

1066 preserve_order=preserve_order, 

1067 selected_fields=selected_fields, 

1068 page_to_item=_bqstorage_page_to_arrow, 

1069 max_queue_size=max_queue_size, 

1070 max_stream_count=max_stream_count, 

1071 timeout=timeout, 

1072 ) 

1073 

1074 

1075def download_dataframe_bqstorage( 

1076 project_id, 

1077 table, 

1078 bqstorage_client, 

1079 column_names, 

1080 dtypes, 

1081 preserve_order=False, 

1082 selected_fields=None, 

1083 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

1084 max_stream_count=None, 

1085 timeout=None, 

1086): 

1087 page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) 

1088 return _download_table_bqstorage( 

1089 project_id, 

1090 table, 

1091 bqstorage_client, 

1092 preserve_order=preserve_order, 

1093 selected_fields=selected_fields, 

1094 page_to_item=page_to_item, 

1095 max_queue_size=max_queue_size, 

1096 max_stream_count=max_stream_count, 

1097 timeout=timeout, 

1098 ) 

1099 

1100 

1101def dataframe_to_json_generator(dataframe): 

1102 for row in dataframe.itertuples(index=False, name=None): 

1103 output = {} 

1104 for column, value in zip(dataframe.columns, row): 

1105 # Omit NaN values. 

1106 is_nan = pandas.isna(value) 

1107 

1108 # isna() can also return an array-like of bools, but the latter's boolean 

1109 # value is ambiguous, hence an extra check. An array-like value is *not* 

1110 # considered a NaN, however. 

1111 if isinstance(is_nan, bool) and is_nan: 

1112 continue 

1113 

1114 # Convert numpy types to corresponding Python types. 

1115 # https://stackoverflow.com/a/60441783/101923 

1116 if isinstance(value, numpy.bool_): 

1117 value = bool(value) 

1118 elif isinstance( 

1119 value, 

1120 ( 

1121 numpy.int64, 

1122 numpy.int32, 

1123 numpy.int16, 

1124 numpy.int8, 

1125 numpy.uint64, 

1126 numpy.uint32, 

1127 numpy.uint16, 

1128 numpy.uint8, 

1129 ), 

1130 ): 

1131 value = int(value) 

1132 output[column] = value 

1133 

1134 yield output 

1135 
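
# --- Illustrative sketch (editor's addition, not part of the original module):
# NaN cells are omitted and numpy integer/bool scalars become plain Python
# values, so each row is ready to be serialized as JSON.
def _example_dataframe_to_json_generator():
    df = pandas.DataFrame({"x": [1, 2], "y": [1.5, float("nan")]})
    # Expected: [{"x": 1, "y": 1.5}, {"x": 2}]
    return list(dataframe_to_json_generator(df))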

1136 

1137def verify_pandas_imports(): 

1138 if pandas is None: 

1139 raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception 

1140 if db_dtypes is None: 

1141 raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception 

1142 

1143 

1144def determine_requested_streams( 

1145 preserve_order: bool, 

1146 max_stream_count: Union[int, None], 

1147) -> int: 

1148 """Determines the value of requested_streams based on the values of 

1149 `preserve_order` and `max_stream_count`. 

1150 

1151 Args: 

1152 preserve_order (bool): Whether to preserve the order of streams. If True, 

1153 this limits the number of streams to one. `preserve_order` takes 

1154 precedence over `max_stream_count`. 

1155 max_stream_count (Union[int, None]): The maximum number of streams 

1156 allowed. Must be a non-negative number or None, where None indicates 

1157 the value is unset. NOTE: if `preserve_order` is also set, it takes 

1158 precedence over `max_stream_count`, thus to ensure that `max_stream_count` 

1159 is used, leave `preserve_order` unset (i.e. False). 

1160 

1161 Returns: 

1162 (int) The appropriate value for requested_streams. 

1163 """ 

1164 

1165 if preserve_order: 

1166 # If preserve order is set, it takes precedence. 

1167 # Limit the requested streams to 1, to ensure that order 

1168 # is preserved. 

1169 return 1 

1170 

1171 elif max_stream_count is not None: 

1172 # If preserve_order is not set, only then do we consider max_stream_count 

1173 if max_stream_count <= -1: 

1174 raise ValueError("max_stream_count must be non-negative OR None") 

1175 return max_stream_count 

1176 

1177 # Default to zero requested streams (unbounded). 

1178 return 0
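
# --- Illustrative sketch (editor's addition, not part of the original module):
# how the two knobs interact when sizing a read session.
def _example_determine_requested_streams():
    # preserve_order wins: only one stream is requested.
    assert determine_requested_streams(preserve_order=True, max_stream_count=8) == 1
    # Otherwise max_stream_count is used as-is.
    assert determine_requested_streams(preserve_order=False, max_stream_count=8) == 8
    # None (unset) means "let the backend decide", i.e. zero requested streams.
    assert determine_requested_streams(preserve_order=False, max_stream_count=None) == 0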