# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared helper functions for BigQuery API classes."""

import base64
import datetime
import decimal
import json
import math
import os
import re
import textwrap
import warnings
from typing import Any, Optional, Tuple, Type, Union

from dateutil import relativedelta
from google.cloud._helpers import UTC  # type: ignore
from google.cloud._helpers import _date_from_iso8601_date
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _to_bytes
from google.auth import credentials as ga_credentials  # type: ignore
from google.api_core import client_options as client_options_lib

TimeoutType = Union[float, None]

_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
_TIMEONLY_W_MICROS = "%H:%M:%S.%f"
_PROJECT_PREFIX_PATTERN = re.compile(
    r"""
    (?P<project_id>\S+\:[^.]+)\.(?P<dataset_id>[^.]+)(?:$|\.(?P<custom_id>[^.]+)$)
""",
    re.VERBOSE,
)

# BigQuery sends INTERVAL data in "canonical format"
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#interval_type
_INTERVAL_PATTERN = re.compile(
    r"(?P<calendar_sign>-?)(?P<years>\d+)-(?P<months>\d+) "
    r"(?P<days>-?\d+) "
    r"(?P<time_sign>-?)(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)\.?(?P<fraction>\d*)?$"
)
_RANGE_PATTERN = re.compile(r"\[.*, .*\)")

BIGQUERY_EMULATOR_HOST = "BIGQUERY_EMULATOR_HOST"
"""Environment variable defining host for emulator."""

_DEFAULT_HOST = "https://bigquery.googleapis.com"
"""Default host for JSON API."""

_DEFAULT_HOST_TEMPLATE = "https://bigquery.{UNIVERSE_DOMAIN}"
"""Templatized endpoint format."""

_DEFAULT_UNIVERSE = "googleapis.com"
"""Default universe for the JSON API."""

_UNIVERSE_DOMAIN_ENV = "GOOGLE_CLOUD_UNIVERSE_DOMAIN"
"""Environment variable for setting universe domain."""

_SUPPORTED_RANGE_ELEMENTS = {"TIMESTAMP", "DATETIME", "DATE"}


def _get_client_universe(
    client_options: Optional[Union[client_options_lib.ClientOptions, dict]]
) -> str:
    """Retrieves the specified universe setting.

    Args:
        client_options: specified client options.

    Returns:
        str: resolved universe setting.
    """
    if isinstance(client_options, dict):
        client_options = client_options_lib.from_dict(client_options)
    universe = _DEFAULT_UNIVERSE
    options_universe = getattr(client_options, "universe_domain", None)
    if (
        options_universe
        and isinstance(options_universe, str)
        and len(options_universe) > 0
    ):
        universe = options_universe
    else:
        env_universe = os.getenv(_UNIVERSE_DOMAIN_ENV)
        if isinstance(env_universe, str) and len(env_universe) > 0:
            universe = env_universe
    return universe
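
# Example (illustrative, assuming a google-api-core version that supports the
# ``universe_domain`` client option): explicit client options win, then the
# GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variable, then the default:
#
#     >>> _get_client_universe({"universe_domain": "example-universe.test"})
#     'example-universe.test'
#     >>> _get_client_universe(None)  # no options and no env var set
#     'googleapis.com'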


def _validate_universe(client_universe: str, credentials: ga_credentials.Credentials):
    """Validates that the client-provided universe and the universe embedded in credentials match.

    Args:
        client_universe (str): The universe domain configured via the client options.
        credentials (ga_credentials.Credentials): The credentials being used in the client.

    Raises:
        ValueError: when client_universe does not match the universe in credentials.
    """
    if hasattr(credentials, "universe_domain"):
        cred_universe = getattr(credentials, "universe_domain")
        if isinstance(cred_universe, str):
            if client_universe != cred_universe:
                raise ValueError(
                    "The configured universe domain "
                    f"({client_universe}) does not match the universe domain "
                    f"found in the credentials ({cred_universe}). "
                    "If you haven't configured the universe domain explicitly, "
                    f"`{_DEFAULT_UNIVERSE}` is the default."
                )


def _get_bigquery_host():
    return os.environ.get(BIGQUERY_EMULATOR_HOST, _DEFAULT_HOST)


def _not_null(value, field):
    """Check whether 'value' should be coerced to 'field' type."""
    return value is not None or (field is not None and field.mode != "NULLABLE")
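
# Example (illustrative): a None cell in a NULLABLE field is skipped, while a
# None cell in a REQUIRED field still requests coercion, so converters can
# surface an error instead of silently returning None:
#
#     >>> _not_null("42", None)
#     True
#     >>> _not_null(None, None)
#     False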


class CellDataParser:
    """Converter from BigQuery REST resource to Python value for RowIterator and similar classes.

    See: "rows" field of
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list and
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults.
    """

    def to_py(self, resource, field):
        def default_converter(value, field):
            _warn_unknown_field_type(field)
            return value

        converter = getattr(
            self, f"{field.field_type.lower()}_to_py", default_converter
        )
        if field.mode == "REPEATED":
            return [converter(item["v"], field) for item in resource]
        else:
            return converter(resource, field)

    def bool_to_py(self, value, field):
        """Coerce 'value' to a bool, if set or not nullable."""
        if _not_null(value, field):
            # TODO(tswast): Why does _not_null care if the field is NULLABLE or
            # REQUIRED? Do we actually need such client-side validation?
            if value is None:
                raise TypeError(f"got None for required boolean field {field}")
            return value.lower() in ("t", "true", "1")

    def boolean_to_py(self, value, field):
        """Coerce 'value' to a bool, if set or not nullable."""
        return self.bool_to_py(value, field)

    def integer_to_py(self, value, field):
        """Coerce 'value' to an int, if set or not nullable."""
        if _not_null(value, field):
            return int(value)

    def int64_to_py(self, value, field):
        """Coerce 'value' to an int, if set or not nullable."""
        return self.integer_to_py(value, field)

    def interval_to_py(
        self, value: Optional[str], field
    ) -> Optional[relativedelta.relativedelta]:
        """Coerce 'value' to an interval, if set or not nullable."""
        if not _not_null(value, field):
            return None
        if value is None:
            raise TypeError(f"got {value} for REQUIRED field: {repr(field)}")

        parsed = _INTERVAL_PATTERN.match(value)
        if parsed is None:
            raise ValueError(
                textwrap.dedent(
                    f"""
                    Got interval: '{value}' with unexpected format.
                    Expected interval in canonical format of "[sign]Y-M [sign]D [sign]H:M:S[.F]".
                    See:
                    https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#interval_type
                    for more information.
                    """
                ),
            )

        calendar_sign = -1 if parsed.group("calendar_sign") == "-" else 1
        years = calendar_sign * int(parsed.group("years"))
        months = calendar_sign * int(parsed.group("months"))
        days = int(parsed.group("days"))
        time_sign = -1 if parsed.group("time_sign") == "-" else 1
        hours = time_sign * int(parsed.group("hours"))
        minutes = time_sign * int(parsed.group("minutes"))
        seconds = time_sign * int(parsed.group("seconds"))
        fraction = parsed.group("fraction")
        microseconds = time_sign * int(fraction.ljust(6, "0")[:6]) if fraction else 0

        return relativedelta.relativedelta(
            years=years,
            months=months,
            days=days,
            hours=hours,
            minutes=minutes,
            seconds=seconds,
            microseconds=microseconds,
        )
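
    # Example (illustrative): the canonical INTERVAL text "1-6 15 10:30:15.5"
    # means 1 year, 6 months, 15 days, and 10:30:15.5, and parses to
    # relativedelta(years=+1, months=+6, days=+15, hours=+10, minutes=+30,
    # seconds=+15, microseconds=+500000); the fraction is right-padded to
    # six digits before conversion to microseconds.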

    def float_to_py(self, value, field):
        """Coerce 'value' to a float, if set or not nullable."""
        if _not_null(value, field):
            return float(value)

    def float64_to_py(self, value, field):
        """Coerce 'value' to a float, if set or not nullable."""
        return self.float_to_py(value, field)

    def numeric_to_py(self, value, field):
        """Coerce 'value' to a Decimal, if set or not nullable."""
        if _not_null(value, field):
            return decimal.Decimal(value)

    def bignumeric_to_py(self, value, field):
        """Coerce 'value' to a Decimal, if set or not nullable."""
        return self.numeric_to_py(value, field)

    def string_to_py(self, value, _):
        """NOOP string -> string coercion"""
        return value

    def geography_to_py(self, value, _):
        """NOOP string -> string coercion"""
        return value

    def bytes_to_py(self, value, field):
        """Base64-decode value"""
        if _not_null(value, field):
            return base64.standard_b64decode(_to_bytes(value))

    def timestamp_to_py(self, value, field):
        """Coerce 'value' to a datetime, if set or not nullable."""
        if _not_null(value, field):
            # value will be an integer count of microseconds since the
            # epoch, in UTC.
            return _datetime_from_microseconds(int(value))

    def datetime_to_py(self, value, field):
        """Coerce 'value' to a datetime, if set or not nullable.

        Args:
            value (str): The timestamp.
            field (google.cloud.bigquery.schema.SchemaField):
                The field corresponding to the value.

        Returns:
            Optional[datetime.datetime]:
                The parsed datetime object from
                ``value`` if the ``field`` is not null (otherwise it is
                :data:`None`).
        """
        if _not_null(value, field):
            if "." in value:
                # YYYY-MM-DDTHH:MM:SS.ffffff
                return datetime.datetime.strptime(value, _RFC3339_MICROS_NO_ZULU)
            else:
                # YYYY-MM-DDTHH:MM:SS
                return datetime.datetime.strptime(value, _RFC3339_NO_FRACTION)
        else:
            return None

    def date_to_py(self, value, field):
        """Coerce 'value' to a datetime date, if set or not nullable"""
        if _not_null(value, field):
            # value will be a string, in YYYY-MM-DD form.
            return _date_from_iso8601_date(value)

    def time_to_py(self, value, field):
        """Coerce 'value' to a datetime time, if set or not nullable"""
        if _not_null(value, field):
            if len(value) == 8:  # HH:MM:SS
                fmt = _TIMEONLY_WO_MICROS
            elif len(value) == 15:  # HH:MM:SS.micros
                fmt = _TIMEONLY_W_MICROS
            else:
                raise ValueError(
                    textwrap.dedent(
                        f"""
                        Got {repr(value)} with unknown time format.
                        Expected HH:MM:SS or HH:MM:SS.micros. See
                        https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type
                        for more information.
                        """
                    ),
                )
            return datetime.datetime.strptime(value, fmt).time()

    def record_to_py(self, value, field):
        """Coerce 'value' to a mapping, if set or not nullable."""
        if _not_null(value, field):
            record = {}
            record_iter = zip(field.fields, value["f"])
            for subfield, cell in record_iter:
                record[subfield.name] = self.to_py(cell["v"], subfield)
            return record

    def struct_to_py(self, value, field):
        """Coerce 'value' to a mapping, if set or not nullable."""
        return self.record_to_py(value, field)

    def json_to_py(self, value, field):
        """Coerce 'value' to a Pythonic JSON representation."""
        if _not_null(value, field):
            return json.loads(value)
        else:
            return None

    def _range_element_to_py(self, value, field_element_type):
        """Coerce 'value' to a range element value."""
        # Avoid circular imports by importing here.
        from google.cloud.bigquery import schema

        if value == "UNBOUNDED":
            return None
        if field_element_type.element_type in _SUPPORTED_RANGE_ELEMENTS:
            return self.to_py(
                value,
                schema.SchemaField("placeholder", field_element_type.element_type),
            )
        else:
            raise ValueError(
                textwrap.dedent(
                    f"""
                    Got unsupported range element type: {field_element_type.element_type}.
                    Expected one of {repr(_SUPPORTED_RANGE_ELEMENTS)}. See:
                    https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#declare_a_range_type
                    for more information.
                    """
                ),
            )

    def range_to_py(self, value, field):
        """Coerce 'value' to a range, if set or not nullable.

        Args:
            value (str): The literal representation of the range.
            field (google.cloud.bigquery.schema.SchemaField):
                The field corresponding to the value.

        Returns:
            Optional[dict]:
                The parsed range object from ``value`` if the ``field`` is not
                null (otherwise it is :data:`None`).
        """
        if _not_null(value, field):
            if _RANGE_PATTERN.match(value):
                start, end = value[1:-1].split(", ")
                start = self._range_element_to_py(start, field.range_element_type)
                end = self._range_element_to_py(end, field.range_element_type)
                return {"start": start, "end": end}
            else:
                raise ValueError(
                    textwrap.dedent(
                        f"""
                        Got unknown format for range value: {value}.
                        Expected format '[lower_bound, upper_bound)'. See:
                        https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_with_literal
                        for more information.
                        """
                    ),
                )


CELL_DATA_PARSER = CellDataParser()
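
# Example (illustrative): given a SchemaField, the parser dispatches to the
# matching *_to_py converter by field type:
#
#     >>> from google.cloud.bigquery.schema import SchemaField
#     >>> CELL_DATA_PARSER.to_py("42", SchemaField("n", "INTEGER"))
#     42
#     >>> CELL_DATA_PARSER.to_py("true", SchemaField("flag", "BOOLEAN"))
#     True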


class DataFrameCellDataParser(CellDataParser):
    """Override of CellDataParser to handle differences in expression of values in DataFrame-like outputs.

    This is used to turn the output of the REST API into a pyarrow Table,
    emulating the serialized arrow from the BigQuery Storage Read API.
    """

    def json_to_py(self, value, _):
        """No-op because DataFrame expects string for JSON output."""
        return value


DATA_FRAME_CELL_DATA_PARSER = DataFrameCellDataParser()


class ScalarQueryParamParser(CellDataParser):
    """Override of CellDataParser to handle the differences in the response from query params.

    See: "value" field of
    https://cloud.google.com/bigquery/docs/reference/rest/v2/QueryParameter#QueryParameterValue
    """

    def timestamp_to_py(self, value, field):
        """Coerce 'value' to a datetime, if set or not nullable.

        Args:
            value (str): The timestamp.

            field (google.cloud.bigquery.schema.SchemaField):
                The field corresponding to the value.

        Returns:
            Optional[datetime.datetime]:
                The parsed datetime object from
                ``value`` if the ``field`` is not null (otherwise it is
                :data:`None`).
        """
        if _not_null(value, field):
            # Canonical formats for timestamps in BigQuery are flexible. See:
            # g.co/cloud/bigquery/docs/reference/standard-sql/data-types#timestamp-type
            # The separator between the date and time can be 'T' or ' '.
            value = value.replace(" ", "T", 1)
            # The UTC timezone may be formatted as Z or +00:00.
            value = value.replace("Z", "")
            value = value.replace("+00:00", "")

            if "." in value:
                # YYYY-MM-DDTHH:MM:SS.ffffff
                return datetime.datetime.strptime(
                    value, _RFC3339_MICROS_NO_ZULU
                ).replace(tzinfo=UTC)
            else:
                # YYYY-MM-DDTHH:MM:SS
                return datetime.datetime.strptime(value, _RFC3339_NO_FRACTION).replace(
                    tzinfo=UTC
                )
        else:
            return None


SCALAR_QUERY_PARAM_PARSER = ScalarQueryParamParser()


def _field_to_index_mapping(schema):
    """Create a mapping from schema field name to index of field."""
    return {f.name: i for i, f in enumerate(schema)}


def _row_tuple_from_json(row, schema):
    """Convert JSON row data to row with appropriate types.

    Note: ``row['f']`` and ``schema`` are presumed to be of the same length.

    Args:
        row (Dict): A JSON response row to be converted.
        schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]): Specification of the field types in ``row``.

    Returns:
        Tuple: A tuple of data converted to native types.
    """
    from google.cloud.bigquery.schema import _to_schema_fields

    schema = _to_schema_fields(schema)

    row_data = []
    for field, cell in zip(schema, row["f"]):
        row_data.append(CELL_DATA_PARSER.to_py(cell["v"], field))
    return tuple(row_data)


def _rows_from_json(values, schema):
    """Convert JSON row data to rows with appropriate types.

    Args:
        values (Sequence[Dict]): The list of responses (JSON rows) to convert.
        schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            The table's schema. If any item is a mapping, its content must be
            compatible with
            :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

    Returns:
        List[:class:`~google.cloud.bigquery.Row`]
    """
    from google.cloud.bigquery import Row
    from google.cloud.bigquery.schema import _to_schema_fields

    schema = _to_schema_fields(schema)
    field_to_index = _field_to_index_mapping(schema)
    return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values]
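
# Example (illustrative): REST rows arrive as {"f": [{"v": ...}, ...]} and are
# converted using the table schema:
#
#     >>> from google.cloud.bigquery.schema import SchemaField
#     >>> schema = [SchemaField("name", "STRING"), SchemaField("age", "INTEGER")]
#     >>> rows = _rows_from_json([{"f": [{"v": "Ada"}, {"v": "36"}]}], schema)
#     >>> rows[0].values()
#     ('Ada', 36)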


def _int_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, int):
        value = str(value)
    return value


def _float_to_json(value) -> Union[None, str, float]:
    """Coerce 'value' to a JSON-compatible representation."""
    if value is None:
        return None

    if isinstance(value, str):
        value = float(value)

    return str(value) if (math.isnan(value) or math.isinf(value)) else float(value)


def _decimal_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, decimal.Decimal):
        value = str(value)
    return value


def _bool_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, bool):
        value = "true" if value else "false"
    return value


def _bytes_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, bytes):
        value = base64.standard_b64encode(value).decode("ascii")
    return value


def _json_to_json(value):
    """Coerce 'value' to a BigQuery REST API representation."""
    if value is None:
        return None
    return json.dumps(value)


def _string_to_json(value):
    """NOOP string -> string coercion"""
    return value


def _timestamp_to_json_parameter(value):
    """Coerce 'value' to a JSON-compatible representation.

    This version returns the string representation used in query parameters.
    """
    if isinstance(value, datetime.datetime):
        if value.tzinfo not in (None, UTC):
            # Convert to UTC and remove the time zone info.
            value = value.replace(tzinfo=None) - value.utcoffset()
        value = "%s %s+00:00" % (value.date().isoformat(), value.time().isoformat())
    return value


def _timestamp_to_json_row(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, datetime.datetime):
        # For naive datetime objects UTC timezone is assumed, thus we format
        # those to string directly without conversion.
        if value.tzinfo is not None:
            value = value.astimezone(UTC)
        value = value.strftime(_RFC3339_MICROS)
    return value
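
# Example (illustrative): the row serializer emits RFC 3339 with a Zulu
# suffix, while the parameter serializer emits a space-separated form with an
# explicit +00:00 offset:
#
#     >>> import datetime
#     >>> ts = datetime.datetime(2024, 1, 2, 3, 4, 5)
#     >>> _timestamp_to_json_row(ts)
#     '2024-01-02T03:04:05.000000Z'
#     >>> _timestamp_to_json_parameter(ts)
#     '2024-01-02 03:04:05+00:00'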


def _datetime_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, datetime.datetime):
        # For naive datetime objects UTC timezone is assumed, thus we format
        # those to string directly without conversion.
        if value.tzinfo is not None:
            value = value.astimezone(UTC)
        value = value.strftime(_RFC3339_MICROS_NO_ZULU)
    return value


def _date_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, datetime.date):
        value = value.isoformat()
    return value


def _time_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, datetime.time):
        value = value.isoformat()
    return value


def _range_element_to_json(value, element_type=None):
    """Coerce 'value' to a JSON-compatible representation."""
    if value is None:
        return None
    elif isinstance(value, str):
        if value.upper() in ("UNBOUNDED", "NULL"):
            return None
        else:
            # We do not enforce range element value to be valid to reduce
            # redundancy with backend.
            return value
    elif (
        element_type and element_type.element_type.upper() in _SUPPORTED_RANGE_ELEMENTS
    ):
        converter = _SCALAR_VALUE_TO_JSON_ROW.get(element_type.element_type.upper())
        return converter(value)
    else:
        raise ValueError(
            f"Unsupported RANGE element type {element_type}, or "
            "element type is empty. Must be DATE, DATETIME, or "
            "TIMESTAMP"
        )


def _range_field_to_json(range_element_type, value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, str):
        # string literal
        if _RANGE_PATTERN.match(value):
            start, end = value[1:-1].split(", ")
        else:
            raise ValueError(f"RANGE literal {value} has incorrect format")
    elif isinstance(value, dict):
        # dictionary
        start = value.get("start")
        end = value.get("end")
    else:
        raise ValueError(
            f"Unsupported type of RANGE value {value}, must be string or dict"
        )

    start = _range_element_to_json(start, range_element_type)
    end = _range_element_to_json(end, range_element_type)
    return {"start": start, "end": end}
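
# Example (illustrative): both a RANGE literal string and a start/end dict
# serialize to the same REST representation, with "UNBOUNDED" mapped to None:
#
#     >>> _range_field_to_json(None, "[2020-01-01, UNBOUNDED)")
#     {'start': '2020-01-01', 'end': None}
#     >>> _range_field_to_json(None, {"start": "2020-01-01"})
#     {'start': '2020-01-01', 'end': None}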


# Converters used for scalar values marshalled to the BigQuery API, such as in
# query parameters or the tabledata.insert API.
_SCALAR_VALUE_TO_JSON_ROW = {
    "INTEGER": _int_to_json,
    "INT64": _int_to_json,
    "FLOAT": _float_to_json,
    "FLOAT64": _float_to_json,
    "NUMERIC": _decimal_to_json,
    "BIGNUMERIC": _decimal_to_json,
    "BOOLEAN": _bool_to_json,
    "BOOL": _bool_to_json,
    "BYTES": _bytes_to_json,
    "TIMESTAMP": _timestamp_to_json_row,
    "DATETIME": _datetime_to_json,
    "DATE": _date_to_json,
    "TIME": _time_to_json,
    "JSON": _json_to_json,
    "STRING": _string_to_json,
    # Make sure DECIMAL and BIGDECIMAL are handled, even though
    # requests for them should be converted to NUMERIC. Better safe
    # than sorry.
    "DECIMAL": _decimal_to_json,
    "BIGDECIMAL": _decimal_to_json,
}


# Converters used for scalar values marshalled as query parameters.
_SCALAR_VALUE_TO_JSON_PARAM = _SCALAR_VALUE_TO_JSON_ROW.copy()
_SCALAR_VALUE_TO_JSON_PARAM["TIMESTAMP"] = _timestamp_to_json_parameter


def _warn_unknown_field_type(field):
    warnings.warn(
        "Unknown type '{}' for field '{}'. Behavior reading and writing this type is not officially supported and may change in the future.".format(
            field.field_type, field.name
        ),
        FutureWarning,
    )


def _scalar_field_to_json(field, row_value):
    """Maps a field and value to a JSON-safe value.

    Args:
        field (google.cloud.bigquery.schema.SchemaField):
            The SchemaField to use for type conversion and field name.
        row_value (Any):
            Value to be converted, based on the field's type.

    Returns:
        Any: A JSON-serializable object.
    """

    def default_converter(value):
        _warn_unknown_field_type(field)
        return value

    converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type, default_converter)
    return converter(row_value)


def _repeated_field_to_json(field, row_value):
    """Convert a repeated/array field to its JSON representation.

    Args:
        field (google.cloud.bigquery.schema.SchemaField):
            The SchemaField to use for type conversion and field name. The
            field mode must equal ``REPEATED``.
        row_value (Sequence[Any]):
            A sequence of values to convert to JSON-serializable values.

    Returns:
        List[Any]: A list of JSON-serializable objects.
    """
    values = []
    for item in row_value:
        values.append(_single_field_to_json(field, item))
    return values


def _record_field_to_json(fields, row_value):
    """Convert a record/struct field to its JSON representation.

    Args:
        fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
            The :class:`~google.cloud.bigquery.schema.SchemaField`s of the
            record's subfields to use for type conversion and field names.
        row_value (Union[Tuple[Any], Mapping[str, Any]]):
            A tuple or dictionary to convert to JSON-serializable values.

    Returns:
        Mapping[str, Any]: A JSON-serializable dictionary.
    """
    isdict = isinstance(row_value, dict)

    # If row is passed as a tuple, make the length sanity check to avoid either
    # uninformative index errors a few lines below or silently omitting some of
    # the values from the result (we cannot know exactly which fields are missing
    # or redundant, since we don't have their names).
    if not isdict and len(row_value) != len(fields):
        msg = "The number of row fields ({}) does not match schema length ({}).".format(
            len(row_value), len(fields)
        )
        raise ValueError(msg)

    record = {}

    if isdict:
        processed_fields = set()

    for subindex, subfield in enumerate(fields):
        subname = subfield.name
        subvalue = row_value.get(subname) if isdict else row_value[subindex]

        # None values are unconditionally omitted
        if subvalue is not None:
            record[subname] = _field_to_json(subfield, subvalue)

        if isdict:
            processed_fields.add(subname)

    # Unknown fields should not be silently dropped, include them. Since there
    # is no schema information available for them, include them as strings
    # to make them JSON-serializable.
    if isdict:
        not_processed = set(row_value.keys()) - processed_fields

        for field_name in not_processed:
            value = row_value[field_name]
            if value is not None:
                record[field_name] = str(value)

    return record
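
# Example (illustrative): a dict record keeps known subfields typed and
# stringifies unknown keys rather than dropping them:
#
#     >>> from google.cloud.bigquery.schema import SchemaField
#     >>> fields = [SchemaField("x", "INTEGER"), SchemaField("y", "BOOLEAN")]
#     >>> _record_field_to_json(fields, {"x": 1, "y": True, "z": 2.5})
#     {'x': '1', 'y': 'true', 'z': '2.5'}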


def _single_field_to_json(field, row_value):
    """Convert a single field into JSON-serializable values.

    Ignores mode so that this can function for ARRAY / REPEATED fields
    without requiring a deepcopy of the field. See:
    https://github.com/googleapis/python-bigquery/issues/6

    Args:
        field (google.cloud.bigquery.schema.SchemaField):
            The SchemaField to use for type conversion and field name.

        row_value (Any):
            Scalar or Struct to be inserted. The type
            is inferred from the SchemaField's field_type.

    Returns:
        Any: A JSON-serializable object.
    """
    if row_value is None:
        return None

    if field.field_type == "RECORD":
        return _record_field_to_json(field.fields, row_value)
    if field.field_type == "RANGE":
        return _range_field_to_json(field.range_element_type, row_value)

    return _scalar_field_to_json(field, row_value)


def _field_to_json(field, row_value):
    """Convert a field into JSON-serializable values.

    Args:
        field (google.cloud.bigquery.schema.SchemaField):
            The SchemaField to use for type conversion and field name.

        row_value (Union[Sequence[List], Any]):
            Row data to be inserted. If the SchemaField's mode is
            REPEATED, assume this is a list. If not, the type
            is inferred from the SchemaField's field_type.

    Returns:
        Any: A JSON-serializable object.
    """
    if row_value is None:
        return None

    if field.mode == "REPEATED":
        return _repeated_field_to_json(field, row_value)

    return _single_field_to_json(field, row_value)


def _snake_to_camel_case(value):
    """Convert snake case string to camel case."""
    words = value.split("_")
    return words[0] + "".join(map(str.capitalize, words[1:]))
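
# Example (illustrative):
#
#     >>> _snake_to_camel_case("friendly_name")
#     'friendlyName'
#     >>> _snake_to_camel_case("use_legacy_sql")
#     'useLegacySql'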


def _get_sub_prop(container, keys, default=None):
    """Get a nested value from a dictionary.

    This method works like ``dict.get(key)``, but for nested values.

    Args:
        container (Dict):
            A dictionary which may contain other dictionaries as values.
        keys (Iterable):
            A sequence of keys to attempt to get the value for. If ``keys`` is a
            string, it is treated as a sequence containing a single string key. Each item
            in the sequence represents a deeper nesting. The first key is for
            the top level. If there is a dictionary there, the second key
            attempts to get the value within that, and so on.
        default (Optional[object]):
            Value to return if any of the keys are not found.
            Defaults to ``None``.

    Examples:
        Get a top-level value (equivalent to ``container.get('key')``).

        >>> _get_sub_prop({'key': 'value'}, ['key'])
        'value'

        Get a top-level value, providing a default (equivalent to
        ``container.get('key', default='default')``).

        >>> _get_sub_prop({'nothere': 123}, ['key'], default='not found')
        'not found'

        Get a nested value.

        >>> _get_sub_prop({'key': {'subkey': 'value'}}, ['key', 'subkey'])
        'value'

    Returns:
        object: The value if present or the default.
    """
    if isinstance(keys, str):
        keys = [keys]

    sub_val = container
    for key in keys:
        if key not in sub_val:
            return default
        sub_val = sub_val[key]
    return sub_val


def _set_sub_prop(container, keys, value):
    """Set a nested value in a dictionary.

    Args:
        container (Dict):
            A dictionary which may contain other dictionaries as values.
        keys (Iterable):
            A sequence of keys to attempt to set the value for. If ``keys`` is a
            string, it is treated as a sequence containing a single string key. Each item
            in the sequence represents a deeper nesting. The first key is for
            the top level. If there is a dictionary there, the second key
            attempts to get the value within that, and so on.
        value (object): Value to set within the container.

    Examples:
        Set a top-level value (equivalent to ``container['key'] = 'value'``).

        >>> container = {}
        >>> _set_sub_prop(container, ['key'], 'value')
        >>> container
        {'key': 'value'}

        Set a nested value.

        >>> container = {}
        >>> _set_sub_prop(container, ['key', 'subkey'], 'value')
        >>> container
        {'key': {'subkey': 'value'}}

        Replace a nested value.

        >>> container = {'key': {'subkey': 'prev'}}
        >>> _set_sub_prop(container, ['key', 'subkey'], 'new')
        >>> container
        {'key': {'subkey': 'new'}}
    """
    if isinstance(keys, str):
        keys = [keys]

    sub_val = container
    for key in keys[:-1]:
        if key not in sub_val:
            sub_val[key] = {}
        sub_val = sub_val[key]
    sub_val[keys[-1]] = value


def _del_sub_prop(container, keys):
    """Remove a nested key from a dictionary.

    Args:
        container (Dict):
            A dictionary which may contain other dictionaries as values.
        keys (Iterable):
            A sequence of keys to attempt to clear the value for. Each item in
            the sequence represents a deeper nesting. The first key is for
            the top level. If there is a dictionary there, the second key
            attempts to get the value within that, and so on.

    Examples:
        Remove a top-level value (equivalent to ``del container['key']``).

        >>> container = {'key': 'value'}
        >>> _del_sub_prop(container, ['key'])
        >>> container
        {}

        Remove a nested value.

        >>> container = {'key': {'subkey': 'value'}}
        >>> _del_sub_prop(container, ['key', 'subkey'])
        >>> container
        {'key': {}}
    """
    sub_val = container
    for key in keys[:-1]:
        if key not in sub_val:
            # Nothing to delete; don't create intermediate dictionaries.
            return
        sub_val = sub_val[key]
    if keys[-1] in sub_val:
        del sub_val[keys[-1]]


def _int_or_none(value):
    """Helper: deserialize int value from JSON string."""
    if isinstance(value, int):
        return value
    if value is not None:
        return int(value)


def _str_or_none(value):
    """Helper: serialize value to JSON string."""
    if value is not None:
        return str(value)


def _split_id(full_id):
    """Helper: split full_id into composite parts.

    Args:
        full_id (str): Fully-qualified ID in standard SQL format.

    Returns:
        List[str]: ID's parts separated into components.
    """
    with_prefix = _PROJECT_PREFIX_PATTERN.match(full_id)
    if with_prefix is None:
        parts = full_id.split(".")
    else:
        parts = with_prefix.groups()
        parts = [part for part in parts if part]
    return parts


def _parse_3_part_id(full_id, default_project=None, property_name="table_id"):
    output_project_id = default_project
    output_dataset_id = None
    output_resource_id = None
    parts = _split_id(full_id)

    if len(parts) != 2 and len(parts) != 3:
        raise ValueError(
            "{property_name} must be a fully-qualified ID in "
            'standard SQL format, e.g., "project.dataset.{property_name}", '
            "got {}".format(full_id, property_name=property_name)
        )

    if len(parts) == 2 and not default_project:
        raise ValueError(
            "When default_project is not set, {property_name} must be a "
            "fully-qualified ID in standard SQL format, "
            'e.g., "project.dataset_id.{property_name}", got {}'.format(
                full_id, property_name=property_name
            )
        )

    if len(parts) == 2:
        output_dataset_id, output_resource_id = parts
    else:
        output_project_id, output_dataset_id, output_resource_id = parts

    return output_project_id, output_dataset_id, output_resource_id
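
# Example (illustrative): a two-part ID needs a default project, while a
# three-part ID carries its own:
#
#     >>> _parse_3_part_id("my-project.my_dataset.my_table")
#     ('my-project', 'my_dataset', 'my_table')
#     >>> _parse_3_part_id("my_dataset.my_table", default_project="my-project")
#     ('my-project', 'my_dataset', 'my_table')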


def _build_resource_from_properties(obj, filter_fields):
    """Build a resource based on a ``_properties`` dictionary, filtered by
    ``filter_fields``, which follow the name of the Python object.
    """
    partial = {}
    for filter_field in filter_fields:
        api_field = _get_sub_prop(obj._PROPERTY_TO_API_FIELD, filter_field)
        if api_field is None and filter_field not in obj._properties:
            raise ValueError("No property %s" % filter_field)
        elif api_field is not None:
            _set_sub_prop(partial, api_field, _get_sub_prop(obj._properties, api_field))
        else:
            # allows properties that are not defined in the library
            # and properties that have the same name as API resource key
            partial[filter_field] = obj._properties[filter_field]

    return partial


def _verify_job_config_type(job_config, expected_type, param_name="job_config"):
    if not isinstance(job_config, expected_type):
        msg = (
            "Expected an instance of {expected_type} class for the {param_name} parameter, "
            "but received {param_name} = {job_config}"
        )
        raise TypeError(
            msg.format(
                expected_type=expected_type.__name__,
                param_name=param_name,
                job_config=job_config,
            )
        )


def _isinstance_or_raise(
    value: Any,
    dtype: Union[Type, Tuple[Type, ...]],
    none_allowed: Optional[bool] = False,
) -> Any:
    """Determine whether a value type matches a given datatype or None.

    Args:
        value (Any): Value to be checked.
        dtype (type): Expected data type or tuple of data types.
        none_allowed (Optional[bool]): Whether value is allowed to be None.
            Defaults to False.

    Returns:
        Any: Returns the input value if the type check is successful.

    Raises:
        TypeError: If the input value's type does not match the expected data type(s).
    """
    if none_allowed and value is None:
        return value

    if isinstance(value, dtype):
        return value

    or_none = ""
    if none_allowed:
        or_none = " (or None)"

    msg = f"Pass {value} as a '{dtype}'{or_none}. Got {type(value)}."
    raise TypeError(msg)
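

# Example (illustrative):
#
#     >>> _isinstance_or_raise(123, int)
#     123
#     >>> _isinstance_or_raise(None, int, none_allowed=True) is None
#     True
#     >>> _isinstance_or_raise("abc", int)
#     Traceback (most recent call last):
#       ...
#     TypeError: Pass abc as a '<class 'int'>'. Got <class 'str'>.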