Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/_job_helpers.py: 14%
92 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Helpers for interacting with the job REST APIs from the client."""
17import copy
18import uuid
19from typing import Any, Dict, TYPE_CHECKING, Optional
21import google.api_core.exceptions as core_exceptions
22from google.api_core import retry as retries
24from google.cloud.bigquery import job
26# Avoid circular imports
27if TYPE_CHECKING: # pragma: NO COVER
28 from google.cloud.bigquery.client import Client
31# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
32# happen before the client-side timeout. This is not strictly neccessary, as the
33# client retries client-side timeouts, but the hope by making the server-side
34# timeout slightly shorter is that it can save the server from some unncessary
35# processing time.
36#
37# 250 milliseconds is chosen arbitrarily, though should be about the right
38# order of magnitude for network latency and switching delays. It is about the
39# amount of time for light to circumnavigate the world twice.
40_TIMEOUT_BUFFER_MILLIS = 250
43def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
44 """Construct an ID for a new job.
46 Args:
47 job_id: the user-provided job ID.
48 prefix: the user-provided prefix for a job ID.
50 Returns:
51 str: A job ID
52 """
53 if job_id is not None:
54 return job_id
55 elif prefix is not None:
56 return str(prefix) + str(uuid.uuid4())
57 else:
58 return str(uuid.uuid4())
61def query_jobs_insert(
62 client: "Client",
63 query: str,
64 job_config: Optional[job.QueryJobConfig],
65 job_id: Optional[str],
66 job_id_prefix: Optional[str],
67 location: str,
68 project: str,
69 retry: retries.Retry,
70 timeout: Optional[float],
71 job_retry: retries.Retry,
72) -> job.QueryJob:
73 """Initiate a query using jobs.insert.
75 See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
76 """
77 job_id_given = job_id is not None
78 job_id_save = job_id
79 job_config_save = job_config
81 def do_query():
82 # Make a copy now, so that original doesn't get changed by the process
83 # below and to facilitate retry
84 job_config = copy.deepcopy(job_config_save)
86 job_id = make_job_id(job_id_save, job_id_prefix)
87 job_ref = job._JobReference(job_id, project=project, location=location)
88 query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)
90 try:
91 query_job._begin(retry=retry, timeout=timeout)
92 except core_exceptions.Conflict as create_exc:
93 # The thought is if someone is providing their own job IDs and they get
94 # their job ID generation wrong, this could end up returning results for
95 # the wrong query. We thus only try to recover if job ID was not given.
96 if job_id_given:
97 raise create_exc
99 try:
100 query_job = client.get_job(
101 job_id,
102 project=project,
103 location=location,
104 retry=retry,
105 timeout=timeout,
106 )
107 except core_exceptions.GoogleAPIError: # (includes RetryError)
108 raise create_exc
109 else:
110 return query_job
111 else:
112 return query_job
114 future = do_query()
115 # The future might be in a failed state now, but if it's
116 # unrecoverable, we'll find out when we ask for it's result, at which
117 # point, we may retry.
118 if not job_id_given:
119 future._retry_do_query = do_query # in case we have to retry later
120 future._job_retry = job_retry
122 return future
125def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
126 """Transform from Job resource to QueryRequest resource.
128 Most of the keys in job.configuration.query are in common with
129 QueryRequest. If any configuration property is set that is not available in
130 jobs.query, it will result in a server-side error.
131 """
132 request_body = {}
133 job_config_resource = job_config.to_api_repr() if job_config else {}
134 query_config_resource = job_config_resource.get("query", {})
136 request_body.update(query_config_resource)
138 # These keys are top level in job resource and query resource.
139 if "labels" in job_config_resource:
140 request_body["labels"] = job_config_resource["labels"]
141 if "dryRun" in job_config_resource:
142 request_body["dryRun"] = job_config_resource["dryRun"]
144 # Default to standard SQL.
145 request_body.setdefault("useLegacySql", False)
147 # Since jobs.query can return results, ensure we use the lossless timestamp
148 # format. See: https://github.com/googleapis/python-bigquery/issues/395
149 request_body.setdefault("formatOptions", {})
150 request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore
152 return request_body
155def _to_query_job(
156 client: "Client",
157 query: str,
158 request_config: Optional[job.QueryJobConfig],
159 query_response: Dict[str, Any],
160) -> job.QueryJob:
161 job_ref_resource = query_response["jobReference"]
162 job_ref = job._JobReference._from_api_repr(job_ref_resource)
163 query_job = job.QueryJob(job_ref, query, client=client)
164 query_job._properties.setdefault("configuration", {})
166 # Not all relevant properties are in the jobs.query response. Populate some
167 # expected properties based on the job configuration.
168 if request_config is not None:
169 query_job._properties["configuration"].update(request_config.to_api_repr())
171 query_job._properties["configuration"].setdefault("query", {})
172 query_job._properties["configuration"]["query"]["query"] = query
173 query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)
175 query_job._properties.setdefault("statistics", {})
176 query_job._properties["statistics"].setdefault("query", {})
177 query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
178 "cacheHit"
179 )
180 query_job._properties["statistics"]["query"]["schema"] = query_response.get(
181 "schema"
182 )
183 query_job._properties["statistics"]["query"][
184 "totalBytesProcessed"
185 ] = query_response.get("totalBytesProcessed")
187 # Set errors if any were encountered.
188 query_job._properties.setdefault("status", {})
189 if "errors" in query_response:
190 # Set errors but not errorResult. If there was an error that failed
191 # the job, jobs.query behaves like jobs.getQueryResults and returns a
192 # non-success HTTP status code.
193 errors = query_response["errors"]
194 query_job._properties["status"]["errors"] = errors
196 # Transform job state so that QueryJob doesn't try to restart the query.
197 job_complete = query_response.get("jobComplete")
198 if job_complete:
199 query_job._properties["status"]["state"] = "DONE"
200 # TODO: https://github.com/googleapis/python-bigquery/issues/589
201 # Set the first page of results if job is "complete" and there is
202 # only 1 page of results. Otherwise, use the existing logic that
203 # refreshes the job stats.
204 #
205 # This also requires updates to `to_dataframe` and the DB API connector
206 # so that they don't try to read from a destination table if all the
207 # results are present.
208 else:
209 query_job._properties["status"]["state"] = "PENDING"
211 return query_job
214def query_jobs_query(
215 client: "Client",
216 query: str,
217 job_config: Optional[job.QueryJobConfig],
218 location: str,
219 project: str,
220 retry: retries.Retry,
221 timeout: Optional[float],
222 job_retry: retries.Retry,
223) -> job.QueryJob:
224 """Initiate a query using jobs.query.
226 See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
227 """
228 path = f"/projects/{project}/queries"
229 request_body = _to_query_request(job_config)
231 if timeout is not None:
232 # Subtract a buffer for context switching, network latency, etc.
233 request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
234 request_body["location"] = location
235 request_body["query"] = query
237 def do_query():
238 request_body["requestId"] = make_job_id()
239 span_attributes = {"path": path}
240 api_response = client._call_api(
241 retry,
242 span_name="BigQuery.query",
243 span_attributes=span_attributes,
244 method="POST",
245 path=path,
246 data=request_body,
247 timeout=timeout,
248 )
249 return _to_query_job(client, query, job_config, api_response)
251 future = do_query()
253 # The future might be in a failed state now, but if it's
254 # unrecoverable, we'll find out when we ask for it's result, at which
255 # point, we may retry.
256 future._retry_do_query = do_query # in case we have to retry later
257 future._job_retry = job_retry
259 return future