1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16from collections import abc as collections_abc
17import datetime
18import decimal
19import functools
20import numbers
21import re
22import typing
23
24from google.cloud import bigquery
25from google.cloud.bigquery import table, query
26from google.cloud.bigquery.dbapi import exceptions
27
28
# Inclusive bounds that ``bigquery_scalar_type`` compares a ``decimal.Decimal``
# against when deciding between the NUMERIC and BIGNUMERIC type names.
_NUMERIC_SERVER_MIN = decimal.Decimal("-9.9999999999999999999999999999999999999E+28")
_NUMERIC_SERVER_MAX = decimal.Decimal("9.9999999999999999999999999999999999999E+28")

# Matches a parenthesized, comma-separated list of unsigned integers, such as
# ``(10)`` or ``(38, 9)``. The helpers in this module use it to strip type
# parameters from a type name before looking the name up.
type_parameters_re = re.compile(
    r"""
    \(
    \s*[0-9]+\s*
    (,
    \s*[0-9]+\s*
    )*
    \)
    """,
    re.VERBOSE,
)
43
44
def _parameter_type(name, value, query_parameter_type=None, value_doc=""):
    """Resolve the BigQuery scalar type name to use for a parameter.

    Args:
        name: Parameter name; only used in error messages.
        value: The parameter value, used to infer the type when no explicit
            type is given.
        query_parameter_type (Optional[str]): Explicit type name. Type
            parameters such as ``(10, 2)`` are removed and the remainder is
            looked up, upper-cased, on ``query.SqlParameterScalarTypes``.
        value_doc (str): Text inserted before the word "value" in the error
            raised when the type cannot be inferred.

    Returns:
        The ``_type`` attribute of the matching scalar type when an explicit
        type is given, otherwise the result of ``bigquery_scalar_type(value)``.

    Raises:
        google.cloud.bigquery.dbapi.exceptions.ProgrammingError:
            if the explicit type is unknown or no type can be inferred.
    """
    if not query_parameter_type:
        # No explicit type: infer one from the Python value.
        inferred = bigquery_scalar_type(value)
        if inferred is None:
            raise exceptions.ProgrammingError(
                f"Encountered parameter {name} with "
                f"{value_doc} value {value} of unexpected type."
            )
        return inferred

    # Strip type parameters
    bare_type = type_parameters_re.sub("", query_parameter_type)
    try:
        return getattr(query.SqlParameterScalarTypes, bare_type.upper())._type
    except AttributeError:
        raise exceptions.ProgrammingError(
            f"The given parameter type, {bare_type},"
            f" for {name} is not a valid BigQuery scalar type."
        )
66
67
def scalar_to_query_parameter(value, name=None, query_parameter_type=None):
    """Build a scalar query parameter from a plain Python value.

    Args:
        value (Any):
            The scalar value to wrap.
        name (str):
            (Optional) Name of the query parameter.
        query_parameter_type (Optional[str]):
            Explicit type for the parameter; when omitted the type is
            determined from ``value``.

    Returns:
        google.cloud.bigquery.ScalarQueryParameter:
            A parameter carrying the given name, the resolved type and the
            value.

    Raises:
        google.cloud.bigquery.dbapi.exceptions.ProgrammingError:
            if the type cannot be determined.
    """
    resolved_type = _parameter_type(name, value, query_parameter_type)
    return bigquery.ScalarQueryParameter(name, resolved_type, value)
91
92
def array_to_query_parameter(value, name=None, query_parameter_type=None):
    """Build an array query parameter from an array-like Python value.

    Args:
        value (Sequence[Any]): The array elements; must be a sequence that is
            not string-like.
        name (Optional[str]): Name of the query parameter.
        query_parameter_type (Optional[str]): Explicit element type. When
            omitted, the element type is determined from the first element.

    Returns:
        A ``bigquery.ArrayQueryParameter`` built from the name, the resolved
        element type and the value.

    Raises:
        google.cloud.bigquery.dbapi.exceptions.ProgrammingError:
            if the value is not array-like, or if the element type cannot be
            determined (including an empty array with no explicit type).
    """
    if not array_like(value):
        raise exceptions.ProgrammingError(
            "The value of parameter {} must be a sequence that is "
            "not string-like.".format(name)
        )

    # With neither an explicit type nor a first element there is nothing to
    # derive the element type from.
    if not (query_parameter_type or value):
        raise exceptions.ProgrammingError(
            "Encountered an empty array-like value of parameter {}, cannot "
            "determine array elements type.".format(name)
        )

    sample_element = value[0] if value else None
    element_type = _parameter_type(
        name,
        sample_element,
        query_parameter_type,
        value_doc="array element ",
    )
    return bigquery.ArrayQueryParameter(name, element_type, value)
