# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15"""Shared helper functions for BigQuery API classes."""
16
17import base64
18import datetime
19import decimal
20import json
21import math
22import re
23import os
24import textwrap
25import warnings
26from typing import Any, Optional, Tuple, Type, Union
27
28from dateutil import relativedelta
29from google.cloud._helpers import UTC # type: ignore
30from google.cloud._helpers import _date_from_iso8601_date
31from google.cloud._helpers import _datetime_from_microseconds
32from google.cloud._helpers import _RFC3339_MICROS
33from google.cloud._helpers import _RFC3339_NO_FRACTION
34from google.cloud._helpers import _to_bytes
35from google.cloud.bigquery import enums
36
37from google.auth import credentials as ga_credentials # type: ignore
38from google.api_core import client_options as client_options_lib
39
TimeoutType = Union[float, None]

_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
_TIMEONLY_W_MICROS = "%H:%M:%S.%f"
_PROJECT_PREFIX_PATTERN = re.compile(
    r"""
    (?P<project_id>\S+\:[^.]+)\.(?P<dataset_id>[^.]+)(?:$|\.(?P<custom_id>[^.]+)$)
""",
    re.VERBOSE,
)

# BigQuery sends INTERVAL data in "canonical format"
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#interval_type
_INTERVAL_PATTERN = re.compile(
    r"(?P<calendar_sign>-?)(?P<years>\d+)-(?P<months>\d+) "
    r"(?P<days>-?\d+) "
    r"(?P<time_sign>-?)(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)\.?(?P<fraction>\d*)?$"
)
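# For example (illustrative), the canonical string "1-6 15 12:30:45.5" matches
# with years="1", months="6", days="15", hours="12", minutes="30",
# seconds="45", and fraction="5" (i.e. 500000 microseconds).
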
_RANGE_PATTERN = re.compile(r"\[.*, .*\)")

BIGQUERY_EMULATOR_HOST = "BIGQUERY_EMULATOR_HOST"
"""Environment variable defining the host for the emulator."""

_DEFAULT_HOST = "https://bigquery.googleapis.com"
"""Default host for the JSON API."""

_DEFAULT_HOST_TEMPLATE = "https://bigquery.{UNIVERSE_DOMAIN}"
"""Templatized endpoint format."""

_DEFAULT_UNIVERSE = "googleapis.com"
"""Default universe for the JSON API."""

_UNIVERSE_DOMAIN_ENV = "GOOGLE_CLOUD_UNIVERSE_DOMAIN"
"""Environment variable for setting the universe domain."""

_SUPPORTED_RANGE_ELEMENTS = {"TIMESTAMP", "DATETIME", "DATE"}


def _get_client_universe(
    client_options: Optional[Union[client_options_lib.ClientOptions, dict]]
) -> str:
    """Retrieves the specified universe setting.

    Args:
        client_options: The client options, as a mapping or ``ClientOptions``.

    Returns:
        str: The resolved universe setting.
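
    Examples:
        An illustrative doctest, assuming a google-api-core version whose
        ``ClientOptions`` supports ``universe_domain``:

        >>> _get_client_universe({"universe_domain": "example.edu"})
        'example.edu'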
89 """
90 if isinstance(client_options, dict):
91 client_options = client_options_lib.from_dict(client_options)
92 universe = _DEFAULT_UNIVERSE
93 options_universe = getattr(client_options, "universe_domain", None)
94 if (
95 options_universe
96 and isinstance(options_universe, str)
97 and len(options_universe) > 0
98 ):
99 universe = options_universe
100 else:
101 env_universe = os.getenv(_UNIVERSE_DOMAIN_ENV)
102 if isinstance(env_universe, str) and len(env_universe) > 0:
103 universe = env_universe
104 return universe
105
106
def _validate_universe(client_universe: str, credentials: ga_credentials.Credentials):
    """Validates that the client-provided universe matches the universe embedded in the credentials.

    Args:
        client_universe (str): The universe domain configured via the client options.
        credentials (ga_credentials.Credentials): The credentials being used in the client.

    Raises:
        ValueError: when client_universe does not match the universe in credentials.
    """
    if hasattr(credentials, "universe_domain"):
        cred_universe = getattr(credentials, "universe_domain")
        if isinstance(cred_universe, str):
            if client_universe != cred_universe:
                raise ValueError(
                    "The configured universe domain "
                    f"({client_universe}) does not match the universe domain "
                    f"found in the credentials ({cred_universe}). "
                    "If you haven't configured the universe domain explicitly, "
                    f"`{_DEFAULT_UNIVERSE}` is the default."
                )


def _get_bigquery_host():
    return os.environ.get(BIGQUERY_EMULATOR_HOST, _DEFAULT_HOST)


def _not_null(value, field):
    """Check whether 'value' should be coerced to 'field' type."""
    return value is not None or (field is not None and field.mode != "NULLABLE")


class CellDataParser:
    """Converter from BigQuery REST resource to Python value for RowIterator and similar classes.

    See: "rows" field of
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list and
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults.
    """

    def to_py(self, resource, field):
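        """Dispatch to a converter based on ``field.field_type``.

        For example (illustrative), a field whose ``field_type`` is ``"INT64"``
        is routed to :meth:`int64_to_py`; unknown types fall back to a
        pass-through converter that emits a :class:`FutureWarning`.
        """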
        def default_converter(value, field):
            _warn_unknown_field_type(field)
            return value

        converter = getattr(
            self, f"{field.field_type.lower()}_to_py", default_converter
        )
        if field.mode == "REPEATED":
            return [converter(item["v"], field) for item in resource]
        else:
            return converter(resource, field)

    def bool_to_py(self, value, field):
        """Coerce 'value' to a bool, if set or not nullable."""
        if _not_null(value, field):
            # TODO(tswast): Why does _not_null care if the field is NULLABLE or
            # REQUIRED? Do we actually need such client-side validation?
            if value is None:
                raise TypeError(f"got None for required boolean field {field}")
            return value.lower() in ("t", "true", "1")

    def boolean_to_py(self, value, field):
        """Coerce 'value' to a bool, if set or not nullable."""
        return self.bool_to_py(value, field)

    def integer_to_py(self, value, field):
        """Coerce 'value' to an int, if set or not nullable."""
        if _not_null(value, field):
            return int(value)

    def int64_to_py(self, value, field):
        """Coerce 'value' to an int, if set or not nullable."""
        return self.integer_to_py(value, field)

    def interval_to_py(
        self, value: Optional[str], field
    ) -> Optional[relativedelta.relativedelta]:
        """Coerce 'value' to an interval, if set or not nullable."""
        if not _not_null(value, field):
            return None
        if value is None:
            raise TypeError(f"got {value} for REQUIRED field: {repr(field)}")

        parsed = _INTERVAL_PATTERN.match(value)
        if parsed is None:
            raise ValueError(
                textwrap.dedent(
                    f"""
                    Got interval: '{value}' with unexpected format.
                    Expected interval in canonical format of "[sign]Y-M [sign]D [sign]H:M:S[.F]".
                    See:
                    https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#interval_type
                    for more information.
                    """
                ),
            )

        calendar_sign = -1 if parsed.group("calendar_sign") == "-" else 1
        years = calendar_sign * int(parsed.group("years"))
        months = calendar_sign * int(parsed.group("months"))
        days = int(parsed.group("days"))
        time_sign = -1 if parsed.group("time_sign") == "-" else 1
        hours = time_sign * int(parsed.group("hours"))
        minutes = time_sign * int(parsed.group("minutes"))
        seconds = time_sign * int(parsed.group("seconds"))
        fraction = parsed.group("fraction")
        microseconds = time_sign * int(fraction.ljust(6, "0")[:6]) if fraction else 0

        return relativedelta.relativedelta(
            years=years,
            months=months,
            days=days,
            hours=hours,
            minutes=minutes,
            seconds=seconds,
            microseconds=microseconds,
        )

    def float_to_py(self, value, field):
        """Coerce 'value' to a float, if set or not nullable."""
        if _not_null(value, field):
            return float(value)

    def float64_to_py(self, value, field):
        """Coerce 'value' to a float, if set or not nullable."""
        return self.float_to_py(value, field)

    def numeric_to_py(self, value, field):
        """Coerce 'value' to a Decimal, if set or not nullable."""
        if _not_null(value, field):
            return decimal.Decimal(value)

    def bignumeric_to_py(self, value, field):
        """Coerce 'value' to a Decimal, if set or not nullable."""
        return self.numeric_to_py(value, field)

    def string_to_py(self, value, _):
        """NOOP string -> string coercion"""
        return value

    def geography_to_py(self, value, _):
        """NOOP string -> string coercion"""
        return value

    def bytes_to_py(self, value, field):
        """Base64-decode 'value'."""
        if _not_null(value, field):
            return base64.standard_b64decode(_to_bytes(value))

    def timestamp_to_py(self, value, field) -> Union[datetime.datetime, str, None]:
        """Coerce 'value' to a datetime, if set or not nullable. If the
        timestamp is of picosecond precision, preserve the string format."""
        if field.timestamp_precision == enums.TimestampPrecision.PICOSECOND:
            return value
        if _not_null(value, field):
            # value is an integer count of microseconds since the epoch, in UTC.
            return _datetime_from_microseconds(int(value))
        return None

    def datetime_to_py(self, value, field):
        """Coerce 'value' to a datetime, if set or not nullable.

        Args:
            value (str): The timestamp.
            field (google.cloud.bigquery.schema.SchemaField):
                The field corresponding to the value.

        Returns:
            Optional[datetime.datetime]:
                The parsed datetime object from
                ``value`` if the ``field`` is not null (otherwise it is
                :data:`None`).
        """
        if _not_null(value, field):
            if "." in value:
                # YYYY-MM-DDTHH:MM:SS.ffffff
                return datetime.datetime.strptime(value, _RFC3339_MICROS_NO_ZULU)
            else:
                # YYYY-MM-DDTHH:MM:SS
                return datetime.datetime.strptime(value, _RFC3339_NO_FRACTION)
        else:
            return None

    def date_to_py(self, value, field):
        """Coerce 'value' to a datetime date, if set or not nullable."""
        if _not_null(value, field):
            # value will be a string, in YYYY-MM-DD form.
            return _date_from_iso8601_date(value)

    def time_to_py(self, value, field):
        """Coerce 'value' to a datetime time, if set or not nullable."""
        if _not_null(value, field):
            if len(value) == 8:  # HH:MM:SS
                fmt = _TIMEONLY_WO_MICROS
            elif len(value) == 15:  # HH:MM:SS.micros
                fmt = _TIMEONLY_W_MICROS
            else:
                raise ValueError(
                    textwrap.dedent(
                        f"""
                        Got {repr(value)} with unknown time format.
                        Expected HH:MM:SS or HH:MM:SS.micros. See
                        https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type
                        for more information.
                        """
                    ),
                )
            return datetime.datetime.strptime(value, fmt).time()

    def record_to_py(self, value, field):
        """Coerce 'value' to a mapping, if set or not nullable."""
        if _not_null(value, field):
            record = {}
            record_iter = zip(field.fields, value["f"])
            for subfield, cell in record_iter:
                record[subfield.name] = self.to_py(cell["v"], subfield)
            return record

    def struct_to_py(self, value, field):
        """Coerce 'value' to a mapping, if set or not nullable."""
        return self.record_to_py(value, field)

    def json_to_py(self, value, field):
        """Coerce 'value' to a Pythonic JSON representation."""
        if _not_null(value, field):
            return json.loads(value)
        else:
            return None

    def _range_element_to_py(self, value, field_element_type):
        """Coerce 'value' to a range element value."""
        # Avoid circular imports by importing here.
        from google.cloud.bigquery import schema

        if value == "UNBOUNDED":
            return None
        if field_element_type.element_type in _SUPPORTED_RANGE_ELEMENTS:
            return self.to_py(
                value,
                schema.SchemaField("placeholder", field_element_type.element_type),
            )
        else:
            raise ValueError(
                textwrap.dedent(
                    f"""
                    Got unsupported range element type: {field_element_type.element_type}.
                    Expected one of {repr(_SUPPORTED_RANGE_ELEMENTS)}. See:
                    https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#declare_a_range_type
                    for more information.
                    """
                ),
            )

    def range_to_py(self, value, field):
        """Coerce 'value' to a range, if set or not nullable.

        Args:
            value (str): The literal representation of the range.
            field (google.cloud.bigquery.schema.SchemaField):
                The field corresponding to the value.

        Returns:
            Optional[dict]:
                The parsed range object from ``value`` if the ``field`` is not
                null (otherwise it is :data:`None`).
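
        Examples:
            An illustrative DATE range literal (an assumption, not from the
            original docs): ``"[2020-01-01, 2020-12-31)"`` parses to
            ``{"start": datetime.date(2020, 1, 1), "end": datetime.date(2020, 12, 31)}``.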
373 """
374 if _not_null(value, field):
375 if _RANGE_PATTERN.match(value):
376 start, end = value[1:-1].split(", ")
377 start = self._range_element_to_py(start, field.range_element_type)
378 end = self._range_element_to_py(end, field.range_element_type)
379 return {"start": start, "end": end}
380 else:
381 raise ValueError(
382 textwrap.dedent(
383 f"""
384 Got unknown format for range value: {value}.
385 Expected format '[lower_bound, upper_bound)'. See:
386 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_with_literal
387 for more information.
388 """
389 ),
390 )
391
392
393CELL_DATA_PARSER = CellDataParser()
394
395
class DataFrameCellDataParser(CellDataParser):
    """Override of CellDataParser to handle differences in expression of values in DataFrame-like outputs.

    This is used to turn the output of the REST API into a pyarrow Table,
    emulating the serialized arrow from the BigQuery Storage Read API.
    """

    def json_to_py(self, value, _):
        """No-op because DataFrame expects string for JSON output."""
        return value


DATA_FRAME_CELL_DATA_PARSER = DataFrameCellDataParser()


class ScalarQueryParamParser(CellDataParser):
    """Override of CellDataParser to handle the differences in the response from query params.

    See: "value" field of
    https://cloud.google.com/bigquery/docs/reference/rest/v2/QueryParameter#QueryParameterValue
    """

    def timestamp_to_py(self, value, field):
        """Coerce 'value' to a datetime, if set or not nullable.

        Args:
            value (str): The timestamp.

            field (google.cloud.bigquery.schema.SchemaField):
                The field corresponding to the value.

        Returns:
            Optional[datetime.datetime]:
                The parsed datetime object from
                ``value`` if the ``field`` is not null (otherwise it is
                :data:`None`).
        """
        if _not_null(value, field):
            # Canonical formats for timestamps in BigQuery are flexible. See:
            # g.co/cloud/bigquery/docs/reference/standard-sql/data-types#timestamp-type
            # The separator between the date and time can be 'T' or ' '.
            value = value.replace(" ", "T", 1)
            # The UTC timezone may be formatted as Z or +00:00.
            value = value.replace("Z", "")
            value = value.replace("+00:00", "")

            if "." in value:
                # YYYY-MM-DDTHH:MM:SS.ffffff
                return datetime.datetime.strptime(
                    value, _RFC3339_MICROS_NO_ZULU
                ).replace(tzinfo=UTC)
            else:
                # YYYY-MM-DDTHH:MM:SS
                return datetime.datetime.strptime(value, _RFC3339_NO_FRACTION).replace(
                    tzinfo=UTC
                )
        else:
            return None


SCALAR_QUERY_PARAM_PARSER = ScalarQueryParamParser()


def _field_to_index_mapping(schema):
    """Create a mapping from schema field name to index of field."""
    return {f.name: i for i, f in enumerate(schema)}


def _row_tuple_from_json(row, schema):
    """Convert JSON row data to a row with appropriate types.

    Note: ``row['f']`` and ``schema`` are presumed to be of the same length.

    Args:
        row (Dict): A JSON response row to be converted.
        schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]): Specification of the field types in ``row``.

    Returns:
        Tuple: A tuple of data converted to native types.
    """
    from google.cloud.bigquery.schema import _to_schema_fields

    schema = _to_schema_fields(schema)

    row_data = []
    for field, cell in zip(schema, row["f"]):
        row_data.append(CELL_DATA_PARSER.to_py(cell["v"], field))
    return tuple(row_data)


def _rows_from_json(values, schema):
    """Convert JSON row data to rows with appropriate types.

    Args:
        values (Sequence[Dict]): The list of responses (JSON rows) to convert.
        schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            The table's schema. If any item is a mapping, its content must be
            compatible with
            :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

    Returns:
        List[:class:`~google.cloud.bigquery.Row`]
    """
    from google.cloud.bigquery import Row
    from google.cloud.bigquery.schema import _to_schema_fields

    schema = _to_schema_fields(schema)
    field_to_index = _field_to_index_mapping(schema)
    return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values]


def _int_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, int):
        value = str(value)
    return value


def _float_to_json(value) -> Union[None, str, float]:
    """Coerce 'value' to a JSON-compatible representation.
    if value is None:
        return None

    if isinstance(value, str):
        value = float(value)

    return str(value) if (math.isnan(value) or math.isinf(value)) else float(value)


def _decimal_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, decimal.Decimal):
        value = str(value)
    return value


def _bool_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, bool):
        value = "true" if value else "false"
    return value


def _bytes_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, bytes):
        value = base64.standard_b64encode(value).decode("ascii")
    return value


def _json_to_json(value):
    """Coerce 'value' to a BigQuery REST API representation."""
    if value is None:
        return None
    return json.dumps(value)


def _string_to_json(value):
    """NOOP string -> string coercion"""
    return value


def _timestamp_to_json_parameter(value):
    """Coerce 'value' to a JSON-compatible representation.

    This version returns the string representation used in query parameters.
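
    For example (illustrative), ``datetime.datetime(2020, 1, 1, 12, 0, tzinfo=UTC)``
    becomes ``"2020-01-01 12:00:00+00:00"``.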
568 """
569 if isinstance(value, datetime.datetime):
570 if value.tzinfo not in (None, UTC):
571 # Convert to UTC and remove the time zone info.
572 value = value.replace(tzinfo=None) - value.utcoffset()
573 value = "%s %s+00:00" % (value.date().isoformat(), value.time().isoformat())
574 return value
575
576
577def _timestamp_to_json_row(value):
578 """Coerce 'value' to an JSON-compatible representation."""
579 if isinstance(value, datetime.datetime):
580 # For naive datetime objects UTC timezone is assumed, thus we format
581 # those to string directly without conversion.
582 if value.tzinfo is not None:
583 value = value.astimezone(UTC)
584 value = value.strftime(_RFC3339_MICROS)
585 return value
586
587
def _datetime_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, datetime.datetime):
        # For naive datetime objects UTC timezone is assumed, thus we format
        # those to string directly without conversion.
        if value.tzinfo is not None:
            value = value.astimezone(UTC)
        value = value.strftime(_RFC3339_MICROS_NO_ZULU)
    return value


def _date_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, datetime.date):
        value = value.isoformat()
    return value


def _time_to_json(value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, datetime.time):
        value = value.isoformat()
    return value


def _range_element_to_json(value, element_type=None):
    """Coerce 'value' to a JSON-compatible representation."""
    if value is None:
        return None
    elif isinstance(value, str):
        if value.upper() in ("UNBOUNDED", "NULL"):
            return None
        else:
            # We do not enforce that the range element value is valid, to
            # avoid duplicating validation done by the backend.
            return value
    elif (
        element_type and element_type.element_type.upper() in _SUPPORTED_RANGE_ELEMENTS
    ):
        converter = _SCALAR_VALUE_TO_JSON_ROW.get(element_type.element_type.upper())
        return converter(value)
    else:
        raise ValueError(
            f"Unsupported RANGE element type {element_type}, or "
            "element type is empty. Must be DATE, DATETIME, or "
            "TIMESTAMP"
        )


def _range_field_to_json(range_element_type, value):
    """Coerce 'value' to a JSON-compatible representation."""
    if isinstance(value, str):
        # Range given as a string literal.
        if _RANGE_PATTERN.match(value):
            start, end = value[1:-1].split(", ")
        else:
            raise ValueError(f"RANGE literal {value} has incorrect format")
    elif isinstance(value, dict):
        # Range given as a dictionary.
        start = value.get("start")
        end = value.get("end")
    else:
        raise ValueError(
            f"Unsupported type of RANGE value {value}, must be string or dict"
        )

    start = _range_element_to_json(start, range_element_type)
    end = _range_element_to_json(end, range_element_type)
    return {"start": start, "end": end}


# Converters used for scalar values marshalled to the BigQuery API, such as in
# query parameters or the tabledata.insert API.
_SCALAR_VALUE_TO_JSON_ROW = {
    "INTEGER": _int_to_json,
    "INT64": _int_to_json,
    "FLOAT": _float_to_json,
    "FLOAT64": _float_to_json,
    "NUMERIC": _decimal_to_json,
    "BIGNUMERIC": _decimal_to_json,
    "BOOLEAN": _bool_to_json,
    "BOOL": _bool_to_json,
    "BYTES": _bytes_to_json,
    "TIMESTAMP": _timestamp_to_json_row,
    "DATETIME": _datetime_to_json,
    "DATE": _date_to_json,
    "TIME": _time_to_json,
    "JSON": _json_to_json,
    "STRING": _string_to_json,
    # Make sure DECIMAL and BIGDECIMAL are handled, even though
    # requests for them should be converted to NUMERIC. Better safe
    # than sorry.
    "DECIMAL": _decimal_to_json,
    "BIGDECIMAL": _decimal_to_json,
}


# Converters used for scalar values marshalled as query parameters.
_SCALAR_VALUE_TO_JSON_PARAM = _SCALAR_VALUE_TO_JSON_ROW.copy()
_SCALAR_VALUE_TO_JSON_PARAM["TIMESTAMP"] = _timestamp_to_json_parameter


def _warn_unknown_field_type(field):
    warnings.warn(
        "Unknown type '{}' for field '{}'. Behavior reading and writing this type is not officially supported and may change in the future.".format(
            field.field_type, field.name
        ),
        FutureWarning,
    )


def _scalar_field_to_json(field, row_value):
    """Maps a field and value to a JSON-safe value.

    Args:
        field (google.cloud.bigquery.schema.SchemaField):
            The SchemaField to use for type conversion and field name.
        row_value (Any):
            Value to be converted, based on the field's type.

    Returns:
        Any: A JSON-serializable object.
    """

    def default_converter(value):
        _warn_unknown_field_type(field)
        return value

    converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type, default_converter)
    return converter(row_value)


def _repeated_field_to_json(field, row_value):
    """Convert a repeated/array field to its JSON representation.

    Args:
        field (google.cloud.bigquery.schema.SchemaField):
            The SchemaField to use for type conversion and field name. The
            field mode must equal ``REPEATED``.
        row_value (Sequence[Any]):
            A sequence of values to convert to JSON-serializable values.

    Returns:
        List[Any]: A list of JSON-serializable objects.
    """
    values = []
    for item in row_value:
        values.append(_single_field_to_json(field, item))
    return values


def _record_field_to_json(fields, row_value):
    """Convert a record/struct field to its JSON representation.

    Args:
        fields (Sequence[google.cloud.bigquery.schema.SchemaField]):
            The :class:`~google.cloud.bigquery.schema.SchemaField`s of the
            record's subfields to use for type conversion and field names.
        row_value (Union[Tuple[Any], Mapping[str, Any]]):
            A tuple or dictionary to convert to JSON-serializable values.

    Returns:
        Mapping[str, Any]: A JSON-serializable dictionary.
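
    Examples:
        An illustrative doctest with two hypothetical subfields ``x`` and ``y``
        (not part of the original docs):

        >>> from google.cloud.bigquery.schema import SchemaField
        >>> fields = [SchemaField("x", "INTEGER"), SchemaField("y", "STRING")]
        >>> _record_field_to_json(fields, (1, "a"))
        {'x': '1', 'y': 'a'}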
751 """
752 isdict = isinstance(row_value, dict)
753
754 # If row is passed as a tuple, make the length sanity check to avoid either
755 # uninformative index errors a few lines below or silently omitting some of
756 # the values from the result (we cannot know exactly which fields are missing
757 # or redundant, since we don't have their names).
758 if not isdict and len(row_value) != len(fields):
759 msg = "The number of row fields ({}) does not match schema length ({}).".format(
760 len(row_value), len(fields)
761 )
762 raise ValueError(msg)
763
764 record = {}
765
766 if isdict:
767 processed_fields = set()
768
769 for subindex, subfield in enumerate(fields):
770 subname = subfield.name
771 subvalue = row_value.get(subname) if isdict else row_value[subindex]
772
773 # None values are unconditionally omitted
774 if subvalue is not None:
775 record[subname] = _field_to_json(subfield, subvalue)
776
777 if isdict:
778 processed_fields.add(subname)
779
780 # Unknown fields should not be silently dropped, include them. Since there
781 # is no schema information available for them, include them as strings
782 # to make them JSON-serializable.
783 if isdict:
784 not_processed = set(row_value.keys()) - processed_fields
785
786 for field_name in not_processed:
787 value = row_value[field_name]
788 if value is not None:
789 record[field_name] = str(value)
790
791 return record
792
793
def _single_field_to_json(field, row_value):
    """Convert a single field into JSON-serializable values.

    Ignores mode so that this can function for ARRAY / REPEATED fields
    without requiring a deepcopy of the field. See:
    https://github.com/googleapis/python-bigquery/issues/6

    Args:
        field (google.cloud.bigquery.schema.SchemaField):
            The SchemaField to use for type conversion and field name.

        row_value (Any):
            Scalar or Struct to be inserted. The type
            is inferred from the SchemaField's field_type.

    Returns:
        Any: A JSON-serializable object.
    """
    if row_value is None:
        return None

    if field.field_type == "RECORD":
        return _record_field_to_json(field.fields, row_value)
    if field.field_type == "RANGE":
        return _range_field_to_json(field.range_element_type, row_value)

    return _scalar_field_to_json(field, row_value)


def _field_to_json(field, row_value):
    """Convert a field into JSON-serializable values.

    Args:
        field (google.cloud.bigquery.schema.SchemaField):
            The SchemaField to use for type conversion and field name.

        row_value (Union[Sequence[List], Any]):
            Row data to be inserted. If the SchemaField's mode is
            REPEATED, assume this is a list. If not, the type
            is inferred from the SchemaField's field_type.

    Returns:
        Any: A JSON-serializable object.
    """
    if row_value is None:
        return None

    if field.mode == "REPEATED":
        return _repeated_field_to_json(field, row_value)

    return _single_field_to_json(field, row_value)


def _snake_to_camel_case(value):
    """Convert a snake case string to camel case.
    words = value.split("_")
    return words[0] + "".join(map(str.capitalize, words[1:]))


def _get_sub_prop(container, keys, default=None):
    """Get a nested value from a dictionary.

    This method works like ``dict.get(key)``, but for nested values.

    Args:
        container (Dict):
            A dictionary which may contain other dictionaries as values.
        keys (Iterable):
            A sequence of keys to attempt to get the value for. If ``keys`` is
            a string, it is treated as a sequence containing a single string
            key. Each item in the sequence represents a deeper nesting. The
            first key is for the top level. If there is a dictionary there,
            the second key attempts to get the value within that, and so on.
        default (Optional[object]):
            Value to be returned if any of the keys are not found.
            Defaults to ``None``.

    Examples:
        Get a top-level value (equivalent to ``container.get('key')``).

        >>> _get_sub_prop({'key': 'value'}, ['key'])
        'value'

        Get a top-level value, providing a default (equivalent to
        ``container.get('key', default='default')``).

        >>> _get_sub_prop({'nothere': 123}, ['key'], default='not found')
        'not found'

        Get a nested value.

        >>> _get_sub_prop({'key': {'subkey': 'value'}}, ['key', 'subkey'])
        'value'

    Returns:
        object: The value if present or the default.
    """
    if isinstance(keys, str):
        keys = [keys]

    sub_val = container
    for key in keys:
        if key not in sub_val:
            return default
        sub_val = sub_val[key]
    return sub_val


def _set_sub_prop(container, keys, value):
    """Set a nested value in a dictionary.

    Args:
        container (Dict):
            A dictionary which may contain other dictionaries as values.
        keys (Iterable):
            A sequence of keys to attempt to set the value for. If ``keys`` is
            a string, it is treated as a sequence containing a single string
            key. Each item in the sequence represents a deeper nesting. The
            first key is for the top level. If there is a dictionary there,
            the second key attempts to get the value within that, and so on.
        value (object): Value to set within the container.

    Examples:
        Set a top-level value (equivalent to ``container['key'] = 'value'``).

        >>> container = {}
        >>> _set_sub_prop(container, ['key'], 'value')
        >>> container
        {'key': 'value'}

        Set a nested value.

        >>> container = {}
        >>> _set_sub_prop(container, ['key', 'subkey'], 'value')
        >>> container
        {'key': {'subkey': 'value'}}

        Replace a nested value.

        >>> container = {'key': {'subkey': 'prev'}}
        >>> _set_sub_prop(container, ['key', 'subkey'], 'new')
        >>> container
        {'key': {'subkey': 'new'}}
    """
    if isinstance(keys, str):
        keys = [keys]

    sub_val = container
    for key in keys[:-1]:
        if key not in sub_val:
            sub_val[key] = {}
        sub_val = sub_val[key]
    sub_val[keys[-1]] = value


def _del_sub_prop(container, keys):
    """Remove a nested key from a dictionary.

    Args:
        container (Dict):
            A dictionary which may contain other dictionaries as values.
        keys (Iterable):
            A sequence of keys to attempt to clear the value for. Each item in
            the sequence represents a deeper nesting. The first key is for
            the top level. If there is a dictionary there, the second key
            attempts to get the value within that, and so on.

    Examples:
        Remove a top-level value (equivalent to ``del container['key']``).

        >>> container = {'key': 'value'}
        >>> _del_sub_prop(container, ['key'])
        >>> container
        {}

        Remove a nested value.

        >>> container = {'key': {'subkey': 'value'}}
        >>> _del_sub_prop(container, ['key', 'subkey'])
        >>> container
        {'key': {}}
    """
    sub_val = container
    for key in keys[:-1]:
        if key not in sub_val:
            sub_val[key] = {}
        sub_val = sub_val[key]
    if keys[-1] in sub_val:
        del sub_val[keys[-1]]


def _int_or_none(value):
    """Helper: deserialize an int value from a JSON string."""
    if isinstance(value, int):
        return value
    if value is not None:
        return int(value)


def _str_or_none(value):
    """Helper: serialize a value to a JSON string."""
    if value is not None:
        return str(value)


def _split_id(full_id):
    """Helper: split full_id into composite parts.

    Args:
        full_id (str): Fully-qualified ID in standard SQL format.

    Returns:
        List[str]: ID's parts separated into components.
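
    Examples:
        An illustrative doctest (not part of the original docs):

        >>> _split_id("my_project.my_dataset.my_table")
        ['my_project', 'my_dataset', 'my_table']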
1007 """
1008 with_prefix = _PROJECT_PREFIX_PATTERN.match(full_id)
1009 if with_prefix is None:
1010 parts = full_id.split(".")
1011 else:
1012 parts = with_prefix.groups()
1013 parts = [part for part in parts if part]
1014 return parts
1015
1016
def _parse_3_part_id(full_id, default_project=None, property_name="table_id"):
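    """Parse a fully-qualified ID in standard SQL format into its parts.

    For example (illustrative), ``"my_project.my_dataset.my_table"`` parses to
    ``("my_project", "my_dataset", "my_table")``; a two-part ID uses
    ``default_project`` for the project component.
    """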
    output_project_id = default_project
    output_dataset_id = None
    output_resource_id = None
    parts = _split_id(full_id)

    if len(parts) != 2 and len(parts) != 3:
        raise ValueError(
            "{property_name} must be a fully-qualified ID in "
            'standard SQL format, e.g., "project.dataset.{property_name}", '
            "got {}".format(full_id, property_name=property_name)
        )

    if len(parts) == 2 and not default_project:
        raise ValueError(
            "When default_project is not set, {property_name} must be a "
            "fully-qualified ID in standard SQL format, "
            'e.g., "project.dataset_id.{property_name}", got {}'.format(
                full_id, property_name=property_name
            )
        )

    if len(parts) == 2:
        output_dataset_id, output_resource_id = parts
    else:
        output_project_id, output_dataset_id, output_resource_id = parts

    return output_project_id, output_dataset_id, output_resource_id


def _build_resource_from_properties(obj, filter_fields):
    """Build a resource based on a ``_properties`` dictionary, filtered by
    ``filter_fields``, which are names of the Python object's properties.
    """
    partial = {}
    for filter_field in filter_fields:
        api_field = _get_sub_prop(obj._PROPERTY_TO_API_FIELD, filter_field)
        if api_field is None and filter_field not in obj._properties:
            raise ValueError("No property %s" % filter_field)
        elif api_field is not None:
            _set_sub_prop(partial, api_field, _get_sub_prop(obj._properties, api_field))
        else:
            # Allows properties that are not defined in the library
            # and properties that have the same name as the API resource key.
            partial[filter_field] = obj._properties[filter_field]

    return partial


def _verify_job_config_type(job_config, expected_type, param_name="job_config"):
    if not isinstance(job_config, expected_type):
        msg = (
            "Expected an instance of {expected_type} class for the {param_name} parameter, "
            "but received {param_name} = {job_config}"
        )
        raise TypeError(
            msg.format(
                expected_type=expected_type.__name__,
                param_name=param_name,
                job_config=job_config,
            )
        )


def _isinstance_or_raise(
    value: Any,
    dtype: Union[Type, Tuple[Type, ...]],
    none_allowed: Optional[bool] = False,
) -> Any:
    """Determine whether a value's type matches the given datatype(s) or None.

    Args:
        value (Any): Value to be checked.
        dtype (type): Expected data type or tuple of data types.
        none_allowed (Optional[bool]): Whether the value is allowed to be
            None. Defaults to False.

    Returns:
        Any: Returns the input value if the type check is successful.

    Raises:
        TypeError: If the input value's type does not match the expected data type(s).
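
    Examples:
        An illustrative doctest (not part of the original docs):

        >>> _isinstance_or_raise(123, int)
        123
        >>> _isinstance_or_raise(None, int, none_allowed=True)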
1096 """
1097 if none_allowed and value is None:
1098 return value
1099
1100 if isinstance(value, dtype):
1101 return value
1102
1103 or_none = ""
1104 if none_allowed:
1105 or_none = " (or None)"
1106
1107 msg = f"Pass {value} as a '{dtype}'{or_none}. Got {type(value)}."
1108 raise TypeError(msg)