# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15"""Helpers for interacting with the job REST APIs from the client.
16
17For queries, there are three cases to consider:
18
191. jobs.insert: This always returns a job resource.
202. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
21 This sometimes can return the results inline, but always includes a job ID.
223. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
23 This sometimes doesn't create a job at all, instead returning the results.
24 For better debugging, an auto-generated query ID is included in the
25 response.
26
27Client.query() calls either (1) or (2), depending on what the user provides
28for the api_method parameter. query() always returns a QueryJob object, which
29can retry the query when the query job fails for a retriable reason.
30
31Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
32local results from the response or may wrap a query job containing multiple
33pages of results. Even though query_and_wait() waits for the job to complete,
34we still need a separate job_retry object because there are different
35predicates where it is safe to generate a new query ID.
36"""

import copy
import functools
import textwrap
from typing import Any, Dict, Optional, TYPE_CHECKING, Union
import uuid
import warnings

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table
import google.cloud.bigquery.retry
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE

# Avoid circular imports
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.bigquery.client import Client


# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
# happen before the client-side timeout. This is not strictly necessary, as the
# client retries client-side timeouts, but the hope is that by making the
# server-side timeout slightly shorter, we save the server some unnecessary
# processing time.
#
# 250 milliseconds is chosen arbitrarily, though it should be about the right
# order of magnitude for network latency and switching delays. It is about the
# amount of time for light to circumnavigate the world twice.
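#
# For example (illustrative): with a client-side timeout of 10.0 seconds,
# _to_query_request below sends timeoutMs = 10_000 - 250 = 9_750, so the
# server gives up slightly before the client does.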
_TIMEOUT_BUFFER_MILLIS = 250


def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
    """Construct an ID for a new job.

    Args:
        job_id: the user-provided job ID.
        prefix: the user-provided prefix for a job ID.

    Returns:
        str: A job ID
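
    Example (illustrative; ``<uuid4>`` stands for a random UUID string):

        make_job_id("my-job")         # -> "my-job"
        make_job_id(prefix="daily_")  # -> "daily_<uuid4>"
        make_job_id()                 # -> "<uuid4>"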
    """
    if job_id is not None:
        return job_id
    elif prefix is not None:
        return str(prefix) + str(uuid.uuid4())
    else:
        return str(uuid.uuid4())


def job_config_with_defaults(
    job_config: Optional[job.QueryJobConfig],
    default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
    """Create a copy of `job_config`, replacing unset values with those from
    `default_job_config`.
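
    Example (illustrative):

        defaults = job.QueryJobConfig(maximum_bytes_billed=10**9)
        overrides = job.QueryJobConfig(dry_run=True)
        merged = job_config_with_defaults(overrides, defaults)
        # merged has dry_run=True and maximum_bytes_billed=10**9; any
        # property set explicitly on ``overrides`` wins over ``defaults``.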
    """
    if job_config is None:
        return default_job_config

    if default_job_config is None:
        return job_config

    # Both job_config and default_job_config are not None, so make a copy of
    # job_config merged with default_job_config. Anything already explicitly
    # set on job_config should not be replaced.
    return job_config._fill_from_default(default_job_config)


def query_jobs_insert(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    job_id: Optional[str],
    job_id_prefix: Optional[str],
    location: Optional[str],
    project: str,
    retry: Optional[retries.Retry],
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
) -> job.QueryJob:
    """Initiate a query using jobs.insert.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
    """
    job_id_given = job_id is not None
    job_id_save = job_id
    job_config_save = job_config

    def do_query():
        # Make a copy now, so that the original doesn't get changed by the
        # process below and to facilitate retry.
        job_config = copy.deepcopy(job_config_save)

        job_id = make_job_id(job_id_save, job_id_prefix)
        job_ref = job._JobReference(job_id, project=project, location=location)
        query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)

        try:
            query_job._begin(retry=retry, timeout=timeout)
        except core_exceptions.Conflict as create_exc:
            # The thought is that if someone is providing their own job IDs
            # and they get their job ID generation wrong, this could end up
            # returning results for the wrong query. We thus only try to
            # recover if the job ID was not given.
            if job_id_given:
                raise create_exc

            try:
                # Sometimes we get a 404 after a Conflict. In this case, we
                # have pretty high confidence that by retrying the 404, we'll
                # (hopefully) eventually recover the job.
                # https://github.com/googleapis/python-bigquery/issues/2134
                #
                # Allow users who want to completely disable retries to
                # continue to do so by setting retry to None.
                get_job_retry = retry
                if retry is not None:
                    # TODO(tswast): Amend the user's retry object to allow
                    # retrying 404s once there's a public way to do so.
                    # https://github.com/googleapis/python-api-core/issues/796
                    get_job_retry = (
                        google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY
                    )

                query_job = client.get_job(
                    job_id,
                    project=project,
                    location=location,
                    retry=get_job_retry,
                    timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
                )
            except core_exceptions.GoogleAPIError:  # (includes RetryError)
                raise
            else:
                return query_job
        else:
            return query_job

    # Allow users who want to completely disable retries to
    # continue to do so by setting job_retry to None.
    if job_retry is not None:
        do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query)

    future = do_query()

    # The future might be in a failed state now, but if it's
    # unrecoverable, we'll find out when we ask for its result, at which
    # point we may retry.
    if not job_id_given:
        future._retry_do_query = do_query  # in case we have to retry later
        future._job_retry = job_retry

    return future


def _validate_job_config(request_body: Dict[str, Any], invalid_key: str):
    """Catch common mistakes, such as passing in a *JobConfig object of the
    wrong type.
    """
    if invalid_key in request_body:
        raise ValueError(f"got unexpected key {repr(invalid_key)} in job_config")


def validate_job_retry(job_id: Optional[str], job_retry: Optional[retries.Retry]):
    """Catch common mistakes, such as setting a job_id and job_retry at the
    same time.
    """
    if job_id is not None and job_retry is not None:
        # TODO(tswast): To avoid breaking changes but still allow a default
        # query job retry, we currently only raise if they explicitly set a
        # job_retry other than the default. In a future version, we may want to
        # avoid this check for DEFAULT_JOB_RETRY and always raise.
        if job_retry is not google.cloud.bigquery.retry.DEFAULT_JOB_RETRY:
            raise TypeError(
                textwrap.dedent(
                    """
                    `job_retry` was provided, but the returned job is
                    not retryable, because a custom `job_id` was
                    provided. To customize the job ID and allow for job
                    retries, set `job_id_prefix` instead.
                    """
                ).strip()
            )
        else:
            warnings.warn(
                textwrap.dedent(
                    """
                    job_retry must be explicitly set to None if job_id is set.
                    BigQuery cannot retry a failed job by using the exact
                    same ID. Setting job_id without explicitly disabling
                    job_retry will raise an error in the future. To avoid this
                    warning, either use job_id_prefix instead (preferred) or
                    set job_retry=None.
                    """
                ).strip(),
                category=FutureWarning,
                # user code -> client.query / client.query_and_wait -> validate_job_retry
                stacklevel=3,
            )
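

# Behavior summary for validate_job_retry (illustrative):
#
#   job_id=None, job_retry=<any Retry or None>   -> no error
#   job_id set,  job_retry=None                  -> no error (retries disabled)
#   job_id set,  job_retry is DEFAULT_JOB_RETRY  -> FutureWarning
#   job_id set,  job_retry is a custom Retry     -> TypeError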


def _to_query_request(
    job_config: Optional[job.QueryJobConfig] = None,
    *,
    query: str,
    location: Optional[str] = None,
    timeout: Optional[float] = None,
) -> Dict[str, Any]:
    """Transform from Job resource to QueryRequest resource.

    Most of the keys in job.configuration.query are in common with
    QueryRequest. If any configuration property is set that is not available in
    jobs.query, it will result in a server-side error.
    """
    request_body = copy.copy(job_config.to_api_repr()) if job_config else {}

    _validate_job_config(request_body, job.CopyJob._JOB_TYPE)
    _validate_job_config(request_body, job.ExtractJob._JOB_TYPE)
    _validate_job_config(request_body, job.LoadJob._JOB_TYPE)

    # Move query.* properties to top-level.
    query_config_resource = request_body.pop("query", {})
    request_body.update(query_config_resource)

    # Default to standard SQL.
    request_body.setdefault("useLegacySql", False)

    # Since jobs.query can return results, ensure we use the lossless timestamp
    # format. See: https://github.com/googleapis/python-bigquery/issues/395
    request_body.setdefault("formatOptions", {})
    request_body["formatOptions"]["useInt64Timestamp"] = True  # type: ignore

    if timeout is not None:
        # Subtract a buffer for context switching, network latency, etc.
        request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)

    if location is not None:
        request_body["location"] = location

    request_body["query"] = query

    return request_body
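

# Example (illustrative): _to_query_request(query="SELECT 1", timeout=10.0)
# returns a QueryRequest body like:
#
#   {
#       "useLegacySql": False,
#       "formatOptions": {"useInt64Timestamp": True},
#       "timeoutMs": 9750,
#       "query": "SELECT 1",
#   }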


def _to_query_job(
    client: "Client",
    query: str,
    request_config: Optional[job.QueryJobConfig],
    query_response: Dict[str, Any],
) -> job.QueryJob:
    """Convert a jobs.query response into a QueryJob instance."""
    job_ref_resource = query_response["jobReference"]
    job_ref = job._JobReference._from_api_repr(job_ref_resource)
    query_job = job.QueryJob(job_ref, query, client=client)
    query_job._properties.setdefault("configuration", {})

    # Not all relevant properties are in the jobs.query response. Populate some
    # expected properties based on the job configuration.
    if request_config is not None:
        query_job._properties["configuration"].update(request_config.to_api_repr())

    query_job._properties["configuration"].setdefault("query", {})
    query_job._properties["configuration"]["query"]["query"] = query
    query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)

    query_job._properties.setdefault("statistics", {})
    query_job._properties["statistics"].setdefault("query", {})
    query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
        "cacheHit"
    )
    query_job._properties["statistics"]["query"]["schema"] = query_response.get(
        "schema"
    )
    query_job._properties["statistics"]["query"][
        "totalBytesProcessed"
    ] = query_response.get("totalBytesProcessed")

    # Set errors if any were encountered.
    query_job._properties.setdefault("status", {})
    if "errors" in query_response:
        # Set errors but not errorResult. If there was an error that failed
        # the job, jobs.query behaves like jobs.getQueryResults and returns a
        # non-success HTTP status code.
        errors = query_response["errors"]
        query_job._properties["status"]["errors"] = errors

    # Avoid an extra call to `getQueryResults` if the query has finished.
    job_complete = query_response.get("jobComplete")
    if job_complete:
        query_job._query_results = google.cloud.bigquery.query._QueryResults(
            query_response
        )

    # We want job.result() to refresh the job state, so the converted job's
    # state is always "PENDING", even if the job is actually finished.
    query_job._properties["status"]["state"] = "PENDING"

    return query_job
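

# Note: because _to_query_job always reports state "PENDING", calling
# ``result()`` on the returned QueryJob refreshes the job state via the REST
# API even when the jobs.query response already carried the first page of
# results inline.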


def _to_query_path(project: str) -> str:
    """Return the jobs.query REST API path for the given project."""
    return f"/projects/{project}/queries"


def query_jobs_query(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
) -> job.QueryJob:
    """Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
    """
    path = _to_query_path(project)
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=timeout
    )

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=timeout,
        )
        return _to_query_job(client, query, job_config, api_response)

    future = do_query()

    # The future might be in a failed state now, but if it's
    # unrecoverable, we'll find out when we ask for its result, at which
    # point we may retry.
    future._retry_do_query = do_query  # in case we have to retry later
    future._job_retry = job_retry

    return future


def query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
) -> table.RowIterator:
    """Run the query, wait for it to finish, and return the results.

    Args:
        client:
            BigQuery client to make API calls.
        query (str):
            SQL query to be executed. Defaults to the standard SQL
            dialect. Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.
            To override any options that were previously set in
            the ``default_query_job_config`` given to the
            ``Client`` constructor, manually set those options to ``None``,
            or whatever value is preferred.
        location (Optional[str]):
            Location where to run the job. Must match the location of the
            table used in the query as well as the destination table.
        project (Optional[str]):
            ID of the project where the job runs. Defaults to the client's
            project.
        api_timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
        wait_timeout (Optional[Union[float, object]]):
            The number of seconds to wait for the query to finish. If the
            query doesn't finish before this timeout, the client attempts
            to cancel the query. If unset, the underlying ``Client.get_job()``
            API call has a timeout, but we still wait indefinitely for the
            job to finish.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. This only applies to making RPC
            calls. It isn't used to retry failed jobs. This has
            a reasonable default that should only be overridden
            with care.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs. The default retries
            rate-limit-exceeded errors. Passing ``None`` disables
            job retry. Not all jobs can be retried.
        page_size (Optional[int]):
            The maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            The maximum total number of rows from this request.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each
            page, the iterator will have the ``total_rows`` attribute
            set, which counts the total number of rows **in the result
            set** (this is distinct from the total number of rows in the
            current page: ``iterator.page.num_items``).

            If the query is a special query that produces no results, e.g.
            a DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        TypeError:
            If ``job_config`` is not an instance of
            :class:`~google.cloud.bigquery.job.QueryJobConfig`.
    """
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    # Some API parameters aren't supported by the jobs.query API. In these
    # cases, fall back to a jobs.insert call.
    if not _supported_by_jobs_query(request_body):
        return _wait_or_cancel(
            query_jobs_insert(
                client=client,
                query=query,
                job_id=None,
                job_id_prefix=None,
                job_config=job_config,
                location=location,
                project=project,
                retry=retry,
                timeout=api_timeout,
                job_retry=job_retry,
            ),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
        )

    path = _to_query_path(project)

    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results
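    # For example (illustrative): page_size=100 and max_results=10 sends
    # maxResults=10; page_size=100 with max_results=None sends maxResults=100.
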
    if client.default_job_creation_mode:
        request_body["jobCreationMode"] = client.default_job_creation_mode

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}

        # For easier testing, handle the retries ourselves.
        if retry is not None:
            response = retry(client._call_api)(
                retry=None,  # We're calling the retry decorator ourselves.
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )
        else:
            response = client._call_api(
                retry=None,
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        page_token = query_results.page_token
        more_pages = page_token is not None

        if more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch destination table via the job ID if needed.
            return _wait_or_cancel(
                _to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
            )

        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
            num_dml_affected_rows=query_results.num_dml_affected_rows,
            query=query,
            total_bytes_processed=query_results.total_bytes_processed,
        )

    if job_retry is not None:
        return job_retry(do_query)()
    else:
        return do_query()


def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
    """True if jobs.query can be used. False if jobs.insert is needed."""
    request_keys = frozenset(request_body.keys())

    # Per issue: https://github.com/googleapis/python-bigquery/issues/1867
    # use an allowlist here instead of a denylist because the backend API allows
    # unsupported parameters without any warning or failure. Instead, keep this
    # set in sync with those in QueryRequest:
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
    keys_allowlist = {
        "kind",
        "query",
        "maxResults",
        "defaultDataset",
        "timeoutMs",
        "dryRun",
        "preserveNulls",
        "useQueryCache",
        "useLegacySql",
        "parameterMode",
        "queryParameters",
        "location",
        "formatOptions",
        "connectionProperties",
        "labels",
        "maximumBytesBilled",
        "requestId",
        "createSession",
        "writeIncrementalResults",
    }

    unsupported_keys = request_keys - keys_allowlist
    return len(unsupported_keys) == 0
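

# Example (illustrative): a request body containing "destinationTable" (a
# jobs.insert-only property) is not in the allowlist above, so
# _supported_by_jobs_query returns False and query_and_wait falls back to
# jobs.insert.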


def _wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[Union[object, float]],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
) -> table.RowIterator:
    """Wait for a job to complete and return the results.

    If we can't return the results within the ``wait_timeout``, try to cancel
    the job.
    """
    try:
        return job.result(
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
    except Exception:
        # Attempt to cancel the job since we can't return the results.
        try:
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            # Don't eat the original exception if cancel fails.
            pass
        raise