130
131
132def _parse_struct_fields(
133 fields,
134 base,
135 parse_struct_field=re.compile(
136 r"""
137 (?:(\w+)\s+) # field name
138 ([A-Z0-9<> ,()]+) # Field type
139 $""",
140 re.VERBOSE | re.IGNORECASE,
141 ).match,
142):
143 # Split a string of struct fields. They're defined by commas, but
144 # we have to avoid splitting on commas internal to fields. For
145 # example:
146 # name string, children array<struct<name string, bdate date>>
147 #
148 # only has 2 top-level fields.
149 fields = fields.split(",")
150 fields = list(reversed(fields)) # in the off chance that there are very many
151 while fields:
152 field = fields.pop()
153 while fields and field.count("<") != field.count(">"):
154 field += "," + fields.pop()
155
156 m = parse_struct_field(field.strip())
157 if not m:
158 raise exceptions.ProgrammingError(
159 f"Invalid struct field, {field}, in {base}"
160 )
161 yield m.group(1, 2)
162
163
# Tags identifying which kind of type ``_parse_type`` recognized.
SCALAR = "s"
ARRAY = "a"
STRUCT = "r"
165
166
def _parse_type(
    type_,
    name,
    base,
    complex_query_parameter_parse=re.compile(
        r"""
        \s*
        (ARRAY|STRUCT|RECORD)  # Type
        \s*
        <([A-Z0-9_<> ,()]+)>   # Subtype(s)
        \s*$
        """,
        re.IGNORECASE | re.VERBOSE,
    ).match,
):
    """Classify a type string and parse one level of it.

    Args:
        type_ (str): The type text, e.g. ``int64``, ``array<date>`` or
            ``struct<x float64, y float64>``.
        name (Optional[str]): Name to attach to a scalar type; also used in
            the scalar error message.
        base (str): The complete type being parsed; only used in error
            messages.
        complex_query_parameter_parse: Matcher for array/struct/record types;
            group 1 is the kind and group 2 the text inside the outermost
            angle brackets.

    Returns:
        A ``(kind, payload)`` pair where ``kind`` is one of ``SCALAR``,
        ``ARRAY`` or ``STRUCT``:

        * ``SCALAR``: the matching ``query.SqlParameterScalarTypes`` member,
          passed through ``with_name(name)`` when a name is given.
        * ``ARRAY``: the element type, with the element type's source text
          stored on it as ``_complex__src``.
        * ``STRUCT``: the iterable of ``(field_name, field_type)`` pairs
          produced by ``_parse_struct_fields``.

    Raises:
        google.cloud.bigquery.dbapi.exceptions.ProgrammingError:
            for an unknown scalar type, an unparsable complex type, or an
            array whose element type is itself an array.
    """
    if "<" in type_:
        parsed = complex_query_parameter_parse(type_)
        if not parsed:
            raise exceptions.ProgrammingError(f"Invalid parameter type, {type_}")

        kind, inner = parsed.group(1, 2)
        if kind.upper() != "ARRAY":
            # STRUCT and RECORD are handled the same way.
            return STRUCT, _parse_struct_fields(inner, base)

        element_type = complex_query_parameter_type(None, inner, base)
        if isinstance(element_type, query.ArrayQueryParameterType):
            raise exceptions.ProgrammingError(f"Array can't contain an array in {base}")
        # Keep the element type's source text so array elements can be
        # converted against it later.
        element_type._complex__src = inner
        return ARRAY, element_type

    # Scalar

    # Strip type parameters
    scalar_name = type_parameters_re.sub("", type_).strip()
    try:
        scalar_type = getattr(query.SqlParameterScalarTypes, scalar_name.upper())
    except AttributeError:
        raise exceptions.ProgrammingError(
            f"The given parameter type, {scalar_name},"
            f"{' for ' + name if name else ''}"
            f" is not a valid BigQuery scalar type, in {base}."
        )
    if name:
        scalar_type = scalar_type.with_name(name)
    return SCALAR, scalar_type
