Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/_pandas_helpers.py: 18%

369 statements  

coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1# Copyright 2019 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Shared helper functions for connecting BigQuery and pandas.""" 

16 

17import concurrent.futures 

18from datetime import datetime 

19import functools 

20from itertools import islice 

21import logging 

22import queue 

23import warnings 

24from typing import Any, Union 

25 

26from packaging import version 

27 

28from google.cloud.bigquery import _helpers 

29from google.cloud.bigquery import schema 

30 

31try: 

32 import pandas # type: ignore 

33 

34 pandas_import_exception = None 

35except ImportError as exc: # pragma: NO COVER 

36 pandas = None 

37 pandas_import_exception = exc 

38else: 

39 import numpy 

40 

41try: 

42 import db_dtypes # type: ignore 

43 

44 date_dtype_name = db_dtypes.DateDtype.name 

45 time_dtype_name = db_dtypes.TimeDtype.name 

46 db_dtypes_import_exception = None 

47except ImportError as exc: # pragma: NO COVER 

48 db_dtypes = None 

49 db_dtypes_import_exception = exc 

50 date_dtype_name = time_dtype_name = "" # Use '' rather than None so pytype keeps inferring str. 

51 

52pyarrow = _helpers.PYARROW_VERSIONS.try_import() 

53 

54try: 

55 # _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array` 

56 from shapely.geometry.base import BaseGeometry as _BaseGeometry # type: ignore 

57except ImportError: # pragma: NO COVER 

58 # No shapely, use NoneType for _BaseGeometry as a placeholder. 

59 _BaseGeometry = type(None) 

60else: 

61 if pandas is not None: # pragma: NO COVER 

62 
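 # Editor's note: the nested definitions below implement a small factory. The
 # outer ``_to_wkb`` imports shapely lazily, binds ``wkb.dumps`` and
 # ``pandas.notnull`` to locals, and returns the inner closure; the
 # ``_to_wkb = _to_wkb()`` call after the block replaces the factory with that
 # closure, so callers simply use ``_to_wkb(value)``.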

63 def _to_wkb(): 

64 from shapely import wkb # type: ignore 

65 

66 write = wkb.dumps 

67 notnull = pandas.notnull 

68 

69 def _to_wkb(v): 

70 return write(v) if notnull(v) else v 

71 

72 return _to_wkb 

73 

74 _to_wkb = _to_wkb() 

75 

76try: 

77 from google.cloud.bigquery_storage import ArrowSerializationOptions 

78except ImportError: 

79 _ARROW_COMPRESSION_SUPPORT = False 

80else: 

81 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too. 

82 _ARROW_COMPRESSION_SUPPORT = True 

83 

84_LOGGER = logging.getLogger(__name__) 

85 

86_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds. 

87 

88_MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads 

89 

90_NO_PANDAS_ERROR = "Please install the 'pandas' package to use this function." 

91_NO_DB_TYPES_ERROR = "Please install the 'db-dtypes' package to use this function." 

92 

93_PANDAS_DTYPE_TO_BQ = { 

94 "bool": "BOOLEAN", 

95 "datetime64[ns, UTC]": "TIMESTAMP", 

96 "datetime64[ns]": "DATETIME", 

97 "float32": "FLOAT", 

98 "float64": "FLOAT", 

99 "int8": "INTEGER", 

100 "int16": "INTEGER", 

101 "int32": "INTEGER", 

102 "int64": "INTEGER", 

103 "uint8": "INTEGER", 

104 "uint16": "INTEGER", 

105 "uint32": "INTEGER", 

106 "geometry": "GEOGRAPHY", 

107 date_dtype_name: "DATE", 

108 time_dtype_name: "TIME", 

109} 

110 
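# Editor's sketch (not part of the original module): a minimal illustration of
# how the ``_PANDAS_DTYPE_TO_BQ`` lookup is used -- a column's dtype name keys
# the BigQuery type. The helper name and sample DataFrame are hypothetical and
# assume pandas is installed.
def _example_pandas_dtype_to_bq():
    frame = pandas.DataFrame({"flag": [True, False], "score": [1.5, 2.0]})
    return {
        name: _PANDAS_DTYPE_TO_BQ.get(dtype.name)
        for name, dtype in zip(frame.columns, frame.dtypes)
    }
    # Expected: {"flag": "BOOLEAN", "score": "FLOAT"}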

111 

112class _DownloadState(object): 

113 """Flag to indicate that a thread should exit early.""" 

114 

115 def __init__(self): 

116 # No need for a lock because reading/replacing a variable is defined to 

117 # be an atomic operation in the Python language definition (enforced by 

118 # the global interpreter lock). 

119 self.done = False 

120 

121 

122def pyarrow_datetime(): 

123 return pyarrow.timestamp("us", tz=None) 

124 

125 

126def pyarrow_numeric(): 

127 return pyarrow.decimal128(38, 9) 

128 

129 

130def pyarrow_bignumeric(): 

131 # 77th digit is partial. 

132 # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types 

133 return pyarrow.decimal256(76, 38) 

134 

135 

136def pyarrow_time(): 

137 return pyarrow.time64("us") 

138 

139 

140def pyarrow_timestamp(): 

141 return pyarrow.timestamp("us", tz="UTC") 

142 
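# Editor's sketch (not part of the original module, assumes pyarrow is
# installed): the constructors above pin BigQuery's canonical precisions, e.g.
# NUMERIC maps to a 38-digit decimal with scale 9. The helper name is
# hypothetical.
def _example_numeric_precision():
    numeric_type = pyarrow_numeric()
    return numeric_type.precision, numeric_type.scale  # (38, 9)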

143 

144if pyarrow: 

145 # This dictionary is duplicated in bigquery_storage/test/unit/test_reader.py 

146 # When modifying it be sure to update it there as well. 

147 BQ_TO_ARROW_SCALARS = { 

148 "BOOL": pyarrow.bool_, 

149 "BOOLEAN": pyarrow.bool_, 

150 "BYTES": pyarrow.binary, 

151 "DATE": pyarrow.date32, 

152 "DATETIME": pyarrow_datetime, 

153 "FLOAT": pyarrow.float64, 

154 "FLOAT64": pyarrow.float64, 

155 "GEOGRAPHY": pyarrow.string, 

156 "INT64": pyarrow.int64, 

157 "INTEGER": pyarrow.int64, 

158 "NUMERIC": pyarrow_numeric, 

159 "STRING": pyarrow.string, 

160 "TIME": pyarrow_time, 

161 "TIMESTAMP": pyarrow_timestamp, 

162 } 

163 ARROW_SCALAR_IDS_TO_BQ = { 

164 # https://arrow.apache.org/docs/python/api/datatypes.html#type-classes 

165 pyarrow.bool_().id: "BOOL", 

166 pyarrow.int8().id: "INT64", 

167 pyarrow.int16().id: "INT64", 

168 pyarrow.int32().id: "INT64", 

169 pyarrow.int64().id: "INT64", 

170 pyarrow.uint8().id: "INT64", 

171 pyarrow.uint16().id: "INT64", 

172 pyarrow.uint32().id: "INT64", 

173 pyarrow.uint64().id: "INT64", 

174 pyarrow.float16().id: "FLOAT64", 

175 pyarrow.float32().id: "FLOAT64", 

176 pyarrow.float64().id: "FLOAT64", 

177 pyarrow.time32("ms").id: "TIME", 

178 pyarrow.time64("ns").id: "TIME", 

179 pyarrow.timestamp("ns").id: "TIMESTAMP", 

180 pyarrow.date32().id: "DATE", 

181 pyarrow.date64().id: "DATETIME", # because millisecond resolution 

182 pyarrow.binary().id: "BYTES", 

183 pyarrow.string().id: "STRING", # also alias for pyarrow.utf8() 

184 # The exact scale and precision don't matter, see below. 

185 pyarrow.decimal128(38, scale=9).id: "NUMERIC", 

186 } 

187 

188 if version.parse(pyarrow.__version__) >= version.parse("3.0.0"): 

189 BQ_TO_ARROW_SCALARS["BIGNUMERIC"] = pyarrow_bignumeric 

190 # The exact decimal's scale and precision are not important, as only 

191 # the type ID matters, and it's the same for all decimal256 instances. 

192 ARROW_SCALAR_IDS_TO_BQ[pyarrow.decimal256(76, scale=38).id] = "BIGNUMERIC" 

193 _BIGNUMERIC_SUPPORT = True 

194 else: 

195 _BIGNUMERIC_SUPPORT = False # pragma: NO COVER 

196 

197else: # pragma: NO COVER 

198 BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER 

199 ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO COVER 

200 _BIGNUMERIC_SUPPORT = False # pragma: NO COVER 

201 

202 

203BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = { 

204 "GEOGRAPHY": { 

205 b"ARROW:extension:name": b"google:sqlType:geography", 

206 b"ARROW:extension:metadata": b'{"encoding": "WKT"}', 

207 }, 

208 "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"}, 

209} 

210 

211 

212def bq_to_arrow_struct_data_type(field): 

213 arrow_fields = [] 

214 for subfield in field.fields: 

215 arrow_subfield = bq_to_arrow_field(subfield) 

216 if arrow_subfield: 

217 arrow_fields.append(arrow_subfield) 

218 else: 

219 # Could not determine a subfield type. Fallback to type 

220 # inference. 

221 return None 

222 return pyarrow.struct(arrow_fields) 

223 

224 

225def bq_to_arrow_data_type(field): 

226 """Return the Arrow data type, corresponding to a given BigQuery column. 

227 

228 Returns: 

229 None: if default Arrow type inspection should be used. 

230 """ 

231 if field.mode is not None and field.mode.upper() == "REPEATED": 

232 inner_type = bq_to_arrow_data_type( 

233 schema.SchemaField(field.name, field.field_type, fields=field.fields) 

234 ) 

235 if inner_type: 

236 return pyarrow.list_(inner_type) 

237 return None 

238 

239 field_type_upper = field.field_type.upper() if field.field_type else "" 

240 if field_type_upper in schema._STRUCT_TYPES: 

241 return bq_to_arrow_struct_data_type(field) 

242 

243 data_type_constructor = BQ_TO_ARROW_SCALARS.get(field_type_upper) 

244 if data_type_constructor is None: 

245 return None 

246 return data_type_constructor() 

247 
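# Editor's sketch (not part of the original module, assumes pyarrow is
# installed): scalar fields map through BQ_TO_ARROW_SCALARS, while REPEATED
# fields are wrapped in a pyarrow list type. The helper name is hypothetical.
def _example_bq_to_arrow_data_type():
    scalar = schema.SchemaField("created", "TIMESTAMP")
    repeated = schema.SchemaField("tags", "STRING", mode="REPEATED")
    return bq_to_arrow_data_type(scalar), bq_to_arrow_data_type(repeated)
    # Expected: (timestamp[us, tz=UTC], list<item: string>)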

248 

249def bq_to_arrow_field(bq_field, array_type=None): 

250 """Return the Arrow field, corresponding to a given BigQuery column. 

251 

252 Returns: 

253 None: if the Arrow type cannot be determined. 

254 """ 

255 arrow_type = bq_to_arrow_data_type(bq_field) 

256 if arrow_type is not None: 

257 if array_type is not None: 

258 arrow_type = array_type # For GEOGRAPHY, at least initially 

259 is_nullable = bq_field.mode.upper() == "NULLABLE" 

260 metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get( 

261 bq_field.field_type.upper() if bq_field.field_type else "" 

262 ) 

263 return pyarrow.field( 

264 bq_field.name, arrow_type, nullable=is_nullable, metadata=metadata 

265 ) 

266 

267 warnings.warn("Unable to determine type for field '{}'.".format(bq_field.name)) 

268 return None 

269 

270 

271def bq_to_arrow_schema(bq_schema): 

272 """Return the Arrow schema, corresponding to a given BigQuery schema. 

273 

274 Returns: 

275 None: if any Arrow type cannot be determined. 

276 """ 

277 arrow_fields = [] 

278 for bq_field in bq_schema: 

279 arrow_field = bq_to_arrow_field(bq_field) 

280 if arrow_field is None: 

281 # Auto-detect the schema if there is an unknown field type. 

282 return None 

283 arrow_fields.append(arrow_field) 

284 return pyarrow.schema(arrow_fields) 

285 
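# Editor's sketch (not part of the original module, assumes pyarrow is
# installed): converting a whole BigQuery schema at once. The helper name is
# hypothetical.
def _example_bq_to_arrow_schema():
    bq_schema = [
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("age", "INTEGER"),
    ]
    return bq_to_arrow_schema(bq_schema)
    # Expected: an Arrow schema with fields ``name: string`` and ``age: int64``.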

286 

287def default_types_mapper( 

288 date_as_object: bool = False, 

289 bool_dtype: Union[Any, None] = None, 

290 int_dtype: Union[Any, None] = None, 

291 float_dtype: Union[Any, None] = None, 

292 string_dtype: Union[Any, None] = None, 

293): 

294 """Create a mapping from pyarrow types to pandas types. 

295 

296 This overrides the pandas defaults to use null-safe extension types where 

297 available. 

298 

299 See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of 

300 data types. See: 

301 tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for 

302 BigQuery to Arrow type mapping. 

303 

304 Note to google-cloud-bigquery developers: If you update the default dtypes, 

305 also update the docs at docs/usage/pandas.rst. 

306 """ 

307 

308 def types_mapper(arrow_data_type): 

309 if bool_dtype is not None and pyarrow.types.is_boolean(arrow_data_type): 

310 return bool_dtype 

311 

312 elif int_dtype is not None and pyarrow.types.is_integer(arrow_data_type): 

313 return int_dtype 

314 

315 elif float_dtype is not None and pyarrow.types.is_floating(arrow_data_type): 

316 return float_dtype 

317 

318 elif string_dtype is not None and pyarrow.types.is_string(arrow_data_type): 

319 return string_dtype 

320 

321 elif ( 

322 # If date_as_object is True, we know some DATE columns are 

323 # out-of-bounds of what is supported by pandas. 

324 not date_as_object 

325 and pyarrow.types.is_date(arrow_data_type) 

326 ): 

327 return db_dtypes.DateDtype() 

328 

329 elif pyarrow.types.is_time(arrow_data_type): 

330 return db_dtypes.TimeDtype() 

331 

332 return types_mapper 

333 
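# Editor's sketch (not part of the original module, assumes pandas and pyarrow
# are installed): ``default_types_mapper`` returns a callable that plugs into
# pyarrow's ``Table.to_pandas(types_mapper=...)`` hook. The helper name and
# sample table are hypothetical.
def _example_default_types_mapper():
    table = pyarrow.table({"flag": [True, None], "count": [1, None]})
    mapper = default_types_mapper(
        bool_dtype=pandas.BooleanDtype(), int_dtype=pandas.Int64Dtype()
    )
    return table.to_pandas(types_mapper=mapper)
    # Expected dtypes: flag -> boolean, count -> Int64 (both nullable).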

334 

335def bq_to_arrow_array(series, bq_field): 

336 if bq_field.field_type.upper() == "GEOGRAPHY": 

337 arrow_type = None 

338 first = _first_valid(series) 

339 if first is not None: 

340 if series.dtype.name == "geometry" or isinstance(first, _BaseGeometry): 

341 arrow_type = pyarrow.binary() 

342 # Convert shapely geometry to WKB binary format: 

343 series = series.apply(_to_wkb) 

344 elif isinstance(first, bytes): 

345 arrow_type = pyarrow.binary() 

346 elif series.dtype.name == "geometry": 

347 # We have a GeoSeries containing all nulls, convert it to a pandas series 

348 series = pandas.Series(numpy.array(series)) 

349 

350 if arrow_type is None: 

351 arrow_type = bq_to_arrow_data_type(bq_field) 

352 else: 

353 arrow_type = bq_to_arrow_data_type(bq_field) 

354 

355 field_type_upper = bq_field.field_type.upper() if bq_field.field_type else "" 

356 

357 if bq_field.mode.upper() == "REPEATED": 

358 return pyarrow.ListArray.from_pandas(series, type=arrow_type) 

359 if field_type_upper in schema._STRUCT_TYPES: 

360 return pyarrow.StructArray.from_pandas(series, type=arrow_type) 

361 return pyarrow.Array.from_pandas(series, type=arrow_type) 

362 

363 

364def get_column_or_index(dataframe, name): 

365 """Return a column or index as a pandas series.""" 

366 if name in dataframe.columns: 

367 return dataframe[name].reset_index(drop=True) 

368 

369 if isinstance(dataframe.index, pandas.MultiIndex): 

370 if name in dataframe.index.names: 

371 return ( 

372 dataframe.index.get_level_values(name) 

373 .to_series() 

374 .reset_index(drop=True) 

375 ) 

376 else: 

377 if name == dataframe.index.name: 

378 return dataframe.index.to_series().reset_index(drop=True) 

379 

380 raise ValueError("column or index '{}' not found.".format(name)) 

381 
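# Editor's sketch (not part of the original module, assumes pandas is
# installed): a named index is returned just like a regular column, with a
# fresh RangeIndex. The helper name and data are hypothetical.
def _example_get_column_or_index():
    frame = pandas.DataFrame(
        {"value": [10, 20]}, index=pandas.Index(["a", "b"], name="key")
    )
    return get_column_or_index(frame, "value"), get_column_or_index(frame, "key")
    # Both are pandas.Series; passing an unknown name raises ValueError.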

382 

383def list_columns_and_indexes(dataframe): 

384 """Return all index and column names with dtypes. 

385 

386 Returns: 

387 Sequence[Tuple[str, dtype]]: 

388 Returns a list of index and column names with 

389 corresponding dtypes. If an index is missing a name or has the 

390 same name as a column, the index is omitted. 

391 """ 

392 column_names = frozenset(dataframe.columns) 

393 columns_and_indexes = [] 

394 if isinstance(dataframe.index, pandas.MultiIndex): 

395 for name in dataframe.index.names: 

396 if name and name not in column_names: 

397 values = dataframe.index.get_level_values(name) 

398 columns_and_indexes.append((name, values.dtype)) 

399 else: 

400 if dataframe.index.name and dataframe.index.name not in column_names: 

401 columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype)) 

402 

403 columns_and_indexes += zip(dataframe.columns, dataframe.dtypes) 

404 return columns_and_indexes 

405 
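# Editor's sketch (not part of the original module, assumes pandas is
# installed): named indexes come first, followed by the columns, each paired
# with its dtype. The helper name and data are hypothetical.
def _example_list_columns_and_indexes():
    frame = pandas.DataFrame(
        {"value": [1, 2]}, index=pandas.Index(["x", "y"], name="key")
    )
    return list_columns_and_indexes(frame)
    # Expected: [("key", dtype("O")), ("value", dtype("int64"))]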

406 

407def _first_valid(series): 

408 first_valid_index = series.first_valid_index() 

409 if first_valid_index is not None: 

410 return series.at[first_valid_index] 

411 

412 

413def _first_array_valid(series): 

414 """Return the first "meaningful" element from the array series. 

415 

416 Here, "meaningful" means the first non-None element in one of the arrays that can 

417 be used for type detection. 

418 """ 

419 first_valid_index = series.first_valid_index() 

420 if first_valid_index is None: 

421 return None 

422 

423 valid_array = series.at[first_valid_index] 

424 valid_item = next((item for item in valid_array if not pandas.isna(item)), None) 

425 

426 if valid_item is not None: 

427 return valid_item 

428 

429 # Valid item is None because all items in the "valid" array are invalid. Try 

430 # to find a true valid array manually. 

431 for array in islice(series, first_valid_index + 1, None): 

432 try: 

433 array_iter = iter(array) 

434 except TypeError: 

435 continue # Not an array, apparently, e.g. None, thus skip. 

436 valid_item = next((item for item in array_iter if not pandas.isna(item)), None) 

437 if valid_item is not None: 

438 break 

439 

440 return valid_item 

441 
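# Editor's sketch (not part of the original module, assumes pandas is
# installed): the first "valid" array may itself contain only nulls, in which
# case later arrays are scanned. The helper name and data are hypothetical.
def _example_first_array_valid():
    series = pandas.Series([None, [None, None], [None, 7]])
    return _first_array_valid(series)  # 7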

442 

443def dataframe_to_bq_schema(dataframe, bq_schema): 

444 """Convert a pandas DataFrame schema to a BigQuery schema. 

445 

446 Args: 

447 dataframe (pandas.DataFrame): 

448 DataFrame for which the client determines the BigQuery schema. 

449 bq_schema (Sequence[Union[ \ 

450 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

451 Mapping[str, Any] \ 

452 ]]): 

453 A BigQuery schema. Use this argument to override the autodetected 

454 type for some or all of the DataFrame columns. 

455 

456 Returns: 

457 Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]: 

458 The automatically determined schema. Returns None if the type of 

459 any column cannot be determined. 

460 """ 

461 if bq_schema: 

462 bq_schema = schema._to_schema_fields(bq_schema) 

463 bq_schema_index = {field.name: field for field in bq_schema} 

464 bq_schema_unused = set(bq_schema_index.keys()) 

465 else: 

466 bq_schema_index = {} 

467 bq_schema_unused = set() 

468 

469 bq_schema_out = [] 

470 unknown_type_fields = [] 

471 

472 for column, dtype in list_columns_and_indexes(dataframe): 

473 # Use provided type from schema, if present. 

474 bq_field = bq_schema_index.get(column) 

475 if bq_field: 

476 bq_schema_out.append(bq_field) 

477 bq_schema_unused.discard(bq_field.name) 

478 continue 

479 

480 # Otherwise, try to automatically determine the type based on the 

481 # pandas dtype. 

482 bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name) 

483 if bq_type is None: 

484 sample_data = _first_valid(dataframe[column]) 

485 if ( 

486 isinstance(sample_data, _BaseGeometry) 

487 and sample_data is not None # Paranoia 

488 ): 

489 bq_type = "GEOGRAPHY" 

490 bq_field = schema.SchemaField(column, bq_type) 

491 bq_schema_out.append(bq_field) 

492 

493 if bq_field.field_type is None: 

494 unknown_type_fields.append(bq_field) 

495 

496 # Catch any schema mismatch. The developer explicitly asked to serialize a 

497 # column, but it was not found. 

498 if bq_schema_unused: 

499 raise ValueError( 

500 "bq_schema contains fields not present in dataframe: {}".format( 

501 bq_schema_unused 

502 ) 

503 ) 

504 

505 # If schema detection was not successful for all columns, also try with 

506 # pyarrow, if available. 

507 if unknown_type_fields: 

508 if not pyarrow: 

509 msg = "Could not determine the type of columns: {}".format( 

510 ", ".join(field.name for field in unknown_type_fields) 

511 ) 

512 warnings.warn(msg) 

513 return None # We cannot detect the schema in full. 

514 

515 # The augment_schema() helper itself will also issue unknown type 

516 # warnings if detection still fails for any of the fields. 

517 bq_schema_out = augment_schema(dataframe, bq_schema_out) 

518 

519 return tuple(bq_schema_out) if bq_schema_out else None 

520 
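# Editor's sketch (not part of the original module, assumes pandas is
# installed): explicit ``bq_schema`` entries win, while remaining columns fall
# back to dtype-based detection. The helper name and data are hypothetical.
def _example_dataframe_to_bq_schema():
    frame = pandas.DataFrame({"name": ["a", "b"], "score": [1.0, 2.0]})
    overrides = [schema.SchemaField("name", "STRING")]
    return dataframe_to_bq_schema(frame, overrides)
    # Expected: (SchemaField("name", "STRING", ...), SchemaField("score", "FLOAT", ...))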

521 

522def augment_schema(dataframe, current_bq_schema): 

523 """Try to deduce the unknown field types and return an improved schema. 

524 

525 This function requires ``pyarrow`` to run. If all the missing types still 

526 cannot be detected, ``None`` is returned. If all types are already known, 

527 a shallow copy of the given schema is returned. 

528 

529 Args: 

530 dataframe (pandas.DataFrame): 

531 DataFrame for which some of the field types are still unknown. 

532 current_bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]): 

533 A BigQuery schema for ``dataframe``. The types of some or all of 

534 the fields may be ``None``. 

535 Returns: 

536 Optional[Sequence[google.cloud.bigquery.schema.SchemaField]] 

537 """ 

538 # pytype: disable=attribute-error 

539 augmented_schema = [] 

540 unknown_type_fields = [] 

541 

542 for field in current_bq_schema: 

543 if field.field_type is not None: 

544 augmented_schema.append(field) 

545 continue 

546 

547 arrow_table = pyarrow.array(dataframe[field.name]) 

548 

549 if pyarrow.types.is_list(arrow_table.type): 

550 # `pyarrow.ListType` 

551 detected_mode = "REPEATED" 

552 detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id) 

553 

554 # For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds 

555 # it to such datetimes, causing them to be recognized as TIMESTAMP type. 

556 # We thus additionally check the actual data to see if we need to overrule 

557 # that and choose DATETIME instead. 

558 # Note that this should only be needed for datetime values inside a list, 

559 # since scalar datetime values have a proper Pandas dtype that allows 

560 # distinguishing between timezone-naive and timezone-aware values before 

561 # even requiring the additional schema augment logic in this method. 

562 if detected_type == "TIMESTAMP": 

563 valid_item = _first_array_valid(dataframe[field.name]) 

564 if isinstance(valid_item, datetime) and valid_item.tzinfo is None: 

565 detected_type = "DATETIME" 

566 else: 

567 detected_mode = field.mode 

568 detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id) 

569 

570 if detected_type is None: 

571 unknown_type_fields.append(field) 

572 continue 

573 

574 new_field = schema.SchemaField( 

575 name=field.name, 

576 field_type=detected_type, 

577 mode=detected_mode, 

578 description=field.description, 

579 fields=field.fields, 

580 ) 

581 augmented_schema.append(new_field) 

582 

583 if unknown_type_fields: 

584 warnings.warn( 

585 "Pyarrow could not determine the type of columns: {}.".format( 

586 ", ".join(field.name for field in unknown_type_fields) 

587 ) 

588 ) 

589 return None 

590 

591 return augmented_schema 

592 # pytype: enable=attribute-error 

593 
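# Editor's sketch (not part of the original module, assumes pandas and pyarrow
# are installed): a column whose BigQuery type could not be guessed from its
# dtype (here, a column of lists) gets its type and mode filled in from the
# pyarrow-inferred Arrow type. The helper name and data are hypothetical.
def _example_augment_schema():
    frame = pandas.DataFrame({"tags": [["a", "b"], ["c"]]})
    partial = [schema.SchemaField("tags", field_type=None)]
    return augment_schema(frame, partial)
    # Expected: [SchemaField("tags", "STRING", mode="REPEATED", ...)]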

594 

595def dataframe_to_arrow(dataframe, bq_schema): 

596 """Convert pandas dataframe to Arrow table, using BigQuery schema. 

597 

598 Args: 

599 dataframe (pandas.DataFrame): 

600 DataFrame to convert to Arrow table. 

601 bq_schema (Sequence[Union[ \ 

602 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

603 Mapping[str, Any] \ 

604 ]]): 

605 Desired BigQuery schema. The number of columns must match the 

606 number of columns in the DataFrame. 

607 

608 Returns: 

609 pyarrow.Table: 

610 Table containing dataframe data, with schema derived from 

611 BigQuery schema. 

612 """ 

613 column_names = set(dataframe.columns) 

614 column_and_index_names = set( 

615 name for name, _ in list_columns_and_indexes(dataframe) 

616 ) 

617 

618 bq_schema = schema._to_schema_fields(bq_schema) 

619 bq_field_names = set(field.name for field in bq_schema) 

620 

621 extra_fields = bq_field_names - column_and_index_names 

622 if extra_fields: 

623 raise ValueError( 

624 "bq_schema contains fields not present in dataframe: {}".format( 

625 extra_fields 

626 ) 

627 ) 

628 

629 # It's okay for indexes to be missing from bq_schema, but it's not okay to 

630 # be missing columns. 

631 missing_fields = column_names - bq_field_names 

632 if missing_fields: 

633 raise ValueError( 

634 "bq_schema is missing fields from dataframe: {}".format(missing_fields) 

635 ) 

636 

637 arrow_arrays = [] 

638 arrow_names = [] 

639 arrow_fields = [] 

640 for bq_field in bq_schema: 

641 arrow_names.append(bq_field.name) 

642 arrow_arrays.append( 

643 bq_to_arrow_array(get_column_or_index(dataframe, bq_field.name), bq_field) 

644 ) 

645 arrow_fields.append(bq_to_arrow_field(bq_field, arrow_arrays[-1].type)) 

646 

647 if all((field is not None for field in arrow_fields)): 

648 return pyarrow.Table.from_arrays( 

649 arrow_arrays, schema=pyarrow.schema(arrow_fields) 

650 ) 

651 return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names) 

652 

653 

654def dataframe_to_parquet( 

655 dataframe, 

656 bq_schema, 

657 filepath, 

658 parquet_compression="SNAPPY", 

659 parquet_use_compliant_nested_type=True, 

660): 

661 """Write dataframe as a Parquet file, according to the desired BQ schema. 

662 

663 This function requires the :mod:`pyarrow` package. Arrow is used as an 

664 intermediate format. 

665 

666 Args: 

667 dataframe (pandas.DataFrame): 

668 DataFrame to convert to Parquet file. 

669 bq_schema (Sequence[Union[ \ 

670 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

671 Mapping[str, Any] \ 

672 ]]): 

673 Desired BigQuery schema. Number of columns must match number of 

674 columns in the DataFrame. 

675 filepath (str): 

676 Path to write Parquet file to. 

677 parquet_compression (Optional[str]): 

678 The compression codec used by the ``pyarrow.parquet.write_table`` 

679 serializing method. Defaults to "SNAPPY". 

680 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table 

681 parquet_use_compliant_nested_type (bool): 

682 Whether the ``pyarrow.parquet.write_table`` serializing method should write 

683 compliant Parquet nested type (lists). Defaults to ``True``. 

684 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types 

685 https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table 

686 

687 This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``. 

688 """ 

689 pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True) 

690 

691 import pyarrow.parquet # type: ignore 

692 

693 kwargs = ( 

694 {"use_compliant_nested_type": parquet_use_compliant_nested_type} 

695 if _helpers.PYARROW_VERSIONS.use_compliant_nested_type 

696 else {} 

697 ) 

698 

699 bq_schema = schema._to_schema_fields(bq_schema) 

700 arrow_table = dataframe_to_arrow(dataframe, bq_schema) 

701 pyarrow.parquet.write_table( 

702 arrow_table, 

703 filepath, 

704 compression=parquet_compression, 

705 **kwargs, 

706 ) 

707 
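# Editor's sketch (not part of the original module, assumes pandas and pyarrow
# are installed): writing a small frame with an explicit BigQuery schema. The
# helper name, data, and output path are hypothetical.
def _example_dataframe_to_parquet(filepath="/tmp/example.parquet"):
    frame = pandas.DataFrame({"name": ["a"], "age": [30]})
    bq_schema = [
        schema.SchemaField("name", "STRING"),
        schema.SchemaField("age", "INTEGER"),
    ]
    dataframe_to_parquet(frame, bq_schema, filepath)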

708 

709def _row_iterator_page_to_arrow(page, column_names, arrow_types): 

710 # Iterate over the page to force the API request to get the page data. 

711 try: 

712 next(iter(page)) 

713 except StopIteration: 

714 pass 

715 

716 arrays = [] 

717 for column_index, arrow_type in enumerate(arrow_types): 

718 arrays.append(pyarrow.array(page._columns[column_index], type=arrow_type)) 

719 

720 if isinstance(column_names, pyarrow.Schema): 

721 return pyarrow.RecordBatch.from_arrays(arrays, schema=column_names) 

722 return pyarrow.RecordBatch.from_arrays(arrays, names=column_names) 

723 

724 

725def download_arrow_row_iterator(pages, bq_schema): 

726 """Use HTTP JSON RowIterator to construct an iterable of RecordBatches. 

727 

728 Args: 

729 pages (Iterator[:class:`google.api_core.page_iterator.Page`]): 

730 An iterator over the result pages. 

731 bq_schema (Sequence[Union[ \ 

732 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

733 Mapping[str, Any] \ 

734 ]]): 

735 A description of the fields in result pages. 

736 Yields: 

737 :class:`pyarrow.RecordBatch` 

738 The next page of records as a ``pyarrow`` record batch. 

739 """ 

740 bq_schema = schema._to_schema_fields(bq_schema) 

741 column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema] 

742 arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema] 

743 

744 for page in pages: 

745 yield _row_iterator_page_to_arrow(page, column_names, arrow_types) 

746 

747 

748def _row_iterator_page_to_dataframe(page, column_names, dtypes): 

749 # Iterate over the page to force the API request to get the page data. 

750 try: 

751 next(iter(page)) 

752 except StopIteration: 

753 pass 

754 

755 columns = {} 

756 for column_index, column_name in enumerate(column_names): 

757 dtype = dtypes.get(column_name) 

758 columns[column_name] = pandas.Series(page._columns[column_index], dtype=dtype) 

759 

760 return pandas.DataFrame(columns, columns=column_names) 

761 

762 

763def download_dataframe_row_iterator(pages, bq_schema, dtypes): 

764 """Use HTTP JSON RowIterator to construct a DataFrame. 

765 

766 Args: 

767 pages (Iterator[:class:`google.api_core.page_iterator.Page`]): 

768 An iterator over the result pages. 

769 bq_schema (Sequence[Union[ \ 

770 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

771 Mapping[str, Any] \ 

772 ]]): 

773 A description of the fields in result pages. 

774 dtypes(Mapping[str, numpy.dtype]): 

775 The types of columns in result data to hint construction of the 

776 resulting DataFrame. Not all column types have to be specified. 

777 Yields: 

778 :class:`pandas.DataFrame` 

779 The next page of records as a ``pandas.DataFrame``. 

780 """ 

781 bq_schema = schema._to_schema_fields(bq_schema) 

782 column_names = [field.name for field in bq_schema] 

783 for page in pages: 

784 yield _row_iterator_page_to_dataframe(page, column_names, dtypes) 

785 

786 

787def _bqstorage_page_to_arrow(page): 

788 return page.to_arrow() 

789 

790 

791def _bqstorage_page_to_dataframe(column_names, dtypes, page): 

792 # page.to_dataframe() does not preserve column order in some versions 

793 # of google-cloud-bigquery-storage. Access by column name to rearrange. 

794 return page.to_dataframe(dtypes=dtypes)[column_names] 

795 

796 

797def _download_table_bqstorage_stream( 

798 download_state, bqstorage_client, session, stream, worker_queue, page_to_item 

799): 

800 reader = bqstorage_client.read_rows(stream.name) 

801 

802 # Avoid deprecation warnings for passing in unnecessary read session. 

803 # https://github.com/googleapis/python-bigquery-storage/issues/229 

804 if _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional: 

805 rowstream = reader.rows() 

806 else: 

807 rowstream = reader.rows(session) 

808 

809 for page in rowstream.pages: 

810 if download_state.done: 

811 return 

812 item = page_to_item(page) 

813 worker_queue.put(item) 

814 

815 

816def _nowait(futures): 

817 """Separate finished and unfinished threads, much like 

818 :func:`concurrent.futures.wait`, but don't wait. 

819 """ 

820 done = [] 

821 not_done = [] 

822 for future in futures: 

823 if future.done(): 

824 done.append(future) 

825 else: 

826 not_done.append(future) 

827 return done, not_done 

828 
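# Editor's sketch (not part of the original module): unlike
# ``concurrent.futures.wait``, ``_nowait`` never blocks; it just buckets the
# futures by their current state. The helper name is hypothetical.
def _example_nowait():
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(pow, 2, 10), pool.submit(pow, 2, 20)]
        done, not_done = _nowait(futures)  # either list may be empty
        return len(done), len(not_done)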

829 

830def _download_table_bqstorage( 

831 project_id, 

832 table, 

833 bqstorage_client, 

834 preserve_order=False, 

835 selected_fields=None, 

836 page_to_item=None, 

837 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

838): 

839 """Use (faster, but billable) BQ Storage API to construct DataFrame.""" 

840 

841 # Passing a BQ Storage client in implies that the BigQuery Storage library 

842 # is available and can be imported. 

843 from google.cloud import bigquery_storage 

844 

845 if "$" in table.table_id: 

846 raise ValueError( 

847 "Reading from a specific partition is not currently supported." 

848 ) 

849 if "@" in table.table_id: 

850 raise ValueError("Reading from a specific snapshot is not currently supported.") 

851 

852 requested_streams = 1 if preserve_order else 0 

853 

854 requested_session = bigquery_storage.types.ReadSession( 

855 table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW 

856 ) 

857 if selected_fields is not None: 

858 for field in selected_fields: 

859 requested_session.read_options.selected_fields.append(field.name) 

860 

861 if _ARROW_COMPRESSION_SUPPORT: 

862 requested_session.read_options.arrow_serialization_options.buffer_compression = ( 

863 ArrowSerializationOptions.CompressionCodec.LZ4_FRAME 

864 ) 

865 

866 session = bqstorage_client.create_read_session( 

867 parent="projects/{}".format(project_id), 

868 read_session=requested_session, 

869 max_stream_count=requested_streams, 

870 ) 

871 

872 _LOGGER.debug( 

873 "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format( 

874 table.project, table.dataset_id, table.table_id, session.name 

875 ) 

876 ) 

877 

878 # Avoid reading rows from an empty table. 

879 if not session.streams: 

880 return 

881 

882 total_streams = len(session.streams) 

883 

884 # Use _DownloadState to notify worker threads when to quit. 

885 # See: https://stackoverflow.com/a/29237343/101923 

886 download_state = _DownloadState() 

887 

888 # Create a queue to collect frames as they are created in each thread. 

889 # 

890 # The queue needs to be bounded by default, because if the user code processes the 

891 # fetched result pages too slowly, while at the same time new pages are rapidly being 

892 # fetched from the server, the queue can grow to the point where the process runs 

893 # out of memory. 

894 if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT: 

895 max_queue_size = total_streams 

896 elif max_queue_size is None: 

897 max_queue_size = 0 # unbounded 

898 

899 worker_queue = queue.Queue(maxsize=max_queue_size) 

900 

901 with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: 

902 try: 

903 # Manually submit jobs and wait for download to complete rather 

904 # than using pool.map because pool.map continues running in the 

905 # background even if there is an exception on the main thread. 

906 # See: https://github.com/googleapis/google-cloud-python/pull/7698 

907 not_done = [ 

908 pool.submit( 

909 _download_table_bqstorage_stream, 

910 download_state, 

911 bqstorage_client, 

912 session, 

913 stream, 

914 worker_queue, 

915 page_to_item, 

916 ) 

917 for stream in session.streams 

918 ] 

919 

920 while not_done: 

921 # Don't block on the worker threads. For performance reasons, 

922 # we want to block on the queue's get method, instead. This 

923 # prevents the queue from filling up, because the main thread 

924 # has smaller gaps in time between calls to the queue's get 

925 # method. For a detailed explanation, see: 

926 # https://friendliness.dev/2019/06/18/python-nowait/ 

927 done, not_done = _nowait(not_done) 

928 for future in done: 

929 # Call result() on any finished threads to raise any 

930 # exceptions encountered. 

931 future.result() 

932 

933 try: 

934 frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) 

935 yield frame 

936 except queue.Empty: # pragma: NO COVER 

937 continue 

938 

939 # Return any remaining values after the workers finished. 

940 while True: # pragma: NO COVER 

941 try: 

942 frame = worker_queue.get_nowait() 

943 yield frame 

944 except queue.Empty: # pragma: NO COVER 

945 break 

946 finally: 

947 # No need for a lock because reading/replacing a variable is 

948 # defined to be an atomic operation in the Python language 

949 # definition (enforced by the global interpreter lock). 

950 download_state.done = True 

951 

952 # Shutdown all background threads, now that they should know to 

953 # exit early. 

954 pool.shutdown(wait=True) 

955 

956 

957def download_arrow_bqstorage( 

958 project_id, 

959 table, 

960 bqstorage_client, 

961 preserve_order=False, 

962 selected_fields=None, 

963 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

964): 

965 return _download_table_bqstorage( 

966 project_id, 

967 table, 

968 bqstorage_client, 

969 preserve_order=preserve_order, 

970 selected_fields=selected_fields, 

971 page_to_item=_bqstorage_page_to_arrow, 

972 max_queue_size=max_queue_size, 

973 ) 

974 

975 

976def download_dataframe_bqstorage( 

977 project_id, 

978 table, 

979 bqstorage_client, 

980 column_names, 

981 dtypes, 

982 preserve_order=False, 

983 selected_fields=None, 

984 max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, 

985): 

986 page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) 

987 return _download_table_bqstorage( 

988 project_id, 

989 table, 

990 bqstorage_client, 

991 preserve_order=preserve_order, 

992 selected_fields=selected_fields, 

993 page_to_item=page_to_item, 

994 max_queue_size=max_queue_size, 

995 ) 

996 

997 

998def dataframe_to_json_generator(dataframe): 

999 for row in dataframe.itertuples(index=False, name=None): 

1000 output = {} 

1001 for column, value in zip(dataframe.columns, row): 

1002 # Omit NaN values. 

1003 is_nan = pandas.isna(value) 

1004 

1005 # isna() can also return an array-like of bools, but the latter's boolean 

1006 # value is ambiguous, hence an extra check. An array-like value is *not* 

1007 # considered a NaN, however. 

1008 if isinstance(is_nan, bool) and is_nan: 

1009 continue 

1010 output[column] = value 

1011 

1012 yield output 

1013 
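# Editor's sketch (not part of the original module, assumes pandas is
# installed): NaN cells are dropped from the emitted row dictionaries. The
# helper name and data are hypothetical.
def _example_dataframe_to_json_generator():
    frame = pandas.DataFrame({"name": ["a", "b"], "age": [30, None]})
    return list(dataframe_to_json_generator(frame))
    # Expected: [{"name": "a", "age": 30.0}, {"name": "b"}]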

1014 

1015def verify_pandas_imports(): 

1016 if pandas is None: 

1017 raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception 

1018 if db_dtypes is None: 

1019 raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception