1# -*- coding: utf-8 -*-
2#
3# Copyright 2020 Google LLC
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# https://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Parent client for calling the Cloud BigQuery Storage API.
18
19This is the base from which all interactions with the API occur.
20"""
21
22from __future__ import absolute_import
23
24from google.api_core import gapic_v1
25import google.api_core.gapic_v1.method
26
27from google.cloud.bigquery_storage_v1 import gapic_version as package_version
28from google.cloud.bigquery_storage_v1 import reader
29from google.cloud.bigquery_storage_v1.services import big_query_read, big_query_write
30
# OAuth 2.0 scopes required to call the BigQuery Storage API.
_SCOPES = (
    "https://www.googleapis.com/auth/bigquery",
    "https://www.googleapis.com/auth/cloud-platform",
)

# Client info attached to outgoing requests by default so that usage of this
# hand-written veneer layer is reported with the package's own version.
VENEER_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    client_library_version=package_version.__version__
)
39
40
class BigQueryReadClient(big_query_read.BigQueryReadClient):
    """Client for interacting with BigQuery Storage API.

    The BigQuery storage API can be used to read data stored in BigQuery.
    """

    def __init__(self, **kwargs):
        # Default to the veneer client info so requests are attributed to this
        # hand-written layer unless the caller supplies their own client_info.
        if "client_info" not in kwargs:
            kwargs["client_info"] = VENEER_CLIENT_INFO
        super().__init__(**kwargs)

    def read_rows(
        self,
        name,
        offset=0,
        retry=google.api_core.gapic_v1.method.DEFAULT,
        timeout=google.api_core.gapic_v1.method.DEFAULT,
        metadata=(),
        retry_delay_callback=None,
    ):
        """
        Reads rows from the table in the format prescribed by the read
        session. Each response contains one or more table rows, up to a
        maximum of 10 MiB per response; read requests which attempt to read
        individual rows larger than this will fail.

        Each request also returns a set of stream statistics reflecting the
        estimated total number of rows in the read stream. This number is
        computed based on the total table size and the number of active
        streams in the read session, and may change as other streams continue
        to read data.

        Example:
            >>> from google.cloud import bigquery_storage
            >>>
            >>> client = bigquery_storage.BigQueryReadClient()
            >>>
            >>> # TODO: Initialize ``table``:
            >>> table = "projects/{}/datasets/{}/tables/{}".format(
            ...     'your-data-project-id',
            ...     'your_dataset_id',
            ...     'your_table_id',
            ... )
            >>>
            >>> # TODO: Initialize `parent`:
            >>> parent = 'projects/your-billing-project-id'
            >>>
            >>> requested_session = bigquery_storage.types.ReadSession(
            ...     table=table,
            ...     data_format=bigquery_storage.types.DataFormat.AVRO,
            ... )
            >>> session = client.create_read_session(
            ...     parent=parent, read_session=requested_session
            ... )
            >>>
            >>> stream = session.streams[0]  # TODO: Also read any other streams.
            >>> read_rows_stream = client.read_rows(stream.name)
            >>>
            >>> for element in read_rows_stream.rows(session):
            ...     # process element
            ...     pass

        Args:
            name (str):
                Required. Name of the stream to start
                reading from, of the form
                `projects/{project_id}/locations/{location}/sessions/{session_id}/streams/{stream_id}`
            offset (Optional[int]):
                The starting offset from which to begin reading rows from
                in the stream. The offset requested must be less than the last
                row read from ReadRows. Requesting a larger offset is
                undefined.
            retry (Optional[google.api_core.retry.Retry]): A retry object used
                to retry requests. If ``None`` is specified, requests will not
                be retried.
            timeout (Optional[float]): The amount of time, in seconds, to wait
                for the request to complete. Note that if ``retry`` is
                specified, the timeout applies to each individual attempt.
            metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
                that is provided to the method.
            retry_delay_callback (Optional[Callable[[float], None]]):
                If the client receives a retryable error that asks the client to
                delay its next attempt and retry_delay_callback is not None,
                BigQueryReadClient will call retry_delay_callback with the delay
                duration (in seconds) before it starts sleeping until the next
                attempt.

        Returns:
            ~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
                An iterable of
                :class:`~google.cloud.bigquery_storage_v1.types.ReadRowsResponse`.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the request
                failed for any reason.
            google.api_core.exceptions.RetryError: If the request failed due
                to a retryable error and retry attempts failed.
            ValueError: If the parameters are invalid.
        """
        # Hand the bound GAPIC parent to the wrapper so it can re-issue
        # ReadRows calls itself when the underlying stream needs to be
        # re-established.
        gapic_client = super(BigQueryReadClient, self)
        stream = reader.ReadRowsStream(
            gapic_client,
            name,
            offset,
            {"retry": retry, "timeout": timeout, "metadata": metadata},
            retry_delay_callback=retry_delay_callback,
        )
        # Open the initial connection eagerly so errors surface here rather
        # than on first iteration.
        stream._reconnect()
        return stream
150
151
class BigQueryWriteClient(big_query_write.BigQueryWriteClient):
    # Re-export the generated client's docstring so docs stay in sync.
    __doc__ = big_query_write.BigQueryWriteClient.__doc__

    def __init__(self, **kwargs):
        # Inject the veneer client info by default; a caller-supplied
        # client_info always takes precedence.
        kwargs.setdefault("client_info", VENEER_CLIENT_INFO)
        super().__init__(**kwargs)