211
212
def complex_query_parameter_type(name: typing.Optional[str], type_: str, base: str):
    """Construct a parameter type object from a type string.

    The type may be complex (an array or a struct/record) or a scalar type
    that is part of a complex type. Struct fields are converted recursively.

    Examples:

      array<struct<x float64, y float64>>

      struct<name string, children array<struct<name string, bdate date>>>

    This is used for computing array types.

    Args:
        name: Name given to the resulting type, or ``None``.
        type_: The type text to convert.
        base: The complete type being parsed; passed along for error
            messages.

    Returns:
        The scalar type for a scalar, a ``query.ArrayQueryParameterType`` for
        an array, or a ``query.StructQueryParameterType`` for a struct.
    """
    kind, parsed = _parse_type(type_, name, base)

    if kind == SCALAR:
        return parsed

    if kind == ARRAY:
        return query.ArrayQueryParameterType(parsed, name=name)

    if kind == STRUCT:
        field_types = []
        for field_name, field_type in parsed:
            field_types.append(
                complex_query_parameter_type(field_name, field_type, base)
            )
        return query.StructQueryParameterType(*field_types, name=name)

    # Can't happen :)
    raise AssertionError("Bad type_type", kind)  # pragma: NO COVER
242
243
def complex_query_parameter(
    name: typing.Optional[str], value, type_: str, base: typing.Optional[str] = None
):
    """
    Construct a query parameter for a complex type (array or struct record)

    or for a subtype, which may not be complex

    Examples:

      array<struct<x float64, y float64>>

      struct<name string, children array<struct<name string, bdate date>>>

    Args:
        name: Name of the parameter, or ``None``.
        value: The value to convert. Arrays require an array-like value and
            structs require a mapping with exactly the struct's field names.
        type_: The type text describing ``value``.
        base: The outermost type text, used in error messages; defaults to
            ``type_``.

    Returns:
        A ``query.ScalarQueryParameter``, ``query.ArrayQueryParameter`` or
        ``query.StructQueryParameter`` matching the parsed type.

    Raises:
        google.cloud.bigquery.dbapi.exceptions.ProgrammingError:
            if the value does not fit the type (non-array-like array value,
            non-mapping struct value, missing or extra struct keys).
    """
    base = base or type_
    kind, parsed = _parse_type(type_, name, base)

    if kind == SCALAR:
        return query.ScalarQueryParameter(name, parsed._type, value)

    if kind == ARRAY:
        if not array_like(value):
            raise exceptions.ProgrammingError(
                "Array type with non-array-like value"
                f" with type {type(value).__name__}"
            )
        if isinstance(parsed, query.ScalarQueryParameterType):
            # Scalar elements are passed through unchanged.
            elements = value
        else:
            # Complex elements are converted one by one against the element
            # type's source text.
            elements = [
                complex_query_parameter(None, item, parsed._complex__src, base)
                for item in value
            ]
        return query.ArrayQueryParameter(name, parsed, elements)

    if kind == STRUCT:
        if not isinstance(value, collections_abc.Mapping):
            raise exceptions.ProgrammingError(f"Non-mapping value for type {type_}")

        unused_keys = set(value)
        members = []
        for field_name, field_type in parsed:
            if field_name not in value:
                raise exceptions.ProgrammingError(
                    f"No field value for {field_name} in {type_}"
                )
            unused_keys.remove(field_name)
            members.append(
                complex_query_parameter(field_name, value[field_name], field_type, base)
            )
        # Any key left over has no corresponding field in the struct type.
        if unused_keys:
            raise exceptions.ProgrammingError(f"Extra data keys for {type_}")

        return query.StructQueryParameter(name, *members)

    # Can't happen :)
    raise AssertionError("Bad type_type", kind)  # pragma: NO COVER
