1# Copyright 2017 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9#  Unless required by applicable law or agreed to in writing, software 
    10#  distributed under the License is distributed on an "AS IS" BASIS, 
    11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12#  See the License for the specific language governing permissions and 
    13#  limitations under the License. 
    14 
    15"""Cursor for the Google BigQuery DB-API.""" 
    16 
    17from __future__ import annotations 
    18 
    19import collections 
    20from collections import abc as collections_abc 
    21import re 
    22from typing import Optional 
    23 
    24try: 
    25    from google.cloud.bigquery_storage import ArrowSerializationOptions 
    26except ImportError: 
    27    _ARROW_COMPRESSION_SUPPORT = False 
    28else: 
    29    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too. 
    30    _ARROW_COMPRESSION_SUPPORT = True 
    31 
    32from google.cloud.bigquery import job 
    33from google.cloud.bigquery.dbapi import _helpers 
    34from google.cloud.bigquery.dbapi import exceptions 
    35import google.cloud.exceptions  # type: ignore 
    36 
    37 
    38# Per PEP 249: A 7-item sequence containing information describing one result 
    39# column. The first two items (name and type_code) are mandatory, the other 
    40# five are optional and are set to None if no meaningful values can be 
    41# provided. 
    42Column = collections.namedtuple( 
    43    "Column", 
    44    [ 
    45        "name", 
    46        "type_code", 
    47        "display_size", 
    48        "internal_size", 
    49        "precision", 
    50        "scale", 
    51        "null_ok", 
    52    ], 
    53) 
    54 
    55 
    56@_helpers.raise_on_closed("Operating on a closed cursor.") 
    57class Cursor(object): 
    58    """DB-API Cursor to Google BigQuery. 
    59 
    60    Args: 
    61        connection (google.cloud.bigquery.dbapi.Connection): 
    62            A DB-API connection to Google BigQuery. 
    63    """ 
    64 
    65    def __init__(self, connection): 
    66        self.connection = connection 
    67        self.description = None 
    68        # Per PEP 249: The attribute is -1 in case no .execute*() has been 
    69        # performed on the cursor or the rowcount of the last operation 
    70        # cannot be determined by the interface. 
    71        self.rowcount = -1 
    72        # Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch 
    73        # a single row at a time. However, we deviate from that, and set the 
    74        # default to None, allowing the backend to automatically determine the 
    75        # most appropriate size. 
    76        self.arraysize = None 
    77        self._query_data = None 
    78        self._query_rows = None 
    79        self._closed = False 
    80 
    81    @property 
    82    def query_job(self) -> Optional[job.QueryJob]: 
    83        """google.cloud.bigquery.job.query.QueryJob | None: The query job 
    84        created by the last ``execute*()`` call, if a query job was created. 
    85 
    86        .. note:: 
    87            If the last ``execute*()`` call was ``executemany()``, this is the 
    88            last job created by ``executemany()``.""" 
    89        rows = self._query_rows 
    90 
    91        if rows is None: 
    92            return None 
    93 
    94        job_id = rows.job_id 
    95        project = rows.project 
    96        location = rows.location 
    97        client = self.connection._client 
    98 
    99        if job_id is None: 
    100            return None 
    101 
    102        return client.get_job(job_id, location=location, project=project) 
    103 
    104    def close(self): 
    105        """Mark the cursor as closed, preventing its further use.""" 
    106        self._closed = True 
    107 
    108    def _set_description(self, schema): 
    109        """Set description from schema. 
    110 
    111        Args: 
    112            schema (Sequence[google.cloud.bigquery.schema.SchemaField]): 
    113                A description of fields in the schema. 
    114        """ 
    115        if schema is None: 
    116            self.description = None 
    117            return 
    118 
    119        self.description = tuple( 
    120            Column( 
    121                name=field.name, 
    122                type_code=field.field_type, 
    123                display_size=None, 
    124                internal_size=None, 
    125                precision=None, 
    126                scale=None, 
    127                null_ok=field.is_nullable, 
    128            ) 
    129            for field in schema 
    130        ) 
    131 
    132    def _set_rowcount(self, rows): 
    133        """Set the rowcount from a RowIterator. 
    134 
    135        Normally, this sets rowcount to the number of rows returned by the 
    136        query, but if it was a DML statement, it sets rowcount to the number 
    137        of modified rows. 
    138 
    139        Args: 
    140            query_results (google.cloud.bigquery.query._QueryResults): 
    141                Results of a query. 
    142        """ 
    143        total_rows = 0 
    144        num_dml_affected_rows = rows.num_dml_affected_rows 
    145 
    146        if rows.total_rows is not None and rows.total_rows > 0: 
    147            total_rows = rows.total_rows 
    148        if num_dml_affected_rows is not None and num_dml_affected_rows > 0: 
    149            total_rows = num_dml_affected_rows 
    150        self.rowcount = total_rows 
    151 
    152    def execute(self, operation, parameters=None, job_id=None, job_config=None): 
    153        """Prepare and execute a database operation. 
    154 
    155        .. note:: 
    156            When setting query parameters, values which are "text" 
    157            (``unicode`` in Python2, ``str`` in Python3) will use 
    158            the 'STRING' BigQuery type. Values which are "bytes" (``str`` in 
    159            Python2, ``bytes`` in Python3), will use using the 'BYTES' type. 
    160 
    161            A `~datetime.datetime` parameter without timezone information uses 
    162            the 'DATETIME' BigQuery type (example: Global Pi Day Celebration 
    163            March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with 
    164            timezone information uses the 'TIMESTAMP' BigQuery type (example: 
    165            a wedding on April 29, 2011 at 11am, British Summer Time). 
    166 
    167            For more information about BigQuery data types, see: 
    168            https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types 
    169 
    170            ``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not 
    171            yet supported. See: 
    172            https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524 
    173 
    174        Args: 
    175            operation (str): A Google BigQuery query string. 
    176 
    177            parameters (Union[Mapping[str, Any], Sequence[Any]]): 
    178                (Optional) dictionary or sequence of parameter values. 
    179 
    180            job_id (str | None): 
    181                (Optional and discouraged) The job ID to use when creating 
    182                the query job. For best performance and reliability, manually 
    183                setting a job ID is discouraged. 
    184 
    185            job_config (google.cloud.bigquery.job.QueryJobConfig): 
    186                (Optional) Extra configuration options for the query job. 
    187        """ 
    188        formatted_operation, parameter_types = _format_operation(operation, parameters) 
    189        self._execute( 
    190            formatted_operation, parameters, job_id, job_config, parameter_types 
    191        ) 
    192 
    193    def _execute( 
    194        self, formatted_operation, parameters, job_id, job_config, parameter_types 
    195    ): 
    196        self._query_data = None 
    197        self._query_results = None 
    198        client = self.connection._client 
    199 
    200        # The DB-API uses the pyformat formatting, since the way BigQuery does 
    201        # query parameters was not one of the standard options. Convert both 
    202        # the query and the parameters to the format expected by the client 
    203        # libraries. 
    204        query_parameters = _helpers.to_query_parameters(parameters, parameter_types) 
    205 
    206        config = job_config or job.QueryJobConfig() 
    207        config.query_parameters = query_parameters 
    208 
    209        # Start the query and wait for the query to finish. 
    210        try: 
    211            if job_id is not None: 
    212                rows = client.query( 
    213                    formatted_operation, 
    214                    job_config=job_config, 
    215                    job_id=job_id, 
    216                ).result( 
    217                    page_size=self.arraysize, 
    218                ) 
    219            else: 
    220                rows = client.query_and_wait( 
    221                    formatted_operation, 
    222                    job_config=config, 
    223                    page_size=self.arraysize, 
    224                ) 
    225        except google.cloud.exceptions.GoogleCloudError as exc: 
    226            raise exceptions.DatabaseError(exc) 
    227 
    228        self._query_rows = rows 
    229        self._set_description(rows.schema) 
    230 
    231        if config.dry_run: 
    232            self.rowcount = 0 
    233        else: 
    234            self._set_rowcount(rows) 
    235 
    236    def executemany(self, operation, seq_of_parameters): 
    237        """Prepare and execute a database operation multiple times. 
    238 
    239        Args: 
    240            operation (str): A Google BigQuery query string. 
    241 
    242            seq_of_parameters (Union[Sequence[Mapping[str, Any], Sequence[Any]]]): 
    243                Sequence of many sets of parameter values. 
    244        """ 
    245        if seq_of_parameters: 
    246            rowcount = 0 
    247            # There's no reason to format the line more than once, as 
    248            # the operation only barely depends on the parameters.  So 
    249            # we just use the first set of parameters. If there are 
    250            # different numbers or types of parameters, we'll error 
    251            # anyway. 
    252            formatted_operation, parameter_types = _format_operation( 
    253                operation, seq_of_parameters[0] 
    254            ) 
    255            for parameters in seq_of_parameters: 
    256                self._execute( 
    257                    formatted_operation, parameters, None, None, parameter_types 
    258                ) 
    259                rowcount += self.rowcount 
    260 
    261            self.rowcount = rowcount 
    262 
    263    def _try_fetch(self, size=None): 
    264        """Try to start fetching data, if not yet started. 
    265 
    266        Mutates self to indicate that iteration has started. 
    267        """ 
    268        if self._query_data is not None: 
    269            # Already started fetching the data. 
    270            return 
    271 
    272        rows = self._query_rows 
    273        if rows is None: 
    274            raise exceptions.InterfaceError( 
    275                "No query results: execute() must be called before fetch." 
    276            ) 
    277 
    278        bqstorage_client = self.connection._bqstorage_client 
    279        if rows._should_use_bqstorage( 
    280            bqstorage_client, 
    281            create_bqstorage_client=False, 
    282        ): 
    283            rows_iterable = self._bqstorage_fetch(bqstorage_client) 
    284            self._query_data = _helpers.to_bq_table_rows(rows_iterable) 
    285            return 
    286 
    287        self._query_data = iter(rows) 
    288 
    289    def _bqstorage_fetch(self, bqstorage_client): 
    290        """Start fetching data with the BigQuery Storage API. 
    291 
    292        The method assumes that the data about the relevant query job already 
    293        exists internally. 
    294 
    295        Args: 
    296            bqstorage_client(\ 
    297                google.cloud.bigquery_storage_v1.BigQueryReadClient \ 
    298            ): 
    299                A client tha know how to talk to the BigQuery Storage API. 
    300 
    301        Returns: 
    302            Iterable[Mapping]: 
    303                A sequence of rows, represented as dictionaries. 
    304        """ 
    305        # Hitting this code path with a BQ Storage client instance implies that 
    306        # bigquery_storage can indeed be imported here without errors. 
    307        from google.cloud import bigquery_storage 
    308 
    309        table_reference = self._query_rows._table 
    310 
    311        requested_session = bigquery_storage.types.ReadSession( 
    312            table=table_reference.to_bqstorage(), 
    313            data_format=bigquery_storage.types.DataFormat.ARROW, 
    314        ) 
    315 
    316        if _ARROW_COMPRESSION_SUPPORT: 
    317            requested_session.read_options.arrow_serialization_options.buffer_compression = ( 
    318                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME 
    319            ) 
    320 
    321        read_session = bqstorage_client.create_read_session( 
    322            parent="projects/{}".format(table_reference.project), 
    323            read_session=requested_session, 
    324            # a single stream only, as DB API is not well-suited for multithreading 
    325            max_stream_count=1, 
    326        ) 
    327 
    328        if not read_session.streams: 
    329            return iter([])  # empty table, nothing to read 
    330 
    331        stream_name = read_session.streams[0].name 
    332        read_rows_stream = bqstorage_client.read_rows(stream_name) 
    333 
    334        rows_iterable = read_rows_stream.rows(read_session) 
    335        return rows_iterable 
    336 
    337    def fetchone(self): 
    338        """Fetch a single row from the results of the last ``execute*()`` call. 
    339 
    340        .. note:: 
    341            If a dry run query was executed, no rows are returned. 
    342 
    343        Returns: 
    344            Tuple: 
    345                A tuple representing a row or ``None`` if no more data is 
    346                available. 
    347 
    348        Raises: 
    349            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``. 
    350        """ 
    351        self._try_fetch() 
    352        try: 
    353            return next(self._query_data) 
    354        except StopIteration: 
    355            return None 
    356 
    357    def fetchmany(self, size=None): 
    358        """Fetch multiple results from the last ``execute*()`` call. 
    359 
    360        .. note:: 
    361            If a dry run query was executed, no rows are returned. 
    362 
    363        .. note:: 
    364            The size parameter is not used for the request/response size. 
    365            Set the ``arraysize`` attribute before calling ``execute()`` to 
    366            set the batch size. 
    367 
    368        Args: 
    369            size (int): 
    370                (Optional) Maximum number of rows to return. Defaults to the 
    371                ``arraysize`` property value. If ``arraysize`` is not set, it 
    372                defaults to ``1``. 
    373 
    374        Returns: 
    375            List[Tuple]: A list of rows. 
    376 
    377        Raises: 
    378            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``. 
    379        """ 
    380        if size is None: 
    381            # Since self.arraysize can be None (a deviation from PEP 249), 
    382            # use an actual PEP 249 default of 1 in such case (*some* number 
    383            # is needed here). 
    384            size = self.arraysize if self.arraysize else 1 
    385 
    386        self._try_fetch(size=size) 
    387        rows = [] 
    388 
    389        for row in self._query_data: 
    390            rows.append(row) 
    391            if len(rows) >= size: 
    392                break 
    393 
    394        return rows 
    395 
    396    def fetchall(self): 
    397        """Fetch all remaining results from the last ``execute*()`` call. 
    398 
    399        .. note:: 
    400            If a dry run query was executed, no rows are returned. 
    401 
    402        Returns: 
    403            List[Tuple]: A list of all the rows in the results. 
    404 
    405        Raises: 
    406            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``. 
    407        """ 
    408        self._try_fetch() 
    409        return list(self._query_data) 
    410 
    411    def setinputsizes(self, sizes): 
    412        """No-op, but for consistency raise an error if cursor is closed.""" 
    413 
    414    def setoutputsize(self, size, column=None): 
    415        """No-op, but for consistency raise an error if cursor is closed.""" 
    416 
    417    def __iter__(self): 
    418        self._try_fetch() 
    419        return iter(self._query_data) 
    420 
    421 
    422def _format_operation_list(operation, parameters): 
    423    """Formats parameters in operation in the way BigQuery expects. 
    424 
    425    The input operation will be a query like ``SELECT %s`` and the output 
    426    will be a query like ``SELECT ?``. 
    427 
    428    Args: 
    429        operation (str): A Google BigQuery query string. 
    430 
    431        parameters (Sequence[Any]): Sequence of parameter values. 
    432 
    433    Returns: 
    434        str: A formatted query string. 
    435 
    436    Raises: 
    437        google.cloud.bigquery.dbapi.ProgrammingError: 
    438            if a parameter used in the operation is not found in the 
    439            ``parameters`` argument. 
    440    """ 
    441    formatted_params = ["?" for _ in parameters] 
    442 
    443    try: 
    444        return operation % tuple(formatted_params) 
    445    except (TypeError, ValueError) as exc: 
    446        raise exceptions.ProgrammingError(exc) 
    447 
    448 
    449def _format_operation_dict(operation, parameters): 
    450    """Formats parameters in operation in the way BigQuery expects. 
    451 
    452    The input operation will be a query like ``SELECT %(namedparam)s`` and 
    453    the output will be a query like ``SELECT @namedparam``. 
    454 
    455    Args: 
    456        operation (str): A Google BigQuery query string. 
    457 
    458        parameters (Mapping[str, Any]): Dictionary of parameter values. 
    459 
    460    Returns: 
    461        str: A formatted query string. 
    462 
    463    Raises: 
    464        google.cloud.bigquery.dbapi.ProgrammingError: 
    465            if a parameter used in the operation is not found in the 
    466            ``parameters`` argument. 
    467    """ 
    468    formatted_params = {} 
    469    for name in parameters: 
    470        escaped_name = name.replace("`", r"\`") 
    471        formatted_params[name] = "@`{}`".format(escaped_name) 
    472 
    473    try: 
    474        return operation % formatted_params 
    475    except (KeyError, ValueError, TypeError) as exc: 
    476        raise exceptions.ProgrammingError(exc) 
    477 
    478 
    479def _format_operation(operation, parameters): 
    480    """Formats parameters in operation in way BigQuery expects. 
    481 
    482    Args: 
    483        operation (str): A Google BigQuery query string. 
    484 
    485        parameters (Union[Mapping[str, Any], Sequence[Any]]): 
    486            Optional parameter values. 
    487 
    488    Returns: 
    489        str: A formatted query string. 
    490 
    491    Raises: 
    492        google.cloud.bigquery.dbapi.ProgrammingError: 
    493            if a parameter used in the operation is not found in the 
    494            ``parameters`` argument. 
    495    """ 
    496    if parameters is None or len(parameters) == 0: 
    497        return operation.replace("%%", "%"), None  # Still do percent de-escaping. 
    498 
    499    operation, parameter_types = _extract_types(operation) 
    500    if parameter_types is None: 
    501        raise exceptions.ProgrammingError( 
    502            f"Parameters were provided, but {repr(operation)} has no placeholders." 
    503        ) 
    504 
    505    if isinstance(parameters, collections_abc.Mapping): 
    506        return _format_operation_dict(operation, parameters), parameter_types 
    507 
    508    return _format_operation_list(operation, parameters), parameter_types 
    509 
    510 
