Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/_helpers.py: 29%

335 statements  

coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1# Copyright 2015 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Shared helper functions for BigQuery API classes.""" 

16 

17import base64 

18import datetime 

19import decimal 

20import math 

21import re 

22import os 

23from typing import Any, Optional, Union 

24 

25from dateutil import relativedelta 

26from google.cloud._helpers import UTC # type: ignore 

27from google.cloud._helpers import _date_from_iso8601_date 

28from google.cloud._helpers import _datetime_from_microseconds 

29from google.cloud._helpers import _RFC3339_MICROS 

30from google.cloud._helpers import _RFC3339_NO_FRACTION 

31from google.cloud._helpers import _to_bytes 

32 

33import packaging.version 

34 

35from google.cloud.bigquery.exceptions import ( 

36 LegacyBigQueryStorageError, 

37 LegacyPyarrowError, 

38) 

39 

40_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f" 

41_TIMEONLY_WO_MICROS = "%H:%M:%S" 

42_TIMEONLY_W_MICROS = "%H:%M:%S.%f" 

43_PROJECT_PREFIX_PATTERN = re.compile( 

44 r""" 

45 (?P<project_id>\S+\:[^.]+)\.(?P<dataset_id>[^.]+)(?:$|\.(?P<custom_id>[^.]+)$) 

46""", 

47 re.VERBOSE, 

48) 

49 

50# BigQuery sends INTERVAL data in "canonical format" 

51# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#interval_type 

52_INTERVAL_PATTERN = re.compile( 

53 r"(?P<calendar_sign>-?)(?P<years>\d+)-(?P<months>\d+) " 

54 r"(?P<days>-?\d+) " 

55 r"(?P<time_sign>-?)(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)\.?(?P<fraction>\d*)?$" 

56) 

57 

58_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0") 

59 

60_MIN_PYARROW_VERSION = packaging.version.Version("3.0.0") 

61 

62_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0") 

63 

64BIGQUERY_EMULATOR_HOST = "BIGQUERY_EMULATOR_HOST" 

65"""Environment variable defining host for emulator.""" 

66 

67_DEFAULT_HOST = "https://bigquery.googleapis.com" 

68"""Default host for JSON API.""" 

69 

70 

71def _get_bigquery_host(): 

72 return os.environ.get(BIGQUERY_EMULATOR_HOST, _DEFAULT_HOST) 

73 

74 
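A minimal usage sketch (not part of the original module), assuming the package is importable; the emulator address shown is hypothetical:

    import os

    # Point the client at a hypothetical local emulator; without the
    # variable, the default https://bigquery.googleapis.com host is used.
    os.environ["BIGQUERY_EMULATOR_HOST"] = "http://localhost:9050"

    from google.cloud.bigquery import _helpers

    assert _helpers._get_bigquery_host() == "http://localhost:9050"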

75class BQStorageVersions: 

76 """Version comparisons for google-cloud-bigqueyr-storage package.""" 

77 

78 def __init__(self): 

79 self._installed_version = None 

80 

81 @property 

82 def installed_version(self) -> packaging.version.Version: 

83 """Return the parsed version of google-cloud-bigquery-storage.""" 

84 if self._installed_version is None: 

85 from google.cloud import bigquery_storage 

86 

87 self._installed_version = packaging.version.parse( 

88 # Use 0.0.0, since it is earlier than any released version. 

89 # Legacy versions also have the same property, but 

90 # creating a LegacyVersion has been deprecated. 

91 # https://github.com/pypa/packaging/issues/321 

92 getattr(bigquery_storage, "__version__", "0.0.0") 

93 ) 

94 

95 return self._installed_version # type: ignore 

96 

97 @property 

98 def is_read_session_optional(self) -> bool: 

99 """True if read_session is optional to rows(). 

100 

101 See: https://github.com/googleapis/python-bigquery-storage/pull/228 

102 """ 

103 return self.installed_version >= _BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION 

104 

105 def verify_version(self): 

106 """Verify that a recent enough version of BigQuery Storage extra is 

107 installed. 

108 

109 The function assumes that google-cloud-bigquery-storage extra is 

110 installed, and should thus be used in places where this assumption 

111 holds. 

112 

113 Because `pip` can install an outdated version of this extra despite the 

114 constraints in `setup.py`, the calling code can use this helper to 

115 verify the version compatibility at runtime. 

116 

117 Raises: 

118 LegacyBigQueryStorageError: 

119 If the google-cloud-bigquery-storage package is outdated. 

120 """ 

121 if self.installed_version < _MIN_BQ_STORAGE_VERSION: 

122 msg = ( 

123 "Dependency google-cloud-bigquery-storage is outdated, please upgrade " 

124 f"it to version >= {_MIN_BQ_STORAGE_VERSION} (version found: {self.installed_version})." 

125 ) 

126 raise LegacyBigQueryStorageError(msg) 

127 

128 

129class PyarrowVersions: 

130 """Version comparisons for pyarrow package.""" 

131 

132 def __init__(self): 

133 self._installed_version = None 

134 

135 @property 

136 def installed_version(self) -> packaging.version.Version: 

137 """Return the parsed version of pyarrow.""" 

138 if self._installed_version is None: 

139 import pyarrow # type: ignore 

140 

141 self._installed_version = packaging.version.parse( 

142 # Use 0.0.0, since it is earlier than any released version. 

143 # Legacy versions also have the same property, but 

144 # creating a LegacyVersion has been deprecated. 

145 # https://github.com/pypa/packaging/issues/321 

146 getattr(pyarrow, "__version__", "0.0.0") 

147 ) 

148 

149 return self._installed_version 

150 

151 @property 

152 def use_compliant_nested_type(self) -> bool: 

153 return self.installed_version.major >= 4 

154 

155 def try_import(self, raise_if_error: bool = False) -> Any: 

156 """Verify that a recent enough version of pyarrow extra is 

157 installed. 

158 

159 The function assumes that pyarrow extra is installed, and should thus 

160 be used in places where this assumption holds. 

161 

162 Because `pip` can install an outdated version of this extra despite the 

163 constraints in `setup.py`, the calling code can use this helper to 

164 verify the version compatibility at runtime. 

165 

166 Returns: 

167 The ``pyarrow`` module or ``None``. 

168 

169 Raises: 

170 LegacyPyarrowError: 

171 If the pyarrow package is outdated and ``raise_if_error`` is ``True``. 

172 """ 

173 try: 

174 import pyarrow 

175 except ImportError as exc: # pragma: NO COVER 

176 if raise_if_error: 

177 raise LegacyPyarrowError( 

178 f"pyarrow package not found. Install pyarrow version >= {_MIN_PYARROW_VERSION}." 

179 ) from exc 

180 return None 

181 

182 if self.installed_version < _MIN_PYARROW_VERSION: 

183 if raise_if_error: 

184 msg = ( 

185 "Dependency pyarrow is outdated, please upgrade " 

186 f"it to version >= {_MIN_PYARROW_VERSION} (version found: {self.installed_version})." 

187 ) 

188 raise LegacyPyarrowError(msg) 

189 return None 

190 

191 return pyarrow 

192 

193 

194BQ_STORAGE_VERSIONS = BQStorageVersions() 

195PYARROW_VERSIONS = PyarrowVersions() 

196 
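A hedged sketch of how calling code might consult these singletons; this usage is illustrative, not taken from this file, and assumes the optional extras are installed as the docstrings above require:

    from google.cloud.bigquery import _helpers
    from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

    try:
        # Raises if google-cloud-bigquery-storage is older than 2.0.0.
        _helpers.BQ_STORAGE_VERSIONS.verify_version()
    except LegacyBigQueryStorageError:
        pass  # e.g. fall back to the REST API

    # Returns the pyarrow module when it is importable and >= 3.0.0,
    # otherwise None (or raises LegacyPyarrowError with raise_if_error=True).
    pyarrow = _helpers.PYARROW_VERSIONS.try_import()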

197 

198def _not_null(value, field): 

199 """Check whether 'value' should be coerced to 'field' type.""" 

200 return value is not None or (field is not None and field.mode != "NULLABLE") 

201 

202 

203def _int_from_json(value, field): 

204 """Coerce 'value' to an int, if set or not nullable.""" 

205 if _not_null(value, field): 

206 return int(value) 

207 

208 

209def _interval_from_json( 

210 value: Optional[str], field 

211) -> Optional[relativedelta.relativedelta]: 

212 """Coerce 'value' to an interval, if set or not nullable.""" 

213 if not _not_null(value, field): 

214 return None 

215 if value is None: 

216 raise TypeError(f"got {value} for REQUIRED field: {repr(field)}") 

217 

218 parsed = _INTERVAL_PATTERN.match(value) 

219 if parsed is None: 

220 raise ValueError(f"got interval: '{value}' with unexpected format") 

221 

222 calendar_sign = -1 if parsed.group("calendar_sign") == "-" else 1 

223 years = calendar_sign * int(parsed.group("years")) 

224 months = calendar_sign * int(parsed.group("months")) 

225 days = int(parsed.group("days")) 

226 time_sign = -1 if parsed.group("time_sign") == "-" else 1 

227 hours = time_sign * int(parsed.group("hours")) 

228 minutes = time_sign * int(parsed.group("minutes")) 

229 seconds = time_sign * int(parsed.group("seconds")) 

230 fraction = parsed.group("fraction") 

231 microseconds = time_sign * int(fraction.ljust(6, "0")[:6]) if fraction else 0 

232 

233 return relativedelta.relativedelta( 

234 years=years, 

235 months=months, 

236 days=days, 

237 hours=hours, 

238 minutes=minutes, 

239 seconds=seconds, 

240 microseconds=microseconds, 

241 ) 

242 
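An illustrative parse of the canonical INTERVAL format (not part of the module); passing None as the field works here because _not_null treats any non-None value as present:

    from google.cloud.bigquery._helpers import _interval_from_json

    # "1-6 15 10:20:30.123" means 1 year 6 months, 15 days, 10:20:30.123.
    delta = _interval_from_json("1-6 15 10:20:30.123", None)
    assert (delta.years, delta.months, delta.days) == (1, 6, 15)
    assert (delta.hours, delta.minutes, delta.seconds) == (10, 20, 30)
    assert delta.microseconds == 123000  # ".123" right-padded to microseconds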

243 

244def _float_from_json(value, field): 

245 """Coerce 'value' to a float, if set or not nullable.""" 

246 if _not_null(value, field): 

247 return float(value) 

248 

249 

250def _decimal_from_json(value, field): 

251 """Coerce 'value' to a Decimal, if set or not nullable.""" 

252 if _not_null(value, field): 

253 return decimal.Decimal(value) 

254 

255 

256def _bool_from_json(value, field): 

257 """Coerce 'value' to a bool, if set or not nullable.""" 

258 if _not_null(value, field): 

259 return value.lower() in ["t", "true", "1"] 

260 

261 

262def _string_from_json(value, _): 

263 """NOOP string -> string coercion""" 

264 return value 

265 

266 

267def _bytes_from_json(value, field): 

268 """Base64-decode value""" 

269 if _not_null(value, field): 

270 return base64.standard_b64decode(_to_bytes(value)) 

271 

272 

273def _timestamp_from_json(value, field): 

274 """Coerce 'value' to a datetime, if set or not nullable.""" 

275 if _not_null(value, field): 

276 # value will be an integer count of microseconds since the epoch, in UTC. 

277 return _datetime_from_microseconds(int(value)) 

278 

279 

280def _timestamp_query_param_from_json(value, field): 

281 """Coerce 'value' to a datetime, if set or not nullable. 

282 

283 Args: 

284 value (str): The timestamp. 

285 

286 field (google.cloud.bigquery.schema.SchemaField): 

287 The field corresponding to the value. 

288 

289 Returns: 

290 Optional[datetime.datetime]: 

291 The parsed datetime object from 

292 ``value`` if the ``field`` is not null (otherwise it is 

293 :data:`None`). 

294 """ 

295 if _not_null(value, field): 

296 # Canonical formats for timestamps in BigQuery are flexible. See: 

297 # g.co/cloud/bigquery/docs/reference/standard-sql/data-types#timestamp-type 

298 # The separator between the date and time can be 'T' or ' '. 

299 value = value.replace(" ", "T", 1) 

300 # The UTC timezone may be formatted as Z or +00:00. 

301 value = value.replace("Z", "") 

302 value = value.replace("+00:00", "") 

303 

304 if "." in value: 

305 # YYYY-MM-DDTHH:MM:SS.ffffff 

306 return datetime.datetime.strptime(value, _RFC3339_MICROS_NO_ZULU).replace( 

307 tzinfo=UTC 

308 ) 

309 else: 

310 # YYYY-MM-DDTHH:MM:SS 

311 return datetime.datetime.strptime(value, _RFC3339_NO_FRACTION).replace( 

312 tzinfo=UTC 

313 ) 

314 else: 

315 return None 

316 
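A small sketch (not from this file) of the flexible timestamp spellings this helper accepts; both normalize to the same UTC-aware datetime:

    import datetime
    from google.cloud.bigquery._helpers import _timestamp_query_param_from_json

    expected = datetime.datetime(2023, 3, 26, 6, 7, 0, tzinfo=datetime.timezone.utc)
    for text in ("2023-03-26T06:07:00Z", "2023-03-26 06:07:00+00:00"):
        assert _timestamp_query_param_from_json(text, None) == expected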

317 

318def _datetime_from_json(value, field): 

319 """Coerce 'value' to a datetime, if set or not nullable. 

320 

321 Args: 

322 value (str): The timestamp. 

323 field (google.cloud.bigquery.schema.SchemaField): 

324 The field corresponding to the value. 

325 

326 Returns: 

327 Optional[datetime.datetime]: 

328 The parsed datetime object from 

329 ``value`` if the ``field`` is not null (otherwise it is 

330 :data:`None`). 

331 """ 

332 if _not_null(value, field): 

333 if "." in value: 

334 # YYYY-MM-DDTHH:MM:SS.ffffff 

335 return datetime.datetime.strptime(value, _RFC3339_MICROS_NO_ZULU) 

336 else: 

337 # YYYY-MM-DDTHH:MM:SS 

338 return datetime.datetime.strptime(value, _RFC3339_NO_FRACTION) 

339 else: 

340 return None 

341 

342 

343def _date_from_json(value, field): 

344 """Coerce 'value' to a datetime date, if set or not nullable""" 

345 if _not_null(value, field): 

346 # value will be a string, in YYYY-MM-DD form. 

347 return _date_from_iso8601_date(value) 

348 

349 

350def _time_from_json(value, field): 

351 """Coerce 'value' to a datetime date, if set or not nullable""" 

352 if _not_null(value, field): 

353 if len(value) == 8: # HH:MM:SS 

354 fmt = _TIMEONLY_WO_MICROS 

355 elif len(value) == 15: # HH:MM:SS.micros 

356 fmt = _TIMEONLY_W_MICROS 

357 else: 

358 raise ValueError("Unknown time format: {}".format(value)) 

359 return datetime.datetime.strptime(value, fmt).time() 

360 

361 

362def _record_from_json(value, field): 

363 """Coerce 'value' to a mapping, if set or not nullable.""" 

364 if _not_null(value, field): 

365 record = {} 

366 record_iter = zip(field.fields, value["f"]) 

367 for subfield, cell in record_iter: 

368 converter = _CELLDATA_FROM_JSON[subfield.field_type] 

369 if subfield.mode == "REPEATED": 

370 value = [converter(item["v"], subfield) for item in cell["v"]] 

371 else: 

372 value = converter(cell["v"], subfield) 

373 record[subfield.name] = value 

374 return record 

375 
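For orientation, a hypothetical RECORD cell in the REST API's nested {"f": [{"v": ...}]} shape, converted with the helper above (illustrative only):

    from google.cloud.bigquery._helpers import _record_from_json
    from google.cloud.bigquery.schema import SchemaField

    person = SchemaField(
        "person",
        "RECORD",
        fields=[SchemaField("name", "STRING"), SchemaField("age", "INTEGER")],
    )
    cell = {"f": [{"v": "Ada"}, {"v": "36"}]}

    # Subfields are matched positionally and coerced by field type.
    assert _record_from_json(cell, person) == {"name": "Ada", "age": 36}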

376 

377_CELLDATA_FROM_JSON = { 

378 "INTEGER": _int_from_json, 

379 "INT64": _int_from_json, 

380 "INTERVAL": _interval_from_json, 

381 "FLOAT": _float_from_json, 

382 "FLOAT64": _float_from_json, 

383 "NUMERIC": _decimal_from_json, 

384 "BIGNUMERIC": _decimal_from_json, 

385 "BOOLEAN": _bool_from_json, 

386 "BOOL": _bool_from_json, 

387 "STRING": _string_from_json, 

388 "GEOGRAPHY": _string_from_json, 

389 "BYTES": _bytes_from_json, 

390 "TIMESTAMP": _timestamp_from_json, 

391 "DATETIME": _datetime_from_json, 

392 "DATE": _date_from_json, 

393 "TIME": _time_from_json, 

394 "RECORD": _record_from_json, 

395} 

396 

397_QUERY_PARAMS_FROM_JSON = dict(_CELLDATA_FROM_JSON) 

398_QUERY_PARAMS_FROM_JSON["TIMESTAMP"] = _timestamp_query_param_from_json 

399 

400 

401def _field_to_index_mapping(schema): 

402 """Create a mapping from schema field name to index of field.""" 

403 return {f.name: i for i, f in enumerate(schema)} 

404 

405 

406def _field_from_json(resource, field): 

407 converter = _CELLDATA_FROM_JSON.get(field.field_type, lambda value, _: value) 

408 if field.mode == "REPEATED": 

409 return [converter(item["v"], field) for item in resource] 

410 else: 

411 return converter(resource, field) 

412 

413 

414def _row_tuple_from_json(row, schema): 

415 """Convert JSON row data to row with appropriate types. 

416 

417 Note: ``row['f']`` and ``schema`` are presumed to be of the same length. 

418 

419 Args: 

420 row (Dict): A JSON response row to be converted. 

421 schema (Sequence[Union[ \ 

422 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

423 Mapping[str, Any] \ 

424 ]]): Specification of the field types in ``row``. 

425 

426 Returns: 

427 Tuple: A tuple of data converted to native types. 

428 """ 

429 from google.cloud.bigquery.schema import _to_schema_fields 

430 

431 schema = _to_schema_fields(schema) 

432 

433 row_data = [] 

434 for field, cell in zip(schema, row["f"]): 

435 row_data.append(_field_from_json(cell["v"], field)) 

436 return tuple(row_data) 

437 

438 

439def _rows_from_json(values, schema): 

440 """Convert JSON row data to rows with appropriate types. 

441 

442 Args: 

443 values (Sequence[Dict]): The list of responses (JSON rows) to convert. 

444 schema (Sequence[Union[ \ 

445 :class:`~google.cloud.bigquery.schema.SchemaField`, \ 

446 Mapping[str, Any] \ 

447 ]]): 

448 The table's schema. If any item is a mapping, its content must be 

449 compatible with 

450 :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`. 

451 

452 Returns: 

453 List[:class:`~google.cloud.bigquery.Row`] 

454 """ 

455 from google.cloud.bigquery import Row 

456 from google.cloud.bigquery.schema import _to_schema_fields 

457 

458 schema = _to_schema_fields(schema) 

459 field_to_index = _field_to_index_mapping(schema) 

460 return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values] 

461 
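A hedged end-to-end sketch of the row conversion above, using a hypothetical two-column schema (not taken from this file):

    from google.cloud.bigquery._helpers import _rows_from_json
    from google.cloud.bigquery.schema import SchemaField

    schema = [SchemaField("name", "STRING"), SchemaField("score", "FLOAT")]
    api_rows = [
        {"f": [{"v": "a"}, {"v": "1.5"}]},
        {"f": [{"v": "b"}, {"v": "2"}]},
    ]

    rows = _rows_from_json(api_rows, schema)
    assert rows[0]["name"] == "a"   # Row supports lookup by field name
    assert rows[1][1] == 2.0        # ...and by position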

462 

463def _int_to_json(value): 

464 """Coerce 'value' to an JSON-compatible representation.""" 

465 if isinstance(value, int): 

466 value = str(value) 

467 return value 

468 

469 

470def _float_to_json(value) -> Union[None, str, float]: 

471 """Coerce 'value' to an JSON-compatible representation.""" 

472 if value is None: 

473 return None 

474 

475 if isinstance(value, str): 

476 value = float(value) 

477 

478 return str(value) if (math.isnan(value) or math.isinf(value)) else float(value) 

479 
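As a quick illustration (not part of the module), non-finite floats are emitted as strings so the JSON payload stays valid, while finite values stay numeric:

    import math
    from google.cloud.bigquery._helpers import _float_to_json

    assert _float_to_json(None) is None
    assert _float_to_json("1.25") == 1.25
    assert _float_to_json(float("nan")) == "nan"
    assert _float_to_json(math.inf) == "inf"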

480 

481def _decimal_to_json(value): 

482 """Coerce 'value' to a JSON-compatible representation.""" 

483 if isinstance(value, decimal.Decimal): 

484 value = str(value) 

485 return value 

486 

487 

488def _bool_to_json(value): 

489 """Coerce 'value' to an JSON-compatible representation.""" 

490 if isinstance(value, bool): 

491 value = "true" if value else "false" 

492 return value 

493 

494 

495def _bytes_to_json(value): 

496 """Coerce 'value' to an JSON-compatible representation.""" 

497 if isinstance(value, bytes): 

498 value = base64.standard_b64encode(value).decode("ascii") 

499 return value 

500 

501 

502def _timestamp_to_json_parameter(value): 

503 """Coerce 'value' to an JSON-compatible representation. 

504 

505 This version returns the string representation used in query parameters. 

506 """ 

507 if isinstance(value, datetime.datetime): 

508 if value.tzinfo not in (None, UTC): 

509 # Convert to UTC and remove the time zone info. 

510 value = value.replace(tzinfo=None) - value.utcoffset() 

511 value = "%s %s+00:00" % (value.date().isoformat(), value.time().isoformat()) 

512 return value 

513 

514 

515def _timestamp_to_json_row(value): 

516 """Coerce 'value' to an JSON-compatible representation.""" 

517 if isinstance(value, datetime.datetime): 

518 # For naive datetime objects UTC timezone is assumed, thus we format 

519 # those to string directly without conversion. 

520 if value.tzinfo is not None: 

521 value = value.astimezone(UTC) 

522 value = value.strftime(_RFC3339_MICROS) 

523 return value 

524 

525 

526def _datetime_to_json(value): 

527 """Coerce 'value' to an JSON-compatible representation.""" 

528 if isinstance(value, datetime.datetime): 

529 # For naive datetime objects UTC timezone is assumed, thus we format 

530 # those to string directly without conversion. 

531 if value.tzinfo is not None: 

532 value = value.astimezone(UTC) 

533 value = value.strftime(_RFC3339_MICROS_NO_ZULU) 

534 return value 

535 

536 

537def _date_to_json(value): 

538 """Coerce 'value' to an JSON-compatible representation.""" 

539 if isinstance(value, datetime.date): 

540 value = value.isoformat() 

541 return value 

542 

543 

544def _time_to_json(value): 

545 """Coerce 'value' to an JSON-compatible representation.""" 

546 if isinstance(value, datetime.time): 

547 value = value.isoformat() 

548 return value 

549 

550 

551# Converters used for scalar values marshalled as row data. 

552_SCALAR_VALUE_TO_JSON_ROW = { 

553 "INTEGER": _int_to_json, 

554 "INT64": _int_to_json, 

555 "FLOAT": _float_to_json, 

556 "FLOAT64": _float_to_json, 

557 "NUMERIC": _decimal_to_json, 

558 "BIGNUMERIC": _decimal_to_json, 

559 "BOOLEAN": _bool_to_json, 

560 "BOOL": _bool_to_json, 

561 "BYTES": _bytes_to_json, 

562 "TIMESTAMP": _timestamp_to_json_row, 

563 "DATETIME": _datetime_to_json, 

564 "DATE": _date_to_json, 

565 "TIME": _time_to_json, 

566 # Make sure DECIMAL and BIGDECIMAL are handled, even though 

567 # requests for them should be converted to NUMERIC. Better safe 

568 # than sorry. 

569 "DECIMAL": _decimal_to_json, 

570 "BIGDECIMAL": _decimal_to_json, 

571} 

572 

573 

574# Converters used for scalar values marshalled as query parameters. 

575_SCALAR_VALUE_TO_JSON_PARAM = _SCALAR_VALUE_TO_JSON_ROW.copy() 

576_SCALAR_VALUE_TO_JSON_PARAM["TIMESTAMP"] = _timestamp_to_json_parameter 

577 

578 

579def _scalar_field_to_json(field, row_value): 

580 """Maps a field and value to a JSON-safe value. 

581 

582 Args: 

583 field (google.cloud.bigquery.schema.SchemaField): 

584 The SchemaField to use for type conversion and field name. 

585 row_value (Any): 

586 Value to be converted, based on the field's type. 

587 

588 Returns: 

589 Any: A JSON-serializable object. 

590 """ 

591 converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type) 

592 if converter is None: # STRING doesn't need converting 

593 return row_value 

594 return converter(row_value) 

595 

596 

597def _repeated_field_to_json(field, row_value): 

598 """Convert a repeated/array field to its JSON representation. 

599 

600 Args: 

601 field (google.cloud.bigquery.schema.SchemaField): 

602 The SchemaField to use for type conversion and field name. The 

603 field mode must equal ``REPEATED``. 

604 row_value (Sequence[Any]): 

605 A sequence of values to convert to JSON-serializable values. 

606 

607 Returns: 

608 List[Any]: A list of JSON-serializable objects. 

609 """ 

610 values = [] 

611 for item in row_value: 

612 values.append(_single_field_to_json(field, item)) 

613 return values 

614 

615 

616def _record_field_to_json(fields, row_value): 

617 """Convert a record/struct field to its JSON representation. 

618 

619 Args: 

620 fields (Sequence[google.cloud.bigquery.schema.SchemaField]): 

621 The :class:`~google.cloud.bigquery.schema.SchemaField`s of the 

622 record's subfields to use for type conversion and field names. 

623 row_value (Union[Tuple[Any], Mapping[str, Any]): 

624 A tuple or dictionary to convert to JSON-serializable values. 

625 

626 Returns: 

627 Mapping[str, Any]: A JSON-serializable dictionary. 

628 """ 

629 isdict = isinstance(row_value, dict) 

630 

631 # If row is passed as a tuple, perform a length sanity check to avoid either 

632 # uninformative index errors a few lines below or silently omitting some of 

633 # the values from the result (we cannot know exactly which fields are missing 

634 # or redundant, since we don't have their names). 

635 if not isdict and len(row_value) != len(fields): 

636 msg = "The number of row fields ({}) does not match schema length ({}).".format( 

637 len(row_value), len(fields) 

638 ) 

639 raise ValueError(msg) 

640 

641 record = {} 

642 

643 if isdict: 

644 processed_fields = set() 

645 

646 for subindex, subfield in enumerate(fields): 

647 subname = subfield.name 

648 subvalue = row_value.get(subname) if isdict else row_value[subindex] 

649 

650 # None values are unconditionally omitted 

651 if subvalue is not None: 

652 record[subname] = _field_to_json(subfield, subvalue) 

653 

654 if isdict: 

655 processed_fields.add(subname) 

656 

657 # Unknown fields should not be silently dropped; include them. Since there 

658 # is no schema information available for them, include them as strings 

659 # to make them JSON-serializable. 

660 if isdict: 

661 not_processed = set(row_value.keys()) - processed_fields 

662 

663 for field_name in not_processed: 

664 value = row_value[field_name] 

665 if value is not None: 

666 record[field_name] = str(value) 

667 

668 return record 

669 
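An illustrative contrast (not from this file) of the tuple and mapping inputs handled above, with a hypothetical two-field schema:

    from google.cloud.bigquery._helpers import _record_field_to_json
    from google.cloud.bigquery.schema import SchemaField

    subfields = [SchemaField("x", "INTEGER"), SchemaField("y", "BOOLEAN")]

    # Tuples are matched to subfields positionally...
    assert _record_field_to_json(subfields, (1, True)) == {"x": "1", "y": "true"}

    # ...while unknown mapping keys are kept, stringified, rather than dropped.
    assert _record_field_to_json(subfields, {"x": 2, "extra": 3.5}) == {
        "x": "2",
        "extra": "3.5",
    }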

670 

671def _single_field_to_json(field, row_value): 

672 """Convert a single field into JSON-serializable values. 

673 

674 Ignores mode so that this can function for ARRAY / REPEATING fields 

675 without requiring a deepcopy of the field. See: 

676 https://github.com/googleapis/python-bigquery/issues/6 

677 

678 Args: 

679 field (google.cloud.bigquery.schema.SchemaField): 

680 The SchemaField to use for type conversion and field name. 

681 

682 row_value (Any): 

683 Scalar or Struct to be inserted. The type 

684 is inferred from the SchemaField's field_type. 

685 

686 Returns: 

687 Any: A JSON-serializable object. 

688 """ 

689 if row_value is None: 

690 return None 

691 

692 if field.field_type == "RECORD": 

693 return _record_field_to_json(field.fields, row_value) 

694 

695 return _scalar_field_to_json(field, row_value) 

696 

697 

698def _field_to_json(field, row_value): 

699 """Convert a field into JSON-serializable values. 

700 

701 Args: 

702 field (google.cloud.bigquery.schema.SchemaField): 

703 The SchemaField to use for type conversion and field name. 

704 

705 row_value (Union[Sequence[List], Any]): 

706 Row data to be inserted. If the SchemaField's mode is 

707 REPEATED, assume this is a list. If not, the type 

708 is inferred from the SchemaField's field_type. 

709 

710 Returns: 

711 Any: A JSON-serializable object. 

712 """ 

713 if row_value is None: 

714 return None 

715 

716 if field.mode == "REPEATED": 

717 return _repeated_field_to_json(field, row_value) 

718 

719 return _single_field_to_json(field, row_value) 

720 

721 

722def _snake_to_camel_case(value): 

723 """Convert snake case string to camel case.""" 

724 words = value.split("_") 

725 return words[0] + "".join(map(str.capitalize, words[1:])) 

726 
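For example (an illustrative check, not part of the module), Python-style property names map onto the camelCase keys used by the REST API:

    from google.cloud.bigquery._helpers import _snake_to_camel_case

    assert _snake_to_camel_case("friendly_name") == "friendlyName"
    assert _snake_to_camel_case("use_legacy_sql") == "useLegacySql"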

727 

728def _get_sub_prop(container, keys, default=None): 

729 """Get a nested value from a dictionary. 

730 

731 This method works like ``dict.get(key)``, but for nested values. 

732 

733 Args: 

734 container (Dict): 

735 A dictionary which may contain other dictionaries as values. 

736 keys (Iterable): 

737 A sequence of keys to attempt to get the value for. If ``keys`` is a 

738 string, it is treated as a sequence containing a single string key. Each item 

739 in the sequence represents a deeper nesting. The first key is for 

740 the top level. If there is a dictionary there, the second key 

741 attempts to get the value within that, and so on. 

742 default (Optional[object]): 

743 Value to be returned if any of the keys are not found. 

744 Defaults to ``None``. 

745 

746 Examples: 

747 Get a top-level value (equivalent to ``container.get('key')``). 

748 

749 >>> _get_sub_prop({'key': 'value'}, ['key']) 

750 'value' 

751 

752 Get a top-level value, providing a default (equivalent to 

753 ``container.get('key', default='default')``). 

754 

755 >>> _get_sub_prop({'nothere': 123}, ['key'], default='not found') 

756 'not found' 

757 

758 Get a nested value. 

759 

760 >>> _get_sub_prop({'key': {'subkey': 'value'}}, ['key', 'subkey']) 

761 'value' 

762 

763 Returns: 

764 object: The value if present or the default. 

765 """ 

766 if isinstance(keys, str): 

767 keys = [keys] 

768 

769 sub_val = container 

770 for key in keys: 

771 if key not in sub_val: 

772 return default 

773 sub_val = sub_val[key] 

774 return sub_val 

775 

776 

777def _set_sub_prop(container, keys, value): 

778 """Set a nested value in a dictionary. 

779 

780 Args: 

781 container (Dict): 

782 A dictionary which may contain other dictionaries as values. 

783 keys (Iterable): 

784 A sequence of keys to attempt to set the value for. If ``keys`` is a 

785 string, it is treated as a sequence containing a single string key. Each item 

786 in the sequence represents a deeper nesting. The first key is for 

787 the top level. If there is a dictionary there, the second key 

788 attempts to get the value within that, and so on. 

789 value (object): Value to set within the container. 

790 

791 Examples: 

792 Set a top-level value (equivalent to ``container['key'] = 'value'``). 

793 

794 >>> container = {} 

795 >>> _set_sub_prop(container, ['key'], 'value') 

796 >>> container 

797 {'key': 'value'} 

798 

799 Set a nested value. 

800 

801 >>> container = {} 

802 >>> _set_sub_prop(container, ['key', 'subkey'], 'value') 

803 >>> container 

804 {'key': {'subkey': 'value'}} 

805 

806 Replace a nested value. 

807 

808 >>> container = {'key': {'subkey': 'prev'}} 

809 >>> _set_sub_prop(container, ['key', 'subkey'], 'new') 

810 >>> container 

811 {'key': {'subkey': 'new'}} 

812 """ 

813 if isinstance(keys, str): 

814 keys = [keys] 

815 

816 sub_val = container 

817 for key in keys[:-1]: 

818 if key not in sub_val: 

819 sub_val[key] = {} 

820 sub_val = sub_val[key] 

821 sub_val[keys[-1]] = value 

822 

823 

824def _del_sub_prop(container, keys): 

825 """Remove a nested key fro a dictionary. 

826 

827 Args: 

828 container (Dict): 

829 A dictionary which may contain other dictionaries as values. 

830 keys (Iterable): 

831 A sequence of keys to attempt to clear the value for. Each item in 

832 the sequence represents a deeper nesting. The first key is for 

833 the top level. If there is a dictionary there, the second key 

834 attempts to get the value within that, and so on. 

835 

836 Examples: 

837 Remove a top-level value (equivalent to ``del container['key']``). 

838 

839 >>> container = {'key': 'value'} 

840 >>> _del_sub_prop(container, ['key']) 

841 >>> container 

842 {} 

843 

844 Remove a nested value. 

845 

846 >>> container = {'key': {'subkey': 'value'}} 

847 >>> _del_sub_prop(container, ['key', 'subkey']) 

848 >>> container 

849 {'key': {}} 

850 """ 

851 sub_val = container 

852 for key in keys[:-1]: 

853 if key not in sub_val: 

854 sub_val[key] = {} 

855 sub_val = sub_val[key] 

856 if keys[-1] in sub_val: 

857 del sub_val[keys[-1]] 

858 

859 

860def _int_or_none(value): 

861 """Helper: deserialize int value from JSON string.""" 

862 if isinstance(value, int): 

863 return value 

864 if value is not None: 

865 return int(value) 

866 

867 

868def _str_or_none(value): 

869 """Helper: serialize value to JSON string.""" 

870 if value is not None: 

871 return str(value) 

872 

873 

874def _split_id(full_id): 

875 """Helper: split full_id into composite parts. 

876 

877 Args: 

878 full_id (str): Fully-qualified ID in standard SQL format. 

879 

880 Returns: 

881 List[str]: ID's parts separated into components. 

882 """ 

883 with_prefix = _PROJECT_PREFIX_PATTERN.match(full_id) 

884 if with_prefix is None: 

885 parts = full_id.split(".") 

886 else: 

887 parts = with_prefix.groups() 

888 parts = [part for part in parts if part] 

889 return parts 

890 

891 

892def _parse_3_part_id(full_id, default_project=None, property_name="table_id"): 

893 output_project_id = default_project 

894 output_dataset_id = None 

895 output_resource_id = None 

896 parts = _split_id(full_id) 

897 

898 if len(parts) != 2 and len(parts) != 3: 

899 raise ValueError( 

900 "{property_name} must be a fully-qualified ID in " 

901 'standard SQL format, e.g., "project.dataset.{property_name}", ' 

902 "got {}".format(full_id, property_name=property_name) 

903 ) 

904 

905 if len(parts) == 2 and not default_project: 

906 raise ValueError( 

907 "When default_project is not set, {property_name} must be a " 

908 "fully-qualified ID in standard SQL format, " 

909 'e.g., "project.dataset_id.{property_name}", got {}'.format( 

910 full_id, property_name=property_name 

911 ) 

912 ) 

913 

914 if len(parts) == 2: 

915 output_dataset_id, output_resource_id = parts 

916 else: 

917 output_project_id, output_dataset_id, output_resource_id = parts 

918 

919 return output_project_id, output_dataset_id, output_resource_id 

920 
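A brief sketch (not from this file) with hypothetical IDs, showing both the three-part and the two-part-plus-default forms accepted above:

    from google.cloud.bigquery._helpers import _parse_3_part_id

    assert _parse_3_part_id("my-project.my_dataset.my_table") == (
        "my-project", "my_dataset", "my_table"
    )

    # Two-part IDs borrow the project from default_project.
    assert _parse_3_part_id("my_dataset.my_table", default_project="my-project") == (
        "my-project", "my_dataset", "my_table"
    )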

921 

922def _build_resource_from_properties(obj, filter_fields): 

923 """Build a resource based on a ``_properties`` dictionary, filtered by 

924 ``filter_fields``, which are given as the Python-style property names of the object. 

925 """ 

926 partial = {} 

927 for filter_field in filter_fields: 

928 api_field = obj._PROPERTY_TO_API_FIELD.get(filter_field) 

929 if api_field is None and filter_field not in obj._properties: 

930 raise ValueError("No property %s" % filter_field) 

931 elif api_field is not None: 

932 partial[api_field] = obj._properties.get(api_field) 

933 else: 

934 # allows properties that are not defined in the library 

935 # and properties that have the same name as API resource key 

936 partial[filter_field] = obj._properties[filter_field] 

937 

938 return partial 

939 

940 

941def _verify_job_config_type(job_config, expected_type, param_name="job_config"): 

942 if not isinstance(job_config, expected_type): 

943 msg = ( 

944 "Expected an instance of {expected_type} class for the {param_name} parameter, " 

945 "but received {param_name} = {job_config}" 

946 ) 

947 raise TypeError( 

948 msg.format( 

949 expected_type=expected_type.__name__, 

950 param_name=param_name, 

951 job_config=job_config, 

952 ) 

953 )