311
312
313def _dispatch_parameter(type_, value, name=None):
314 if type_ is not None and "<" in type_:
315 param = complex_query_parameter(name, value, type_)
316 elif isinstance(value, collections_abc.Mapping):
317 raise NotImplementedError(
318 f"STRUCT-like parameter values are not supported"
319 f"{' (parameter ' + name + ')' if name else ''},"
320 f" unless an explicit type is give in the parameter placeholder"
321 f" (e.g. '%({name if name else ''}:struct<...>)s')."
322 )
323 elif array_like(value):
324 param = array_to_query_parameter(value, name, type_)
325 else:
326 param = scalar_to_query_parameter(value, name, type_)
327
328 return param
329
330
def to_query_parameters_list(parameters, parameter_types):
    """Convert positional parameter values into query parameters.

    Values and types are paired up in order; pairing stops at the shorter of
    the two sequences.

    Args:
        parameters (Sequence[Any]): Sequence of query parameter values.
        parameter_types:
            A list of parameter types, one for each parameter.
            Unknown types are provided as None.

    Returns:
        List[google.cloud.bigquery.query._AbstractQueryParameter]:
            A list of (unnamed) query parameters.
    """
    converted = []
    for value, type_ in zip(parameters, parameter_types):
        converted.append(_dispatch_parameter(type_, value))
    return converted
348
349
def to_query_parameters_dict(parameters, query_parameter_types):
    """Convert named parameter values into query parameters.

    Args:
        parameters (Mapping[str, Any]): Dictionary of query parameter values.
        query_parameter_types:
            A dictionary of parameter types. It needn't have a key for each
            parameter; a missing key means the type is unknown.

    Returns:
        List[google.cloud.bigquery.query._AbstractQueryParameter]:
            A list of named query parameters.
    """
    converted = []
    for name, value in parameters.items():
        declared_type = query_parameter_types.get(name)
        converted.append(_dispatch_parameter(declared_type, value, name))
    return converted
367
368
def to_query_parameters(parameters, parameter_types):
    """Convert DB-API parameter values into query parameters.

    Args:
        parameters (Union[Mapping[str, Any], Sequence[Any]]):
            A dictionary or sequence of query parameter values, or ``None``.
        parameter_types (Union[Mapping[str, str], Sequence[str]]):
            A dictionary or list of parameter types.

            If parameters is a mapping, then this must be a dictionary
            of parameter types. It needn't have a key for each
            parameter.

            If parameters is a sequence, then this must be a list of
            parameter types, one for each parameter. Unknown types
            are provided as None.

    Returns:
        List[google.cloud.bigquery.query._AbstractQueryParameter]:
            A list of query parameters; empty when ``parameters`` is ``None``.
    """
    if parameters is None:
        return []

    # Mappings produce named parameters; anything else is treated as
    # positional.
    convert = (
        to_query_parameters_dict
        if isinstance(parameters, collections_abc.Mapping)
        else to_query_parameters_list
    )
    return convert(parameters, parameter_types)
397
398
def bigquery_scalar_type(value):
    """Return a BigQuery name of the scalar type that matches the given value.

    If the scalar type name could not be determined (e.g. for non-scalar
    values, or for non-finite decimals such as NaN and Infinity), ``None`` is
    returned.

    Args:
        value (Any)

    Returns:
        Optional[str]: The BigQuery scalar type name.
    """
    # ``bool`` must be tested before ``numbers.Integral``: bool is an int
    # subclass and would otherwise be reported as INT64.
    if isinstance(value, bool):
        return "BOOL"
    if isinstance(value, numbers.Integral):
        return "INT64"
    if isinstance(value, numbers.Real):
        return "FLOAT64"
    if isinstance(value, decimal.Decimal):
        # For NaN and +/-Infinity the tuple's exponent is a string marker
        # rather than an int, so the numeric checks below would raise
        # TypeError. Such values have no matching type name here.
        if not value.is_finite():
            return None

        vtuple = value.as_tuple()
        # NUMERIC values have precision of 38 (number of digits) and scale of 9 (number
        # of fractional digits), and their max absolute value must be strictly smaller
        # than 1.0E+29.
        # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
        if (
            len(vtuple.digits) <= 38  # max precision: 38
            and vtuple.exponent >= -9  # max scale: 9
            and _NUMERIC_SERVER_MIN <= value <= _NUMERIC_SERVER_MAX
        ):
            return "NUMERIC"
        return "BIGNUMERIC"
    if isinstance(value, str):
        return "STRING"
    if isinstance(value, bytes):
        return "BYTES"
    # ``datetime.datetime`` must be tested before ``datetime.date`` because
    # it is a subclass of ``date``.
    if isinstance(value, datetime.datetime):
        return "DATETIME" if value.tzinfo is None else "TIMESTAMP"
    if isinstance(value, datetime.date):
        return "DATE"
    if isinstance(value, datetime.time):
        return "TIME"

    return None