def _extract_types(
    operation,
    # Bound ``sub`` method of a regex compiled once, at function definition
    # time. It matches pyformat placeholders (``%s``, ``%(name)s``,
    # ``%(name:type)s``, ``%(:type)s``) together with any run of ``%``
    # characters immediately before them.
    extra_type_sub=re.compile(
        r"""
        (%*)          # Extra %s.  We'll deal with these in the replacement code

        %             # Beginning of replacement, %s, %(...)s

        (?:\(         # Begin of optional name and/or type
        ([^:)]*)      # name
        (?::          # ':' introduces type
          (             # start of type group
            [a-zA-Z0-9_<>, ]+ # First part, no parens

            (?:               # start sets of parens + non-paren text
              \([0-9 ,]+\)      # comma-separated groups of digits in parens
                                # (e.g. string(10))
              (?=[, >)])        # Must be followed by ,>) or space
              [a-zA-Z0-9<>, ]*  # Optional non-paren chars
            )*                # Can be zero or more of parens and following text
          )             # end of type group
        )?            # close type clause ":type"
        \))?          # End of optional name and/or type

        s             # End of replacement
        """,
        re.VERBOSE,
    ).sub,
):
    """Remove type information from parameter placeholders.

    For every parameter of the form %(name:type)s, replace with %(name)s and
    add the item name->type to the dict that's returned. Positional
    placeholders (%s, or %(:type)s / %()s with an empty name) are replaced
    with %s, and their types (``None`` when untyped) are collected in a list
    in order of appearance. Placeholders preceded by an odd number of extra
    ``%`` characters are escaped and left untouched.

    Args:
        operation (str): Query string with pyformat placeholders.
        extra_type_sub: Substitution function used to rewrite placeholders;
            defaults to the precompiled regex above.

    Returns:
        Tuple[str, Union[dict, list, None]]: The operation without type
        information, and the extracted types. The types are ``None`` if no
        placeholder was found, a ``list`` for positional placeholders, or a
        ``dict`` for named placeholders.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if the same name is given two different types, or if named and
            unnamed placeholders are mixed.
    """
    # Stays None until the first placeholder is seen. It then becomes a list
    # (positional placeholders) or a dict (named placeholders).
    parameter_types = None

    def repl(m):
        nonlocal parameter_types
        prefix, name, type_ = m.groups()
        if len(prefix) % 2:
            # The prefix has an odd number of %s, the last of which
            # escapes the % we're looking for, so we don't want to
            # change anything.
            return m.group(0)

        try:
            if name:
                # Named placeholder: %(name)s or %(name:type)s.
                if not parameter_types:
                    parameter_types = {}
                if type_:
                    # If positional placeholders came first, parameter_types
                    # is a list. Indexing/assigning it by name then raises
                    # TypeError, which is reported below as mixing.
                    if name in parameter_types:
                        if type_ != parameter_types[name]:
                            raise exceptions.ProgrammingError(
                                f"Conflicting types for {name}: "
                                f"{parameter_types[name]} and {type_}."
                            )
                    else:
                        parameter_types[name] = type_
                else:
                    # An untyped named placeholder after positional ones:
                    # signal mixing via TypeError (converted below).
                    if not isinstance(parameter_types, dict):
                        raise TypeError()

                return f"{prefix}%({name})s"
            else:
                # Positional placeholder. If named placeholders came first,
                # parameter_types is a dict and .append raises AttributeError
                # (reported below as mixing).
                if parameter_types is None:
                    parameter_types = []
                parameter_types.append(type_)
                return f"{prefix}%s"
        except (AttributeError, TypeError):
            raise exceptions.ProgrammingError(
                f"{repr(operation)} mixes named and unamed parameters."
            )

    return extra_type_sub(repl, operation), parameter_types