# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Cursor for the Google BigQuery DB-API."""

from __future__ import annotations

import collections
from collections import abc as collections_abc
import re
from typing import Optional

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
    _ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.cloud.exceptions  # type: ignore


# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
# provided.
Column = collections.namedtuple(
    "Column",
    [
        "name",
        "type_code",
        "display_size",
        "internal_size",
        "precision",
        "scale",
        "null_ok",
    ],
)


@_helpers.raise_on_closed("Operating on a closed cursor.")
class Cursor(object):
    """DB-API Cursor to Google BigQuery.

    Args:
        connection (google.cloud.bigquery.dbapi.Connection):
            A DB-API connection to Google BigQuery.
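
    Example:
        A minimal usage sketch (``my_dataset.my_table`` is a placeholder
        table name)::

            from google.cloud.bigquery import dbapi

            connection = dbapi.connect()
            cursor = connection.cursor()
            cursor.execute("SELECT name FROM my_dataset.my_table WHERE age >= %s", (18,))
            rows = cursor.fetchall()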
63 """
64
65 def __init__(self, connection):
66 self.connection = connection
67 self.description = None
68 # Per PEP 249: The attribute is -1 in case no .execute*() has been
69 # performed on the cursor or the rowcount of the last operation
70 # cannot be determined by the interface.
71 self.rowcount = -1
72 # Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
73 # a single row at a time. However, we deviate from that, and set the
74 # default to None, allowing the backend to automatically determine the
75 # most appropriate size.
76 self.arraysize = None
77 self._query_data = None
78 self._query_rows = None
79 self._closed = False
80
81 @property
82 def query_job(self) -> Optional[job.QueryJob]:
83 """google.cloud.bigquery.job.query.QueryJob | None: The query job
84 created by the last ``execute*()`` call, if a query job was created.
85
86 .. note::
87 If the last ``execute*()`` call was ``executemany()``, this is the
88 last job created by ``executemany()``."""
89 rows = self._query_rows
90
91 if rows is None:
92 return None
93
94 job_id = rows.job_id
95 project = rows.project
96 location = rows.location
97 client = self.connection._client
98
99 if job_id is None:
100 return None
101
102 return client.get_job(job_id, location=location, project=project)
103
    def close(self):
        """Mark the cursor as closed, preventing its further use."""
        self._closed = True

    def _set_description(self, schema):
        """Set description from schema.

        Args:
            schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
                A description of fields in the schema.
        """
        if schema is None:
            self.description = None
            return

        self.description = tuple(
            Column(
                name=field.name,
                type_code=field.field_type,
                display_size=None,
                internal_size=None,
                precision=None,
                scale=None,
                null_ok=field.is_nullable,
            )
            for field in schema
        )

    def _set_rowcount(self, rows):
        """Set the rowcount from a RowIterator.

        Normally, this sets rowcount to the number of rows returned by the
        query, but if it was a DML statement, it sets rowcount to the number
        of modified rows.

        Args:
            rows (google.cloud.bigquery.table.RowIterator):
                Results of a query.
        """
        total_rows = 0
        num_dml_affected_rows = rows.num_dml_affected_rows

        if rows.total_rows is not None and rows.total_rows > 0:
            total_rows = rows.total_rows
        if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
            total_rows = num_dml_affected_rows
        self.rowcount = total_rows

    def execute(self, operation, parameters=None, job_id=None, job_config=None):
        """Prepare and execute a database operation.

        .. note::
            When setting query parameters, values which are "text"
            (``unicode`` in Python2, ``str`` in Python3) will use
            the 'STRING' BigQuery type. Values which are "bytes" (``str`` in
            Python2, ``bytes`` in Python3) will use the 'BYTES' type.

            A `~datetime.datetime` parameter without timezone information uses
            the 'DATETIME' BigQuery type (example: Global Pi Day Celebration
            March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with
            timezone information uses the 'TIMESTAMP' BigQuery type (example:
            a wedding on April 29, 2011 at 11am, British Summer Time).

            For more information about BigQuery data types, see:
            https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

            ``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not
            yet supported. See:
            https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524

        Args:
            operation (str): A Google BigQuery query string.

            parameters (Union[Mapping[str, Any], Sequence[Any]]):
                (Optional) dictionary or sequence of parameter values.

            job_id (str | None):
                (Optional and discouraged) The job ID to use when creating
                the query job. For best performance and reliability, manually
                setting a job ID is discouraged.

            job_config (google.cloud.bigquery.job.QueryJobConfig):
                (Optional) Extra configuration options for the query job.
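
        Example:
            A sketch of positional and named placeholders, including the
            typed form ``%(name:type)s`` (``my_dataset.my_table`` is a
            placeholder table name)::

                cursor.execute("SELECT %s", ("a value",))
                cursor.execute(
                    "SELECT name FROM my_dataset.my_table WHERE age >= %(age:INT64)s",
                    {"age": 18},
                )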
187 """
188 formatted_operation, parameter_types = _format_operation(operation, parameters)
189 self._execute(
190 formatted_operation, parameters, job_id, job_config, parameter_types
191 )
192
193 def _execute(
194 self, formatted_operation, parameters, job_id, job_config, parameter_types
195 ):
196 self._query_data = None
197 self._query_results = None
198 client = self.connection._client
199
200 # The DB-API uses the pyformat formatting, since the way BigQuery does
201 # query parameters was not one of the standard options. Convert both
202 # the query and the parameters to the format expected by the client
203 # libraries.
204 query_parameters = _helpers.to_query_parameters(parameters, parameter_types)
205
206 config = job_config or job.QueryJobConfig()
207 config.query_parameters = query_parameters
208
209 # Start the query and wait for the query to finish.
210 try:
211 if job_id is not None:
212 rows = client.query(
213 formatted_operation,
214 job_config=job_config,
215 job_id=job_id,
216 ).result(
217 page_size=self.arraysize,
218 )
219 else:
220 rows = client.query_and_wait(
221 formatted_operation,
222 job_config=config,
223 page_size=self.arraysize,
224 )
225 except google.cloud.exceptions.GoogleCloudError as exc:
226 raise exceptions.DatabaseError(exc)
227
228 self._query_rows = rows
229 self._set_description(rows.schema)
230
231 if config.dry_run:
232 self.rowcount = 0
233 else:
234 self._set_rowcount(rows)
235
236 def executemany(self, operation, seq_of_parameters):
237 """Prepare and execute a database operation multiple times.
238
239 Args:
240 operation (str): A Google BigQuery query string.
241
242 seq_of_parameters (Union[Sequence[Mapping[str, Any], Sequence[Any]]]):
243 Sequence of many sets of parameter values.
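
        Example:
            A sketch inserting several rows with named, typed parameters
            (``my_dataset.my_table`` is a placeholder table name)::

                cursor.executemany(
                    "INSERT INTO my_dataset.my_table (name, age) "
                    "VALUES (%(name:STRING)s, %(age:INT64)s)",
                    [{"name": "a", "age": 1}, {"name": "b", "age": 2}],
                )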
244 """
245 if seq_of_parameters:
246 rowcount = 0
247 # There's no reason to format the line more than once, as
248 # the operation only barely depends on the parameters. So
249 # we just use the first set of parameters. If there are
250 # different numbers or types of parameters, we'll error
251 # anyway.
252 formatted_operation, parameter_types = _format_operation(
253 operation, seq_of_parameters[0]
254 )
255 for parameters in seq_of_parameters:
256 self._execute(
257 formatted_operation, parameters, None, None, parameter_types
258 )
259 rowcount += self.rowcount
260
261 self.rowcount = rowcount
262
    def _try_fetch(self, size=None):
        """Try to start fetching data, if not yet started.

        Mutates self to indicate that iteration has started.
        """
        if self._query_data is not None:
            # Already started fetching the data.
            return

        rows = self._query_rows
        if rows is None:
            raise exceptions.InterfaceError(
                "No query results: execute() must be called before fetch."
            )

        bqstorage_client = self.connection._bqstorage_client
        if rows._should_use_bqstorage(
            bqstorage_client,
            create_bqstorage_client=False,
        ):
            rows_iterable = self._bqstorage_fetch(bqstorage_client)
            self._query_data = _helpers.to_bq_table_rows(rows_iterable)
            return

        self._query_data = iter(rows)

    def _bqstorage_fetch(self, bqstorage_client):
        """Start fetching data with the BigQuery Storage API.

        The method assumes that the data about the relevant query job already
        exists internally.

        Args:
            bqstorage_client(\
                google.cloud.bigquery_storage_v1.BigQueryReadClient \
            ):
                A client that knows how to talk to the BigQuery Storage API.

        Returns:
            Iterable[Mapping]:
                A sequence of rows, represented as dictionaries.
        """
        # Hitting this code path with a BQ Storage client instance implies that
        # bigquery_storage can indeed be imported here without errors.
        from google.cloud import bigquery_storage

        table_reference = self._query_rows._table

        requested_session = bigquery_storage.types.ReadSession(
            table=table_reference.to_bqstorage(),
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )

        read_session = bqstorage_client.create_read_session(
            parent="projects/{}".format(table_reference.project),
            read_session=requested_session,
            # a single stream only, as DB API is not well-suited for multithreading
            max_stream_count=1,
        )

        if not read_session.streams:
            return iter([])  # empty table, nothing to read

        stream_name = read_session.streams[0].name
        read_rows_stream = bqstorage_client.read_rows(stream_name)

        rows_iterable = read_rows_stream.rows(read_session)
        return rows_iterable

    def fetchone(self):
        """Fetch a single row from the results of the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            Tuple:
                A tuple representing a row or ``None`` if no more data is
                available.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        try:
            return next(self._query_data)
        except StopIteration:
            return None

    def fetchmany(self, size=None):
        """Fetch multiple results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        .. note::
            The size parameter is not used for the request/response size.
            Set the ``arraysize`` attribute before calling ``execute()`` to
            set the batch size.

        Args:
            size (int):
                (Optional) Maximum number of rows to return. Defaults to the
                ``arraysize`` property value. If ``arraysize`` is not set, it
                defaults to ``1``.

        Returns:
            List[Tuple]: A list of rows.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
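
        Example:
            A sketch that drains the remaining rows in batches of ten,
            assuming a query has already been executed on ``cursor``::

                while True:
                    batch = cursor.fetchmany(10)
                    if not batch:
                        break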
379 """
380 if size is None:
381 # Since self.arraysize can be None (a deviation from PEP 249),
382 # use an actual PEP 249 default of 1 in such case (*some* number
383 # is needed here).
384 size = self.arraysize if self.arraysize else 1
385
386 self._try_fetch(size=size)
387 rows = []
388
389 for row in self._query_data:
390 rows.append(row)
391 if len(rows) >= size:
392 break
393
394 return rows
395
396 def fetchall(self):
397 """Fetch all remaining results from the last ``execute*()`` call.
398
399 .. note::
400 If a dry run query was executed, no rows are returned.
401
402 Returns:
403 List[Tuple]: A list of all the rows in the results.
404
405 Raises:
406 google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
407 """
408 self._try_fetch()
409 return list(self._query_data)
410
411 def setinputsizes(self, sizes):
412 """No-op, but for consistency raise an error if cursor is closed."""
413
414 def setoutputsize(self, size, column=None):
415 """No-op, but for consistency raise an error if cursor is closed."""
416
417 def __iter__(self):
418 self._try_fetch()
419 return iter(self._query_data)
420
421
422def _format_operation_list(operation, parameters):
423 """Formats parameters in operation in the way BigQuery expects.
424
425 The input operation will be a query like ``SELECT %s`` and the output
426 will be a query like ``SELECT ?``.
427
428 Args:
429 operation (str): A Google BigQuery query string.
430
431 parameters (Sequence[Any]): Sequence of parameter values.
432
433 Returns:
434 str: A formatted query string.
435
436 Raises:
437 google.cloud.bigquery.dbapi.ProgrammingError:
438 if a parameter used in the operation is not found in the
439 ``parameters`` argument.
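
    For example, a sketch of the substitution::

        _format_operation_list("SELECT %s, %s", ("a", "b"))
        # -> "SELECT ?, ?"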
440 """
441 formatted_params = ["?" for _ in parameters]
442
443 try:
444 return operation % tuple(formatted_params)
445 except (TypeError, ValueError) as exc:
446 raise exceptions.ProgrammingError(exc)
447
448
449def _format_operation_dict(operation, parameters):
450 """Formats parameters in operation in the way BigQuery expects.
451
452 The input operation will be a query like ``SELECT %(namedparam)s`` and
453 the output will be a query like ``SELECT @namedparam``.
454
455 Args:
456 operation (str): A Google BigQuery query string.
457
458 parameters (Mapping[str, Any]): Dictionary of parameter values.
459
460 Returns:
461 str: A formatted query string.
462
463 Raises:
464 google.cloud.bigquery.dbapi.ProgrammingError:
465 if a parameter used in the operation is not found in the
466 ``parameters`` argument.
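
    For example, a sketch of the substitution (parameter names are
    backtick-quoted)::

        _format_operation_dict("SELECT %(foo)s", {"foo": "a"})
        # -> "SELECT @`foo`"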
467 """
468 formatted_params = {}
469 for name in parameters:
470 escaped_name = name.replace("`", r"\`")
471 formatted_params[name] = "@`{}`".format(escaped_name)
472
473 try:
474 return operation % formatted_params
475 except (KeyError, ValueError, TypeError) as exc:
476 raise exceptions.ProgrammingError(exc)
477
478
479def _format_operation(operation, parameters):
480 """Formats parameters in operation in way BigQuery expects.
481
482 Args:
483 operation (str): A Google BigQuery query string.
484
485 parameters (Union[Mapping[str, Any], Sequence[Any]]):
486 Optional parameter values.
487
488 Returns:
489 str: A formatted query string.
490
491 Raises:
492 google.cloud.bigquery.dbapi.ProgrammingError:
493 if a parameter used in the operation is not found in the
494 ``parameters`` argument.
495 """
496 if parameters is None or len(parameters) == 0:
497 return operation.replace("%%", "%"), None # Still do percent de-escaping.
498
499 operation, parameter_types = _extract_types(operation)
500 if parameter_types is None:
501 raise exceptions.ProgrammingError(
502 f"Parameters were provided, but {repr(operation)} has no placeholders."
503 )
504
505 if isinstance(parameters, collections_abc.Mapping):
506 return _format_operation_dict(operation, parameters), parameter_types
507
508 return _format_operation_list(operation, parameters), parameter_types
509
510
511def _extract_types(
512 operation,
513 extra_type_sub=re.compile(
514 r"""
515 (%*) # Extra %s. We'll deal with these in the replacement code
516
517 % # Beginning of replacement, %s, %(...)s
518
519 (?:\( # Begin of optional name and/or type
520 ([^:)]*) # name
521 (?:: # ':' introduces type
522 ( # start of type group
523 [a-zA-Z0-9_<>, ]+ # First part, no parens
524
525 (?: # start sets of parens + non-paren text
526 \([0-9 ,]+\) # comma-separated groups of digits in parens
527 # (e.g. string(10))
528 (?=[, >)]) # Must be followed by ,>) or space
529 [a-zA-Z0-9<>, ]* # Optional non-paren chars
530 )* # Can be zero or more of parens and following text
531 ) # end of type group
532 )? # close type clause ":type"
533 \))? # End of optional name and/or type
534
535 s # End of replacement
536 """,
537 re.VERBOSE,
538 ).sub,
539):
540 """Remove type information from parameter placeholders.
541
542 For every parameter of the form %(name:type)s, replace with %(name)s and add the
543 item name->type to dict that's returned.
544
545 Returns operation without type information and a dictionary of names and types.
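
    For example, a sketch of the extraction::

        _extract_types("values (%(foo:INT64)s, %(bar)s)")
        # -> ("values (%(foo)s, %(bar)s)", {"foo": "INT64"})

        _extract_types("values (%s, %(:DATE)s)")
        # -> ("values (%s, %s)", [None, "DATE"])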
546 """
547 parameter_types = None
548
549 def repl(m):
550 nonlocal parameter_types
551 prefix, name, type_ = m.groups()
552 if len(prefix) % 2:
553 # The prefix has an odd number of %s, the last of which
554 # escapes the % we're looking for, so we don't want to
555 # change anything.
556 return m.group(0)
557
558 try:
559 if name:
560 if not parameter_types:
561 parameter_types = {}
562 if type_:
563 if name in parameter_types:
564 if type_ != parameter_types[name]:
565 raise exceptions.ProgrammingError(
566 f"Conflicting types for {name}: "
567 f"{parameter_types[name]} and {type_}."
568 )
569 else:
570 parameter_types[name] = type_
571 else:
572 if not isinstance(parameter_types, dict):
573 raise TypeError()
574
575 return f"{prefix}%({name})s"
576 else:
577 if parameter_types is None:
578 parameter_types = []
579 parameter_types.append(type_)
580 return f"{prefix}%s"
581 except (AttributeError, TypeError):
582 raise exceptions.ProgrammingError(
583 f"{repr(operation)} mixes named and unamed parameters."
584 )
585
586 return extra_type_sub(repl, operation), parameter_types