444
445
def array_like(value):
    """Tell whether the given value should be treated as an array.

    A value is array-like when it is a sequence (for example a ``list`` or a
    ``tuple``) that is not a ``str``, ``bytes`` or ``bytearray``. Other
    iterables, such as sets, are not array-like.

    Args:
        value (Any)

    Returns:
        bool: ``True`` if the value is considered array-like, ``False`` otherwise.
    """
    # String-like sequences are deliberately excluded.
    if isinstance(value, (str, bytes, bytearray)):
        return False
    return isinstance(value, collections_abc.Sequence)
462
463
def to_bq_table_rows(rows_iterable):
    """Lazily convert row data items into BigQuery table ``Row`` instances.

    Args:
        rows_iterable (Iterable[Mapping]):
            An iterable of row data items to convert to ``Row`` instances.
            Each item's values must provide an ``as_py()`` method.

    Returns:
        Iterable[google.cloud.bigquery.table.Row]
    """

    def to_table_row(row):
        # NOTE: We fetch ARROW values, thus we need to convert them to Python
        # objects with as_py().
        python_values = tuple(cell.as_py() for cell in row.values())

        # Map each key to its position in the row.
        index_by_key = {}
        for position, key in enumerate(row.keys()):
            index_by_key[key] = position

        return table.Row(python_values, index_by_key)

    return (to_table_row(item) for item in rows_iterable)
483
484
def raise_on_closed(
    exc_msg, exc_class=exceptions.ProgrammingError, closed_attr_name="_closed"
):
    """Make public instance methods raise an error if the instance is closed.

    Args:
        exc_msg: Message passed to ``exc_class`` when a method is called on a
            closed instance.
        exc_class: Exception type to raise.
        closed_attr_name (str): Name of the instance attribute whose truthy
            value marks the instance as closed.

    Returns:
        A class decorator. It wraps every public callable attribute of the
        class (plus ``__iter__``), except static and class methods, and
        returns the same class object.
    """

    def _raise_on_closed(method):
        """Make a non-static method raise an error if its containing instance is closed."""

        @functools.wraps(method)
        def with_closed_check(self, *args, **kwargs):
            if getattr(self, closed_attr_name):
                raise exc_class(exc_msg)
            return method(self, *args, **kwargs)

        return with_closed_check

    def decorate_public_methods(klass):
        """Apply ``_raise_on_closed()`` decorator to public instance methods."""
        for name in dir(klass):
            if name.startswith("_") and name != "__iter__":
                continue

            member = getattr(klass, name)
            if not callable(member):
                continue

            # We need to check for class/static methods on the raw attribute
            # stored in a class ``__dict__``, not via the retrieved attribute
            # (`member`), as the latter is already a callable *produced* by
            # one of these descriptors. ``dir()`` also reports names inherited
            # from base classes, so look the raw attribute up along the MRO
            # instead of only in ``klass.__dict__`` (which would raise
            # KeyError for inherited members).
            raw_member = next(
                (
                    vars(owner)[name]
                    for owner in klass.__mro__
                    if name in vars(owner)
                ),
                member,
            )
            if isinstance(raw_member, (staticmethod, classmethod)):
                continue

            setattr(klass, name, _raise_on_closed(member))

        return klass

    return decorate_public_methods