# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15"""Cursor for the Google BigQuery DB-API."""
16
17from __future__ import annotations
18
19import collections
20from collections import abc as collections_abc
21import re
22from typing import Optional
23
24try:
25 from google.cloud.bigquery_storage import ArrowSerializationOptions
26except ImportError:
27 _ARROW_COMPRESSION_SUPPORT = False
28else:
29 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
30 _ARROW_COMPRESSION_SUPPORT = True
31
32from google.cloud.bigquery import job
33from google.cloud.bigquery.dbapi import _helpers
34from google.cloud.bigquery.dbapi import exceptions
35import google.cloud.exceptions # type: ignore
36

# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
# provided.
Column = collections.namedtuple(
    "Column",
    [
        "name",
        "type_code",
        "display_size",
        "internal_size",
        "precision",
        "scale",
        "null_ok",
    ],
)
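
# For example, a nullable STRING column would be described with a tuple like
# the following (an illustrative value, not computed at import time):
#
#     Column(name="full_name", type_code="STRING", display_size=None,
#            internal_size=None, precision=None, scale=None, null_ok=True)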


@_helpers.raise_on_closed("Operating on a closed cursor.")
class Cursor(object):
    """DB-API Cursor to Google BigQuery.

    Args:
        connection (google.cloud.bigquery.dbapi.Connection):
            A DB-API connection to Google BigQuery.
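
    Example (a minimal sketch; project and credentials are resolved from the
    environment by the default client):

        >>> from google.cloud.bigquery.dbapi import connect
        >>> cursor = connect().cursor()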
    """

    def __init__(self, connection):
        self.connection = connection
        self.description = None
        # Per PEP 249: The attribute is -1 in case no .execute*() has been
        # performed on the cursor or the rowcount of the last operation
        # cannot be determined by the interface.
        self.rowcount = -1
        # Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
        # a single row at a time. However, we deviate from that, and set the
        # default to None, allowing the backend to automatically determine the
        # most appropriate size.
        self.arraysize = None
        self._query_data = None
        self._query_rows = None
        self._closed = False

    @property
    def query_job(self) -> Optional[job.QueryJob]:
        """google.cloud.bigquery.job.query.QueryJob | None: The query job
        created by the last ``execute*()`` call, if a query job was created.

        .. note::
            If the last ``execute*()`` call was ``executemany()``, this is the
            last job created by ``executemany()``."""
        rows = self._query_rows

        if rows is None:
            return None

        job_id = rows.job_id
        project = rows.project
        location = rows.location
        client = self.connection._client

        if job_id is None:
            return None

        return client.get_job(job_id, location=location, project=project)

    def close(self):
        """Mark the cursor as closed, preventing its further use."""
        self._closed = True

    def _set_description(self, schema):
        """Set description from schema.

        Args:
            schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
                A description of fields in the schema.
        """
        if schema is None:
            self.description = None
            return

        self.description = tuple(
            Column(
                name=field.name,
                type_code=field.field_type,
                display_size=None,
                internal_size=None,
                precision=None,
                scale=None,
                null_ok=field.is_nullable,
            )
            for field in schema
        )

    def _set_rowcount(self, rows):
        """Set the rowcount from a RowIterator.

        Normally, this sets rowcount to the number of rows returned by the
        query, but if it was a DML statement, it sets rowcount to the number
        of modified rows.

        Args:
            rows (google.cloud.bigquery.table.RowIterator):
                The rows from the most recent query.
        """
        total_rows = 0
        num_dml_affected_rows = rows.num_dml_affected_rows

        if rows.total_rows is not None and rows.total_rows > 0:
            total_rows = rows.total_rows
        if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
            total_rows = num_dml_affected_rows
        self.rowcount = total_rows

    def execute(self, operation, parameters=None, job_id=None, job_config=None):
        """Prepare and execute a database operation.

        .. note::
            When setting query parameters, values which are "text"
            (``unicode`` in Python2, ``str`` in Python3) will use
            the 'STRING' BigQuery type. Values which are "bytes" (``str`` in
            Python2, ``bytes`` in Python3) will use the 'BYTES' type.

            A `~datetime.datetime` parameter without timezone information uses
            the 'DATETIME' BigQuery type (example: Global Pi Day Celebration
            March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with
            timezone information uses the 'TIMESTAMP' BigQuery type (example:
            a wedding on April 29, 2011 at 11am, British Summer Time).

            For more information about BigQuery data types, see:
            https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

            ``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not
            yet supported. See:
            https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524

        Args:
            operation (str): A Google BigQuery query string.

            parameters (Union[Mapping[str, Any], Sequence[Any]]):
                (Optional) dictionary or sequence of parameter values.

            job_id (str | None):
                (Optional and discouraged) The job ID to use when creating
                the query job. For best performance and reliability, manually
                setting a job ID is discouraged.

            job_config (google.cloud.bigquery.job.QueryJobConfig):
                (Optional) Extra configuration options for the query job.
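
        Example (an illustrative sketch; the table name is a placeholder):

            >>> cursor.execute(
            ...     "SELECT name FROM `my-project.my_dataset.people`"
            ...     " WHERE age > %(age:INT64)s",
            ...     {"age": 18},
            ... )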
        """
        formatted_operation, parameter_types = _format_operation(operation, parameters)
        self._execute(
            formatted_operation, parameters, job_id, job_config, parameter_types
        )

    def _execute(
        self, formatted_operation, parameters, job_id, job_config, parameter_types
    ):
        self._query_data = None
        self._query_rows = None
        client = self.connection._client

        # The DB-API uses the pyformat formatting, since the way BigQuery does
        # query parameters was not one of the standard options. Convert both
        # the query and the parameters to the format expected by the client
        # libraries.
        query_parameters = _helpers.to_query_parameters(parameters, parameter_types)

        config = job_config or job.QueryJobConfig()
        config.query_parameters = query_parameters

        # Start the query and wait for the query to finish.
        try:
            if job_id is not None:
                rows = client.query(
                    formatted_operation,
                    job_config=config,
                    job_id=job_id,
                ).result(
                    page_size=self.arraysize,
                )
            else:
                rows = client.query_and_wait(
                    formatted_operation,
                    job_config=config,
                    page_size=self.arraysize,
                )
        except google.cloud.exceptions.GoogleCloudError as exc:
            raise exceptions.DatabaseError(exc)

        self._query_rows = rows
        self._set_description(rows.schema)

        if config.dry_run:
            self.rowcount = 0
        else:
            self._set_rowcount(rows)

    def executemany(self, operation, seq_of_parameters):
        """Prepare and execute a database operation multiple times.

        Args:
            operation (str): A Google BigQuery query string.

            seq_of_parameters (Sequence[Union[Mapping[str, Any], Sequence[Any]]]):
                Sequence of many sets of parameter values.
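
        Example (an illustrative sketch; the table name is a placeholder):

            >>> cursor.executemany(
            ...     "INSERT INTO `my-project.my_dataset.people` (name, age)"
            ...     " VALUES (%s, %s)",
            ...     [("Alice", 30), ("Bob", 45)],
            ... )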
        """
        if seq_of_parameters:
            rowcount = 0
            # There's no reason to format the line more than once, as
            # the formatted operation only depends on the *kind* of
            # parameters (mapping vs. sequence), not on their values. So
            # we just use the first set of parameters. If there are
            # different numbers or types of parameters, we'll error
            # anyway.
            formatted_operation, parameter_types = _format_operation(
                operation, seq_of_parameters[0]
            )
            for parameters in seq_of_parameters:
                self._execute(
                    formatted_operation, parameters, None, None, parameter_types
                )
                rowcount += self.rowcount

            self.rowcount = rowcount


    def _try_fetch(self, size=None):
        """Try to start fetching data, if not yet started.

        Mutates self to indicate that iteration has started.
        """
        if self._query_data is not None:
            # Already started fetching the data.
            return

        rows = self._query_rows
        if rows is None:
            raise exceptions.InterfaceError(
                "No query results: execute() must be called before fetch."
            )

        bqstorage_client = self.connection._bqstorage_client
        if rows._should_use_bqstorage(
            bqstorage_client,
            create_bqstorage_client=False,
        ):
            rows_iterable = self._bqstorage_fetch(bqstorage_client)
            self._query_data = _helpers.to_bq_table_rows(rows_iterable)
            return

        self._query_data = iter(rows)

    def _bqstorage_fetch(self, bqstorage_client):
        """Start fetching data with the BigQuery Storage API.

        The method assumes that the data about the relevant query job already
        exists internally.

        Args:
            bqstorage_client(\
                google.cloud.bigquery_storage_v1.BigQueryReadClient \
            ):
                A client that knows how to talk to the BigQuery Storage API.

        Returns:
            Iterable[Mapping]:
                A sequence of rows, represented as dictionaries.
        """
        # Hitting this code path with a BQ Storage client instance implies that
        # bigquery_storage can indeed be imported here without errors.
        from google.cloud import bigquery_storage

        table_reference = self._query_rows._table

        requested_session = bigquery_storage.types.ReadSession(
            table=table_reference.to_bqstorage(),
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )

        read_session = bqstorage_client.create_read_session(
            parent="projects/{}".format(table_reference.project),
            read_session=requested_session,
            # a single stream only, as DB API is not well-suited for multithreading
            max_stream_count=1,
            retry=None,
            timeout=None,
        )

        if not read_session.streams:
            return iter([])  # empty table, nothing to read

        stream_name = read_session.streams[0].name
        read_rows_stream = bqstorage_client.read_rows(stream_name)

        rows_iterable = read_rows_stream.rows(read_session)
        return rows_iterable

    def fetchone(self):
        """Fetch a single row from the results of the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            Tuple:
                A tuple representing a row or ``None`` if no more data is
                available.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
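
        Example (illustrative):

            >>> cursor.execute("SELECT 1")
            >>> row = cursor.fetchone()    # the only row
            >>> cursor.fetchone() is None  # the result set is exhausted
            True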
        """
        self._try_fetch()
        try:
            return next(self._query_data)
        except StopIteration:
            return None

    def fetchmany(self, size=None):
        """Fetch multiple results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        .. note::
            The size parameter is not used for the request/response size.
            Set the ``arraysize`` attribute before calling ``execute()`` to
            set the batch size.

        Args:
            size (int):
                (Optional) Maximum number of rows to return. Defaults to the
                ``arraysize`` property value. If ``arraysize`` is not set, it
                defaults to ``1``.

        Returns:
            List[Tuple]: A list of rows.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
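
        Example (illustrative):

            >>> cursor.execute("SELECT x FROM UNNEST([1, 2, 3]) AS x")
            >>> len(cursor.fetchmany(2))
            2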
        """
        if size is None:
            # Since self.arraysize can be None (a deviation from PEP 249),
            # use an actual PEP 249 default of 1 in such case (*some* number
            # is needed here).
            size = self.arraysize if self.arraysize else 1

        self._try_fetch(size=size)
        rows = []

        for row in self._query_data:
            rows.append(row)
            if len(rows) >= size:
                break

        return rows

    def fetchall(self):
        """Fetch all remaining results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            List[Tuple]: A list of all the rows in the results.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
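
        Example (illustrative):

            >>> cursor.execute("SELECT x FROM UNNEST([1, 2, 3]) AS x")
            >>> len(cursor.fetchall())
            3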
        """
        self._try_fetch()
        return list(self._query_data)

    def setinputsizes(self, sizes):
        """No-op, but for consistency raise an error if cursor is closed."""

    def setoutputsize(self, size, column=None):
        """No-op, but for consistency raise an error if cursor is closed."""

    def __iter__(self):
        self._try_fetch()
        return iter(self._query_data)


def _format_operation_list(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    The input operation will be a query like ``SELECT %s`` and the output
    will be a query like ``SELECT ?``.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Sequence[Any]): Sequence of parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
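
    Example (illustrative):

        >>> _format_operation_list("SELECT %s, %s", ["a", "b"])
        'SELECT ?, ?'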
    """
    formatted_params = ["?" for _ in parameters]

    try:
        return operation % tuple(formatted_params)
    except (TypeError, ValueError) as exc:
        raise exceptions.ProgrammingError(exc)


def _format_operation_dict(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    The input operation will be a query like ``SELECT %(namedparam)s`` and
    the output will be a query like ``SELECT @namedparam``.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Mapping[str, Any]): Dictionary of parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
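
    Example (illustrative):

        >>> _format_operation_dict("SELECT %(age)s", {"age": 18})
        'SELECT @`age`'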
    """
    formatted_params = {}
    for name in parameters:
        escaped_name = name.replace("`", r"\`")
        formatted_params[name] = "@`{}`".format(escaped_name)

    try:
        return operation % formatted_params
    except (KeyError, ValueError, TypeError) as exc:
        raise exceptions.ProgrammingError(exc)


def _format_operation(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Union[Mapping[str, Any], Sequence[Any]]):
            Optional parameter values.

    Returns:
        Tuple[str, Union[Mapping[str, str], Sequence[str], None]]:
            A formatted query string, together with the parameter types
            extracted from the placeholders (if any).

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
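
    Example (illustrative):

        >>> _format_operation("SELECT 100 %% 10", None)
        ('SELECT 100 % 10', None)
        >>> _format_operation("SELECT %(age:INT64)s", {"age": 18})
        ('SELECT @`age`', {'age': 'INT64'})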
    """
    if parameters is None or len(parameters) == 0:
        return operation.replace("%%", "%"), None  # Still do percent de-escaping.

    operation, parameter_types = _extract_types(operation)
    if parameter_types is None:
        raise exceptions.ProgrammingError(
            f"Parameters were provided, but {repr(operation)} has no placeholders."
        )

    if isinstance(parameters, collections_abc.Mapping):
        return _format_operation_dict(operation, parameters), parameter_types

    return _format_operation_list(operation, parameters), parameter_types


def _extract_types(
    operation,
    extra_type_sub=re.compile(
        r"""
        (%*)          # Extra %s. We'll deal with these in the replacement code

        %             # Beginning of replacement, %s, %(...)s

        (?:\(         # Begin of optional name and/or type
        ([^:)]*)      # name
        (?::          # ':' introduces type
        (             # start of type group
        [a-zA-Z0-9_<>, ]+ # First part, no parens

        (?:               # start sets of parens + non-paren text
        \([0-9 ,]+\)      # comma-separated groups of digits in parens
                          # (e.g. string(10))
        (?=[, >)])        # Must be followed by ,>) or space
        [a-zA-Z0-9<>, ]*  # Optional non-paren chars
        )*                # Can be zero or more of parens and following text
        )             # end of type group
        )?            # close type clause ":type"
        \))?          # End of optional name and/or type

        s             # End of replacement
        """,
        re.VERBOSE,
    ).sub,
):
    """Remove type information from parameter placeholders.

    For every parameter of the form %(name:type)s, replace it with %(name)s and
    add the name -> type entry to the dict that's returned.

    Returns the operation without type information, together with a dictionary
    mapping parameter names to types (or a sequence of types, for positional
    parameters).
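
    Examples (illustrative):

        >>> _extract_types("values (%(name:string)s, %(age:numeric)s)")
        ('values (%(name)s, %(age)s)', {'name': 'string', 'age': 'numeric'})
        >>> _extract_types("values (%s, %(:float64)s)")
        ('values (%s, %s)', [None, 'float64'])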
    """
    parameter_types = None

    def repl(m):
        nonlocal parameter_types
        prefix, name, type_ = m.groups()
        if len(prefix) % 2:
            # The prefix has an odd number of %s, the last of which
            # escapes the % we're looking for, so we don't want to
            # change anything.
            return m.group(0)

        try:
            if name:
                if not parameter_types:
                    parameter_types = {}
                if type_:
                    if name in parameter_types:
                        if type_ != parameter_types[name]:
                            raise exceptions.ProgrammingError(
                                f"Conflicting types for {name}: "
                                f"{parameter_types[name]} and {type_}."
                            )
                    else:
                        parameter_types[name] = type_
                else:
                    if not isinstance(parameter_types, dict):
                        # A named placeholder arrived after positional ones
                        # turned parameter_types into a list; raise TypeError
                        # so the handler below reports the mixed styles.
                        raise TypeError()

                return f"{prefix}%({name})s"
            else:
                if parameter_types is None:
                    parameter_types = []
                # Appending to a dict raises AttributeError, which the handler
                # below turns into the mixed-parameters ProgrammingError.
                parameter_types.append(type_)
                return f"{prefix}%s"
        except (AttributeError, TypeError):
            raise exceptions.ProgrammingError(
                f"{repr(operation)} mixes named and unnamed parameters."
            )

    return extra_type_sub(repl, operation), parameter_types