# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Cursor for the Google BigQuery DB-API."""

import collections
from collections import abc as collections_abc
import copy
import logging
import re

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
    _ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.cloud.exceptions  # type: ignore

_LOGGER = logging.getLogger(__name__)

# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
# provided.
Column = collections.namedtuple(
    "Column",
    [
        "name",
        "type_code",
        "display_size",
        "internal_size",
        "precision",
        "scale",
        "null_ok",
    ],
)
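
# For illustration (hypothetical values, not produced by this module itself):
# a query returning a nullable STRING column named "word" would surface in
# ``Cursor.description`` as an entry like
#
#     Column("word", "STRING", None, None, None, None, True)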


@_helpers.raise_on_closed("Operating on a closed cursor.")
class Cursor(object):
    """DB-API Cursor to Google BigQuery.

    Args:
        connection (google.cloud.bigquery.dbapi.Connection):
            A DB-API connection to Google BigQuery.
    """

    def __init__(self, connection):
        self.connection = connection
        self.description = None
        # Per PEP 249: The attribute is -1 in case no .execute*() has been
        # performed on the cursor or the rowcount of the last operation
        # cannot be determined by the interface.
        self.rowcount = -1
        # Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
        # a single row at a time. However, we deviate from that, and set the
        # default to None, allowing the backend to automatically determine the
        # most appropriate size.
        self.arraysize = None
        self._query_data = None
        self._query_job = None
        self._closed = False

    def close(self):
        """Mark the cursor as closed, preventing its further use."""
        self._closed = True

    def _set_description(self, schema):
        """Set description from schema.

        Args:
            schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
                A description of fields in the schema.
        """
        if schema is None:
            self.description = None
            return

        self.description = tuple(
            Column(
                name=field.name,
                type_code=field.field_type,
                display_size=None,
                internal_size=None,
                precision=None,
                scale=None,
                null_ok=field.is_nullable,
            )
            for field in schema
        )

    def _set_rowcount(self, query_results):
        """Set the rowcount from query results.

        Normally, this sets rowcount to the number of rows returned by the
        query, but if it was a DML statement, it sets rowcount to the number
        of modified rows.

        Args:
            query_results (google.cloud.bigquery.query._QueryResults):
                Results of a query.
        """
        total_rows = 0
        num_dml_affected_rows = query_results.num_dml_affected_rows

        if query_results.total_rows is not None and query_results.total_rows > 0:
            total_rows = query_results.total_rows
        if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
            total_rows = num_dml_affected_rows
        self.rowcount = total_rows
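
    # Illustration (hypothetical outcomes): a SELECT returning 10 rows leaves
    # ``rowcount == 10``, while a DML statement such as
    #     UPDATE `proj.dataset.table` SET flag = TRUE WHERE flag = FALSE
    # that touches 5 rows leaves ``rowcount == 5``, because
    # num_dml_affected_rows takes precedence whenever it is positive.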

    def execute(self, operation, parameters=None, job_id=None, job_config=None):
        """Prepare and execute a database operation.

        .. note::
            When setting query parameters, values which are "text"
            (``unicode`` in Python2, ``str`` in Python3) will use
            the 'STRING' BigQuery type. Values which are "bytes" (``str`` in
            Python2, ``bytes`` in Python3) will use the 'BYTES' type.

            A `~datetime.datetime` parameter without timezone information uses
            the 'DATETIME' BigQuery type (example: Global Pi Day Celebration
            March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with
            timezone information uses the 'TIMESTAMP' BigQuery type (example:
            a wedding on April 29, 2011 at 11am, British Summer Time).

            For more information about BigQuery data types, see:
            https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

            ``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not
            yet supported. See:
            https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524

        Args:
            operation (str): A Google BigQuery query string.

            parameters (Union[Mapping[str, Any], Sequence[Any]]):
                (Optional) dictionary or sequence of parameter values.

            job_id (str):
                (Optional) The job_id to use. If not set, a job ID
                is generated at random.

            job_config (google.cloud.bigquery.job.QueryJobConfig):
                (Optional) Extra configuration options for the query job.
        """
        formatted_operation, parameter_types = _format_operation(operation, parameters)
        self._execute(
            formatted_operation, parameters, job_id, job_config, parameter_types
        )

    def _execute(
        self, formatted_operation, parameters, job_id, job_config, parameter_types
    ):
        self._query_data = None
        self._query_job = None
        client = self.connection._client

        # The DB-API uses the pyformat formatting, since the way BigQuery does
        # query parameters was not one of the standard options. Convert both
        # the query and the parameters to the format expected by the client
        # libraries.
        query_parameters = _helpers.to_query_parameters(parameters, parameter_types)

        if client._default_query_job_config:
            if job_config:
                config = job_config._fill_from_default(client._default_query_job_config)
            else:
                config = copy.deepcopy(client._default_query_job_config)
        else:
            config = job_config or job.QueryJobConfig(use_legacy_sql=False)

        config.query_parameters = query_parameters
        self._query_job = client.query(
            formatted_operation, job_config=config, job_id=job_id
        )

        if self._query_job.dry_run:
            self._set_description(schema=None)
            self.rowcount = 0
            return

        # Wait for the query to finish.
        try:
            self._query_job.result()
        except google.cloud.exceptions.GoogleCloudError as exc:
            raise exceptions.DatabaseError(exc)

        query_results = self._query_job._query_results
        self._set_rowcount(query_results)
        self._set_description(query_results.schema)
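
    # Usage sketch (hypothetical, assumes an open dbapi ``Connection`` named
    # ``conn`` and a table ``proj.dataset.people``): the optional ``:type``
    # suffix in a placeholder is stripped before the query is sent.
    #
    #     cur = conn.cursor()
    #     cur.execute(
    #         "SELECT name FROM `proj.dataset.people` WHERE age >= %(age:INT64)s",
    #         {"age": 18},
    #     )
    #     print(cur.rowcount)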

    def executemany(self, operation, seq_of_parameters):
        """Prepare and execute a database operation multiple times.

        Args:
            operation (str): A Google BigQuery query string.

            seq_of_parameters (Sequence[Union[Mapping[str, Any], Sequence[Any]]]):
                Sequence of many sets of parameter values.
        """
        if seq_of_parameters:
            rowcount = 0
            # There's no need to format the operation more than once, as
            # the formatted operation depends only on the structure of the
            # parameters, not their values. So we just use the first set of
            # parameters. If later sets have different numbers or types of
            # parameters, we'll error out anyway.
            formatted_operation, parameter_types = _format_operation(
                operation, seq_of_parameters[0]
            )
            for parameters in seq_of_parameters:
                self._execute(
                    formatted_operation, parameters, None, None, parameter_types
                )
                rowcount += self.rowcount

            self.rowcount = rowcount
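
    # Usage sketch (hypothetical table): runs the statement once per parameter
    # set and accumulates the affected-row counts into ``rowcount``.
    #
    #     cur.executemany(
    #         "INSERT INTO `proj.dataset.people` (name) VALUES (%(name)s)",
    #         [{"name": "Ada"}, {"name": "Grace"}],
    #     )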

    def _try_fetch(self, size=None):
        """Try to start fetching data, if not yet started.

        Mutates self to indicate that iteration has started.
        """
        if self._query_job is None:
            raise exceptions.InterfaceError(
                "No query results: execute() must be called before fetch."
            )

        if self._query_job.dry_run:
            self._query_data = iter([])
            return

        if self._query_data is None:
            bqstorage_client = self.connection._bqstorage_client

            if bqstorage_client is not None:
                rows_iterable = self._bqstorage_fetch(bqstorage_client)
                self._query_data = _helpers.to_bq_table_rows(rows_iterable)
                return

            rows_iter = self._query_job.result(page_size=self.arraysize)
            self._query_data = iter(rows_iter)

    def _bqstorage_fetch(self, bqstorage_client):
        """Start fetching data with the BigQuery Storage API.

        The method assumes that the data about the relevant query job already
        exists internally.

        Args:
            bqstorage_client(\
                google.cloud.bigquery_storage_v1.BigQueryReadClient \
            ):
                A client that knows how to talk to the BigQuery Storage API.

        Returns:
            Iterable[Mapping]:
                A sequence of rows, represented as dictionaries.
        """
        # Hitting this code path with a BQ Storage client instance implies that
        # bigquery_storage can indeed be imported here without errors.
        from google.cloud import bigquery_storage

        table_reference = self._query_job.destination

        requested_session = bigquery_storage.types.ReadSession(
            table=table_reference.to_bqstorage(),
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )

        read_session = bqstorage_client.create_read_session(
            parent="projects/{}".format(table_reference.project),
            read_session=requested_session,
            # a single stream only, as DB API is not well-suited for multithreading
            max_stream_count=1,
        )

        if not read_session.streams:
            return iter([])  # empty table, nothing to read

        stream_name = read_session.streams[0].name
        read_rows_stream = bqstorage_client.read_rows(stream_name)

        rows_iterable = read_rows_stream.rows(read_session)
        return rows_iterable

    def fetchone(self):
        """Fetch a single row from the results of the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            Tuple:
                A tuple representing a row or ``None`` if no more data is
                available.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        try:
            return next(self._query_data)
        except StopIteration:
            return None

    def fetchmany(self, size=None):
        """Fetch multiple results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        .. note::
            The size parameter is not used for the request/response size.
            Set the ``arraysize`` attribute before calling ``execute()`` to
            set the batch size.

        Args:
            size (int):
                (Optional) Maximum number of rows to return. Defaults to the
                ``arraysize`` property value. If ``arraysize`` is not set, it
                defaults to ``1``.

        Returns:
            List[Tuple]: A list of rows.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        if size is None:
            # Since self.arraysize can be None (a deviation from PEP 249),
            # use an actual PEP 249 default of 1 in such case (*some* number
            # is needed here).
            size = self.arraysize if self.arraysize else 1

        self._try_fetch(size=size)
        rows = []

        for row in self._query_data:
            rows.append(row)
            if len(rows) >= size:
                break

        return rows
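
    # Usage sketch: drain results in fixed-size batches; ``fetchmany`` returns
    # an empty list once the underlying iterator is exhausted.
    #
    #     while True:
    #         batch = cur.fetchmany(500)
    #         if not batch:
    #             break
    #         handle(batch)  # hypothetical consumer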

    def fetchall(self):
        """Fetch all remaining results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            List[Tuple]: A list of all the rows in the results.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        return list(self._query_data)

    def setinputsizes(self, sizes):
        """No-op, but for consistency raise an error if cursor is closed."""

    def setoutputsize(self, size, column=None):
        """No-op, but for consistency raise an error if cursor is closed."""

    def __iter__(self):
        self._try_fetch()
        return iter(self._query_data)


def _format_operation_list(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    The input operation will be a query like ``SELECT %s`` and the output
    will be a query like ``SELECT ?``.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Sequence[Any]): Sequence of parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
    """
    formatted_params = ["?" for _ in parameters]

    try:
        return operation % tuple(formatted_params)
    except (TypeError, ValueError) as exc:
        raise exceptions.ProgrammingError(exc)
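
# Illustration (module-internal helper): positional pyformat placeholders are
# rewritten to BigQuery's positional parameter syntax, e.g.
#
#     _format_operation_list("SELECT %s, %s", ("a", "b"))  # -> "SELECT ?, ?"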


def _format_operation_dict(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    The input operation will be a query like ``SELECT %(namedparam)s`` and
    the output will be a query like ``SELECT @namedparam``.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Mapping[str, Any]): Dictionary of parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
    """
    formatted_params = {}
    for name in parameters:
        escaped_name = name.replace("`", r"\`")
        formatted_params[name] = "@`{}`".format(escaped_name)

    try:
        return operation % formatted_params
    except (KeyError, ValueError, TypeError) as exc:
        raise exceptions.ProgrammingError(exc)
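
# Illustration (module-internal helper): named pyformat placeholders are
# rewritten to BigQuery's named parameter syntax, with backticks escaped, e.g.
#
#     _format_operation_dict("SELECT %(name)s", {"name": "x"})
#     # -> "SELECT @`name`"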


def _format_operation(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Union[Mapping[str, Any], Sequence[Any]]):
            Optional parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
    """
    if parameters is None or len(parameters) == 0:
        return operation.replace("%%", "%"), None  # Still do percent de-escaping.

    operation, parameter_types = _extract_types(operation)
    if parameter_types is None:
        raise exceptions.ProgrammingError(
            f"Parameters were provided, but {repr(operation)} has no placeholders."
        )

    if isinstance(parameters, collections_abc.Mapping):
        return _format_operation_dict(operation, parameters), parameter_types

    return _format_operation_list(operation, parameters), parameter_types


def _extract_types(
    operation,
    extra_type_sub=re.compile(
        r"""
        (%*)              # Extra %s. We'll deal with these in the replacement code

        %                 # Beginning of replacement, %s, %(...)s

        (?:\(             # Begin of optional name and/or type
        ([^:)]*)          # name
        (?::              # ':' introduces type
        (                 # start of type group
        [a-zA-Z0-9_<>, ]+ # First part, no parens

        (?:               # start sets of parens + non-paren text
        \([0-9 ,]+\)      # comma-separated groups of digits in parens
                          # (e.g. string(10))
        (?=[, >)])        # Must be followed by ,>) or space
        [a-zA-Z0-9<>, ]*  # Optional non-paren chars
        )*                # Can be zero or more of parens and following text
        )                 # end of type group
        )?                # close type clause ":type"
        \))?              # End of optional name and/or type

        s                 # End of replacement
        """,
        re.VERBOSE,
    ).sub,
):
    """Remove type information from parameter placeholders.

    For every parameter of the form %(name:type)s, replace with %(name)s and
    add the item name->type to the dict that's returned.

    Returns operation without type information and a dictionary of names and
    types.
    """
    parameter_types = None

    def repl(m):
        nonlocal parameter_types
        prefix, name, type_ = m.groups()
        if len(prefix) % 2:
            # The prefix has an odd number of %s, the last of which
            # escapes the % we're looking for, so we don't want to
            # change anything.
            return m.group(0)

        try:
            if name:
                if not parameter_types:
                    parameter_types = {}
                if type_:
                    if name in parameter_types:
                        if type_ != parameter_types[name]:
                            raise exceptions.ProgrammingError(
                                f"Conflicting types for {name}: "
                                f"{parameter_types[name]} and {type_}."
                            )
                    else:
                        parameter_types[name] = type_
                else:
                    if not isinstance(parameter_types, dict):
                        raise TypeError()

                return f"{prefix}%({name})s"
            else:
                if parameter_types is None:
                    parameter_types = []
                parameter_types.append(type_)
                return f"{prefix}%s"
        except (AttributeError, TypeError):
            raise exceptions.ProgrammingError(
                f"{repr(operation)} mixes named and unnamed parameters."
            )

    return extra_type_sub(repl, operation), parameter_types
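

# Minimal self-check sketch (illustrative, not part of the library API): it
# exercises the placeholder-rewriting helpers above without touching the
# BigQuery backend.
if __name__ == "__main__":  # pragma: no cover
    # Typed named placeholder: the ":type" suffix is stripped and recorded.
    op, types = _extract_types("SELECT %(age:INT64)s, %(name)s")
    assert op == "SELECT %(age)s, %(name)s"
    assert types == {"age": "INT64"}

    # Full pipeline: named pyformat placeholders become BigQuery named
    # parameters, ready for _helpers.to_query_parameters.
    formatted, types = _format_operation(
        "SELECT %(age:INT64)s, %(name)s", {"age": 30, "name": "x"}
    )
    assert formatted == "SELECT @`age`, @`name`"
    assert types == {"age": "INT64"}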