# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base classes and helpers for job classes."""

from collections import namedtuple
import copy
import http
import threading
import typing
from typing import ClassVar, Dict, Optional, Sequence

from google.api_core import retry as retries
from google.api_core import exceptions
import google.api_core.future.polling

from google.cloud.bigquery import _helpers
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery.retry import (
    DEFAULT_GET_JOB_TIMEOUT,
    DEFAULT_RETRY,
)


_DONE_STATE = "DONE"
_STOPPED_REASON = "stopped"
_ERROR_REASON_TO_EXCEPTION = {
    "accessDenied": http.client.FORBIDDEN,
    "backendError": http.client.INTERNAL_SERVER_ERROR,
    "billingNotEnabled": http.client.FORBIDDEN,
    "billingTierLimitExceeded": http.client.BAD_REQUEST,
    "blocked": http.client.FORBIDDEN,
    "duplicate": http.client.CONFLICT,
    "internalError": http.client.INTERNAL_SERVER_ERROR,
    "invalid": http.client.BAD_REQUEST,
    "invalidQuery": http.client.BAD_REQUEST,
    "notFound": http.client.NOT_FOUND,
    "notImplemented": http.client.NOT_IMPLEMENTED,
    "policyViolation": http.client.FORBIDDEN,
    "quotaExceeded": http.client.FORBIDDEN,
    "rateLimitExceeded": http.client.TOO_MANY_REQUESTS,
    "resourceInUse": http.client.BAD_REQUEST,
    "resourcesExceeded": http.client.BAD_REQUEST,
    "responseTooLarge": http.client.FORBIDDEN,
    "stopped": http.client.OK,
    "tableUnavailable": http.client.BAD_REQUEST,
}


def _error_result_to_exception(error_result, errors=None):
    """Maps BigQuery error reasons to an exception.

    The reasons and their matching HTTP status codes are documented on
    the `troubleshooting errors`_ page.

    .. _troubleshooting errors: https://cloud.google.com/bigquery\
        /troubleshooting-errors

    Args:
        error_result (Mapping[str, str]): The error result from BigQuery.
        errors (Union[Iterable[str], None]): The detailed error messages.

    Returns:
        google.cloud.exceptions.GoogleAPICallError: The mapped exception.
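
    Example (an illustrative sketch; the payload mirrors the shape of the
    API's ``status.errorResult`` field)::

        error_result = {"reason": "notFound", "message": "Table not found"}
        exc = _error_result_to_exception(error_result)
        # ``exc`` is an instance of google.api_core.exceptions.NotFound.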
    """
    reason = error_result.get("reason")
    status_code = _ERROR_REASON_TO_EXCEPTION.get(
        reason, http.client.INTERNAL_SERVER_ERROR
    )
    # Manually create error message to preserve both error_result and errors.
    # Can be removed once b/310544564 and b/318889899 are resolved.
    concatenated_errors = ""
    if errors:
        concatenated_errors = "; "
        for err in errors:
            concatenated_errors += ", ".join(
                [f"{key}: {value}" for key, value in err.items()]
            )
            concatenated_errors += "; "

        # strips off the last unneeded semicolon and space
        concatenated_errors = concatenated_errors[:-2]

    error_message = error_result.get("message", "") + concatenated_errors

    return exceptions.from_http_status(
        status_code, error_message, errors=[error_result]
    )


ReservationUsage = namedtuple("ReservationUsage", "name slot_ms")
ReservationUsage.__doc__ = "Job resource usage for a reservation."
ReservationUsage.name.__doc__ = (
    'Reservation name or "unreserved" for on-demand resource usage.'
)
ReservationUsage.slot_ms.__doc__ = (
    "Total slot milliseconds used by the reservation for a particular job."
)


class TransactionInfo(typing.NamedTuple):
    """[Alpha] Information of a multi-statement transaction.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo

    .. versionadded:: 2.24.0
    """

    transaction_id: str
    """Output only. ID of the transaction."""

    @classmethod
    def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
        return cls(transaction_info["transactionId"])


class _JobReference(object):
    """A reference to a job.

    Args:
        job_id (str): ID of the job to run.
        project (str): ID of the project where the job runs.
        location (str): Location where the job runs.
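
    Example (an illustrative sketch of this private helper; the IDs shown
    are placeholders)::

        job_ref = _JobReference("my-job-id", "my-project", "US")
        job_ref._to_api_repr()
        # {'jobId': 'my-job-id', 'projectId': 'my-project', 'location': 'US'}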
    """

    def __init__(self, job_id, project, location):
        self._properties = {"jobId": job_id, "projectId": project}
        # The location field must not be populated if it is None.
        if location:
            self._properties["location"] = location

    @property
    def job_id(self):
        """str: ID of the job."""
        return self._properties.get("jobId")

    @property
    def project(self):
        """str: ID of the project where the job runs."""
        return self._properties.get("projectId")

    @property
    def location(self):
        """str: Location where the job runs."""
        return self._properties.get("location")

    def _to_api_repr(self):
        """Returns the API resource representation of the job reference."""
        return copy.deepcopy(self._properties)

    @classmethod
    def _from_api_repr(cls, resource):
        """Returns a job reference for an API resource representation."""
        job_id = resource.get("jobId")
        project = resource.get("projectId")
        location = resource.get("location")
        job_ref = cls(job_id, project, location)
        return job_ref


class _JobConfig(object):
    """Abstract base class for job configuration objects.

    Args:
        job_type (str): The key to use for the job configuration.
    """

    def __init__(self, job_type, **kwargs):
        self._job_type = job_type
        self._properties = {job_type: {}}
        for prop, val in kwargs.items():
            setattr(self, prop, val)

    def __setattr__(self, name, value):
        """Override to raise an error when an unknown property is set."""
        if not name.startswith("_") and not hasattr(type(self), name):
            raise AttributeError(
                "Property {} is unknown for {}.".format(name, type(self))
            )
        super(_JobConfig, self).__setattr__(name, value)
    @property
    def job_timeout_ms(self):
        """Optional. Job timeout in milliseconds. If this time limit is
        exceeded, BigQuery might attempt to stop the job.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms

        Example::

            job_config = bigquery.QueryJobConfig(job_timeout_ms=5000)
            # or, on an existing config:
            job_config.job_timeout_ms = 5000

        Raises:
            ValueError: If ``value`` type is invalid.
        """

        # None as this is an optional parameter.
        if self._properties.get("jobTimeoutMs"):
            return self._properties["jobTimeoutMs"]
        return None

    @job_timeout_ms.setter
    def job_timeout_ms(self, value):
        try:
            value = _int_or_none(value)
        except ValueError as err:
            raise ValueError("Pass an int for jobTimeoutMs, e.g. 5000").with_traceback(
                err.__traceback__
            )

        if value is not None:
            # docs indicate a string is expected by the API
            self._properties["jobTimeoutMs"] = str(value)
        else:
            self._properties.pop("jobTimeoutMs", None)

    @property
    def max_slots(self) -> Optional[int]:
        """The maximum rate of slot consumption to allow for this job.

        If set, the number of slots used to execute the job will be throttled
        to try and keep its slot consumption below the requested rate.
        This feature is not generally available.
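
        Example (an illustrative sketch; because the feature is not generally
        available, the request may be rejected for some projects)::

            job_config = bigquery.QueryJobConfig()
            job_config.max_slots = 100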
        """

        max_slots = self._properties.get("maxSlots")
        if max_slots is not None:
            if isinstance(max_slots, str):
                return int(max_slots)
            if isinstance(max_slots, int):
                return max_slots
        return None

    @max_slots.setter
    def max_slots(self, value):
        try:
            value = _int_or_none(value)
        except ValueError as err:
            raise ValueError("Pass an int for max slots, e.g. 100").with_traceback(
                err.__traceback__
            )

        if value is not None:
            self._properties["maxSlots"] = str(value)
        else:
            self._properties.pop("maxSlots", None)

    @property
    def reservation(self):
        """str: Optional. The reservation that the job would use.

        Users can specify a reservation to execute the job. If a reservation
        is not set, it is determined based on the rules defined by the
        reservation assignments. The expected format is
        ``projects/{project}/locations/{location}/reservations/{reservation}``.

        Raises:
            ValueError: If ``value`` is neither ``None`` nor a string.
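
        Example (an illustrative sketch; the project, location, and
        reservation names are placeholders)::

            job_config = bigquery.QueryJobConfig()
            job_config.reservation = (
                "projects/my-project/locations/us-central1/reservations/my-reservation"
            )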
        """
        return self._properties.setdefault("reservation", None)

    @reservation.setter
    def reservation(self, value):
        if value and not isinstance(value, str):
            raise ValueError("Reservation must be None or a string.")
        self._properties["reservation"] = value

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job.

        This property always returns a dict. Once a job has been created on
        the server, its labels cannot be modified anymore.

        Raises:
            ValueError: If ``value`` type is invalid.
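
        Example (an illustrative sketch; the label keys and values are
        placeholders)::

            job_config = bigquery.QueryJobConfig()
            job_config.labels = {"team": "analytics", "env": "dev"}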
        """
        return self._properties.setdefault("labels", {})

    @labels.setter
    def labels(self, value):
        if not isinstance(value, dict):
            raise ValueError("Pass a dict")
        self._properties["labels"] = value

    def _get_sub_prop(self, key, default=None):
        """Get a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access
        those properties::

            self._get_sub_prop('destinationTable')

        This is equivalent to using the ``_helpers._get_sub_prop`` function::

            _helpers._get_sub_prop(
                self._properties, ['query', 'destinationTable'])

        Args:
            key (str):
                Key for the value to get in the
                ``self._properties[self._job_type]`` dictionary.
            default (Optional[object]):
                Default value to return if the key is not found.
                Defaults to :data:`None`.

        Returns:
            object: The value if present or the default.
        """
        return _helpers._get_sub_prop(
            self._properties, [self._job_type, key], default=default
        )

    def _set_sub_prop(self, key, value):
        """Set a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set
        those properties::

            self._set_sub_prop('useLegacySql', False)

        This is equivalent to using the ``_helpers._set_sub_prop`` function::

            _helpers._set_sub_prop(
                self._properties, ['query', 'useLegacySql'], False)

        Args:
            key (str):
                Key to set in the ``self._properties[self._job_type]``
                dictionary.
            value (object): Value to set.
        """
        _helpers._set_sub_prop(self._properties, [self._job_type, key], value)

    def _del_sub_prop(self, key):
        """Remove ``key`` from the ``self._properties[self._job_type]`` dict.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear
        those properties::

            self._del_sub_prop('useLegacySql')

        This is equivalent to using the ``_helpers._del_sub_prop`` function::

            _helpers._del_sub_prop(
                self._properties, ['query', 'useLegacySql'])

        Args:
            key (str):
                Key to remove in the ``self._properties[self._job_type]``
                dictionary.
        """
        _helpers._del_sub_prop(self._properties, [self._job_type, key])

    def to_api_repr(self) -> dict:
        """Build an API representation of the job config.

        Returns:
            Dict: A dictionary in the format used by the BigQuery API.
        """
        return copy.deepcopy(self._properties)

    def _fill_from_default(self, default_job_config=None):
        """Merge this job config with a default job config.

        The keys in this object take precedence over the keys in the default
        config. The merge is done at the top-level as well as for keys one
        level below the job type.

        Args:
            default_job_config (google.cloud.bigquery.job._JobConfig):
                The default job config that will be used to fill in self.

        Returns:
            google.cloud.bigquery.job._JobConfig: A new (merged) job config.
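
        Example (an illustrative sketch using ``bigquery.QueryJobConfig``, a
        concrete subclass)::

            default_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**9)
            job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
            merged = job_config._fill_from_default(default_config)
            # ``merged`` carries both use_legacy_sql=False and
            # maximum_bytes_billed=10**9.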
        """
        if not default_job_config:
            new_job_config = copy.deepcopy(self)
            return new_job_config

        if self._job_type != default_job_config._job_type:
            raise TypeError(
                "attempted to merge two incompatible job types: "
                + repr(self._job_type)
                + ", "
                + repr(default_job_config._job_type)
            )

        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        new_job_config = self.__class__()  # pytype: disable=missing-parameter

        default_job_properties = copy.deepcopy(default_job_config._properties)
        for key in self._properties:
            if key != self._job_type:
                default_job_properties[key] = self._properties[key]

        default_job_properties[self._job_type].update(self._properties[self._job_type])
        new_job_config._properties = default_job_properties

        return new_job_config

    @classmethod
    def from_api_repr(cls, resource: dict) -> "_JobConfig":
        """Factory: construct a job configuration given its API representation.

        Args:
            resource (Dict):
                A job configuration in the same representation as is returned
                from the API.

        Returns:
            google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``.
        """
        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        job_config = cls()  # type: ignore  # pytype: disable=missing-parameter
        job_config._properties = resource
        return job_config


class _AsyncJob(google.api_core.future.polling.PollingFuture):
    """Base class for asynchronous jobs.

    Args:
        job_id (Union[str, _JobReference]):
            Job's ID in the project associated with the client or a
            fully-qualified job reference.
        client (google.cloud.bigquery.client.Client):
            Client which holds credentials and project configuration.
    """

    _JOB_TYPE = "unknown"
    _CONFIG_CLASS: ClassVar

    def __init__(self, job_id, client):
        super(_AsyncJob, self).__init__()

        # The job reference can be either a plain job ID or the full resource.
        # Populate the properties dictionary consistently depending on what has
        # been passed in.
        job_ref = job_id
        if not isinstance(job_id, _JobReference):
            job_ref = _JobReference(job_id, client.project, None)
        self._properties = {"jobReference": job_ref._to_api_repr()}

        self._client = client
        self._result_set = False
        self._completion_lock = threading.Lock()

    @property
    def configuration(self) -> _JobConfig:
        """Job-type specific configuration."""
        configuration: _JobConfig = self._CONFIG_CLASS()  # pytype: disable=not-callable
        configuration._properties = self._properties.setdefault("configuration", {})
        return configuration

    @property
    def job_id(self):
        """str: ID of the job."""
        return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"])

    @property
    def parent_job_id(self):
        """Return the ID of the parent job.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id

        Returns:
            Optional[str]: parent job id.
        """
        return _helpers._get_sub_prop(self._properties, ["statistics", "parentJobId"])

    @property
    def script_statistics(self) -> Optional["ScriptStatistics"]:
        """Statistics for a child job of a script."""
        resource = _helpers._get_sub_prop(
            self._properties, ["statistics", "scriptStatistics"]
        )
        if resource is None:
            return None
        return ScriptStatistics(resource)

    @property
    def session_info(self) -> Optional["SessionInfo"]:
        """[Preview] Information of the session if this job is part of one.

        .. versionadded:: 2.29.0
        """
        resource = _helpers._get_sub_prop(
            self._properties, ["statistics", "sessionInfo"]
        )
        if resource is None:
            return None
        return SessionInfo(resource)

    @property
    def num_child_jobs(self):
        """The number of child jobs executed.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs

        Returns:
            int
        """
        count = _helpers._get_sub_prop(self._properties, ["statistics", "numChildJobs"])
        return int(count) if count is not None else 0

    @property
    def project(self):
        """Project bound to the job.

        Returns:
            str: the project (derived from the client).
        """
        return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"])

    @property
    def location(self):
        """str: Location where the job runs."""
        return _helpers._get_sub_prop(self._properties, ["jobReference", "location"])

    @property
    def reservation_id(self):
        """str: Name of the primary reservation assigned to this job.

        Note that this could differ from the reservations reported in
        the reservation field if parent reservations were used to execute
        this job.
        """
        return _helpers._get_sub_prop(
            self._properties, ["statistics", "reservation_id"]
        )

    def _require_client(self, client):
        """Check client or verify override.

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.

        Returns:
            google.cloud.bigquery.client.Client:
                The client passed in or the currently bound client.
        """
        if client is None:
            client = self._client
        return client

    @property
    def job_type(self):
        """Type of job.

        Returns:
            str: one of 'load', 'copy', 'extract', 'query'.
        """
        return self._JOB_TYPE

    @property
    def path(self):
        """URL path for the job's APIs.

        Returns:
            str: the path based on project and job ID.
        """
        return "/projects/%s/jobs/%s" % (self.project, self.job_id)

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job."""
        return self._properties.setdefault("configuration", {}).setdefault("labels", {})

    @property
    def etag(self):
        """ETag for the job resource.

        Returns:
            Optional[str]: the ETag (None until set from the server).
        """
        return self._properties.get("etag")

    @property
    def self_link(self):
        """URL for the job resource.

        Returns:
            Optional[str]: the URL (None until set from the server).
        """
        return self._properties.get("selfLink")

    @property
    def user_email(self):
        """E-mail address of the user who submitted the job.

        Returns:
            Optional[str]: the e-mail address (None until set from the server).
        """
        return self._properties.get("user_email")

    @property
    def created(self):
        """Datetime at which the job was created.

        Returns:
            Optional[datetime.datetime]:
                the creation time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(
            self._properties, ["statistics", "creationTime"]
        )
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    @property
    def started(self):
        """Datetime at which the job was started.

        Returns:
            Optional[datetime.datetime]:
                the start time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(self._properties, ["statistics", "startTime"])
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    @property
    def ended(self):
        """Datetime at which the job finished.

        Returns:
            Optional[datetime.datetime]:
                the end time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(self._properties, ["statistics", "endTime"])
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    def _job_statistics(self):
        """Helper for job-type specific statistics-based properties."""
        statistics = self._properties.get("statistics", {})
        return statistics.get(self._JOB_TYPE, {})

    @property
    def reservation_usage(self):
        """Job resource usage breakdown by reservation.

        Returns:
            List[google.cloud.bigquery.job.ReservationUsage]:
                Reservation usage stats. Can be empty if not set from the server.
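
        Example (an illustrative sketch; ``job`` stands for any finished job
        instance)::

            for usage in job.reservation_usage:
                print(usage.name, usage.slot_ms)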
        """
        usage_stats_raw = _helpers._get_sub_prop(
            self._properties, ["statistics", "reservationUsage"], default=()
        )
        return [
            ReservationUsage(name=usage["name"], slot_ms=int(usage["slotMs"]))
            for usage in usage_stats_raw
        ]

    @property
    def transaction_info(self) -> Optional[TransactionInfo]:
        """Information of the multi-statement transaction if this job is part of one.

        Since a scripting query job can execute multiple transactions, this
        property is only expected on child jobs. Use the
        :meth:`google.cloud.bigquery.client.Client.list_jobs` method with the
        ``parent_job`` parameter to iterate over child jobs.

        .. versionadded:: 2.24.0
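
        Example (an illustrative sketch; assumes a
        :class:`~google.cloud.bigquery.client.Client` named ``client`` and a
        parent script job ID ``parent_job_id``)::

            for child_job in client.list_jobs(parent_job=parent_job_id):
                if child_job.transaction_info is not None:
                    print(child_job.transaction_info.transaction_id)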
        """
        info = self._properties.get("statistics", {}).get("transactionInfo")
        if info is None:
            return None
        else:
            return TransactionInfo.from_api_repr(info)

    @property
    def error_result(self):
        """Output only. Final error result of the job.

        If present, indicates that the job has completed and was unsuccessful.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result

        Returns:
            Optional[Mapping]: the error information (None until set from the server).
        """
        status = self._properties.get("status")
        if status is not None:
            return status.get("errorResult")

    @property
    def errors(self):
        """Output only. The first errors encountered during the running of the job.

        The final message includes the number of errors that caused the process to stop.
        Errors here do not necessarily mean that the job has not completed or was unsuccessful.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.errors

        Returns:
            Optional[List[Mapping]]:
                the error information (None until set from the server).
        """
        status = self._properties.get("status")
        if status is not None:
            return status.get("errors")

    @property
    def state(self):
        """Output only. Running state of the job.

        Valid states include 'PENDING', 'RUNNING', and 'DONE'.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.state

        Returns:
            Optional[str]:
                the state (None until set from the server).
        """
        status = self._properties.get("status", {})
        return status.get("state")

    def _set_properties(self, api_response):
        """Update properties from the resource in the body of ``api_response``.

        Args:
            api_response (Dict): response returned from an API call.
        """
        cleaned = api_response.copy()
        statistics = cleaned.setdefault("statistics", {})
        if "creationTime" in statistics:
            statistics["creationTime"] = float(statistics["creationTime"])
        if "startTime" in statistics:
            statistics["startTime"] = float(statistics["startTime"])
        if "endTime" in statistics:
            statistics["endTime"] = float(statistics["endTime"])

        self._properties = cleaned

        # For Future interface
        self._set_future_result()

    @classmethod
    def _check_resource_config(cls, resource):
        """Helper for :meth:`from_api_repr`.

        Args:
            resource (Dict): resource for the job.

        Raises:
            KeyError:
                If the resource has no identifier, or
                is missing the appropriate configuration.
        """
        if "jobReference" not in resource or "jobId" not in resource["jobReference"]:
            raise KeyError(
                "Resource lacks required identity information: "
                '["jobReference"]["jobId"]'
            )
        if (
            "configuration" not in resource
            or cls._JOB_TYPE not in resource["configuration"]
        ):
            raise KeyError(
                "Resource lacks required configuration: "
                '["configuration"]["%s"]' % cls._JOB_TYPE
            )

    def to_api_repr(self):
        """Generate a resource for the job."""
        return copy.deepcopy(self._properties)

    _build_resource = to_api_repr  # backward-compatibility alias

    def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
        """API call: begin the job via a POST request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the ``client``
                associated with the job object.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Raises:
            ValueError:
                If the job has already begun.
        """
        if self.state is not None:
            raise ValueError("Job already begun.")

        client = self._require_client(client)
        path = "/projects/%s/jobs" % (self.project,)

        # jobs.insert is idempotent because we ensure that every new
        # job has an ID.
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.begin",
            span_attributes=span_attributes,
            job_ref=self,
            method="POST",
            path=path,
            data=self.to_api_repr(),
            timeout=timeout,
        )
        self._set_properties(api_response)

    def exists(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> bool:
        """API call: test for the existence of the job via a GET request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            bool: Boolean indicating existence of the job.
        """
        client = self._require_client(client)

        extra_params = {"fields": "id"}
        if self.location:
            extra_params["location"] = self.location

        try:
            span_attributes = {"path": self.path}

            client._call_api(
                retry,
                span_name="BigQuery.job.exists",
                span_attributes=span_attributes,
                job_ref=self,
                method="GET",
                path=self.path,
                query_params=extra_params,
                timeout=timeout,
            )
        except exceptions.NotFound:
            return False
        else:
            return True

    def reload(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
    ):
        """API call: refresh job properties via a GET request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
        """
        client = self._require_client(client)

        got_job = client.get_job(
            self,
            project=self.project,
            location=self.location,
            retry=retry,
            timeout=timeout,
        )
        self._set_properties(got_job._properties)

    def cancel(
        self,
        client=None,
        retry: Optional[retries.Retry] = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> bool:
        """API call: cancel the job via a POST request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.
            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            bool: Boolean indicating that the cancel request was sent.
        """
        client = self._require_client(client)

        extra_params = {}
        if self.location:
            extra_params["location"] = self.location

        path = "{}/cancel".format(self.path)
        span_attributes = {"path": path}

        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.cancel",
            span_attributes=span_attributes,
            job_ref=self,
            method="POST",
            path=path,
            query_params=extra_params,
            timeout=timeout,
        )
        self._set_properties(api_response["job"])
        # The Future interface requires that we return True if the *attempt*
        # to cancel was successful.
        return True

    # The following methods implement the PollingFuture interface. Note that
    # the methods above are from the pre-Future interface and are left for
    # compatibility. The only "overloaded" method is :meth:`cancel`, which
    # satisfies both interfaces.

    def _set_future_result(self):
        """Set the result or exception from the job if it is complete."""
        # This must be done in a lock to prevent the polling thread
        # and main thread from both executing the completion logic
        # at the same time.
        with self._completion_lock:
            # If the operation isn't complete or if the result has already been
            # set, do not call set_result/set_exception again.
            # Note: self._result_set is set to True in set_result and
            # set_exception, in case those methods are invoked directly.
            if not self.done(reload=False) or self._result_set:
                return

            if self.error_result is not None:
                exception = _error_result_to_exception(
                    self.error_result, self.errors or ()
                )
                self.set_exception(exception)
            else:
                self.set_result(self)

    def done(
        self,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
        reload: bool = True,
    ) -> bool:
        """Checks if the job is complete.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC. If the job state is ``DONE``, retrying is aborted
                early, as the job will not change anymore.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            reload (Optional[bool]):
                If ``True``, make an API call to refresh the job state of
                unfinished jobs before checking. Default ``True``.

        Returns:
            bool: True if the job is complete, False otherwise.
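
        Example (an illustrative polling sketch; in most cases prefer
        :meth:`result`, which blocks until the job completes)::

            import time

            while not job.done():
                time.sleep(1)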
        """
        # Do not refresh if the state is already done, as the job will not
        # change once complete.
        if self.state != _DONE_STATE and reload:
            self.reload(retry=retry, timeout=timeout)
        return self.state == _DONE_STATE

    def result(  # type: ignore  # (incompatible with supertype)
        self,
        retry: Optional[retries.Retry] = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> "_AsyncJob":
        """Start the job and wait for it to complete and get the result.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC. If the job state is ``DONE``, retrying is aborted
                early, as the job will not change anymore.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
                If multiple requests are made under the hood, ``timeout``
                applies to each individual request.

        Returns:
            _AsyncJob: This instance.

        Raises:
            google.cloud.exceptions.GoogleAPICallError:
                if the job failed.
            concurrent.futures.TimeoutError:
                if the job did not complete in the given timeout.
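
        Example (an illustrative sketch; ``job`` stands for any concrete job
        instance, e.g. one returned by ``client.query``)::

            job = client.query("SELECT 1")
            job.result(timeout=300)  # Blocks until the job completes.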
        """
        if self.state is None:
            self._begin(retry=retry, timeout=timeout)

        return super(_AsyncJob, self).result(timeout=timeout, retry=retry)

    def cancelled(self):
        """Check whether the job has been cancelled.

        The BigQuery API does not report cancellation directly. This method
        returns True only if the job failed with a ``"stopped"`` error reason;
        it is here to satisfy the interface for
        :class:`google.api_core.future.Future`.

        Returns:
            bool: True if the job's error result indicates it was stopped,
                False otherwise.
        """
        return (
            self.error_result is not None
            and self.error_result.get("reason") == _STOPPED_REASON
        )

    def __repr__(self):
        result = (
            f"{self.__class__.__name__}<"
            f"project={self.project}, location={self.location}, id={self.job_id}"
            ">"
        )
        return result


class ScriptStackFrame(object):
    """Stack frame showing the line/column/procedure name where the current
    evaluation happened.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def procedure_id(self):
        """Optional[str]: Name of the active procedure.

        Omitted if in a top-level script.
        """
        return self._properties.get("procedureId")

    @property
    def text(self):
        """str: Text of the current statement/expression."""
        return self._properties.get("text")

    @property
    def start_line(self):
        """int: One-based start line."""
        return _helpers._int_or_none(self._properties.get("startLine"))

    @property
    def start_column(self):
        """int: One-based start column."""
        return _helpers._int_or_none(self._properties.get("startColumn"))

    @property
    def end_line(self):
        """int: One-based end line."""
        return _helpers._int_or_none(self._properties.get("endLine"))

    @property
    def end_column(self):
        """int: One-based end column."""
        return _helpers._int_or_none(self._properties.get("endColumn"))


class ScriptStatistics(object):
    """Statistics for a child job of a script.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def stack_frames(self) -> Sequence[ScriptStackFrame]:
        """Stack trace where the current evaluation happened.

        Shows line/column/procedure name of each frame on the stack at the
        point where the current evaluation happened.

        The leaf frame is first, the primary script is last.
        """
        return [
            ScriptStackFrame(frame) for frame in self._properties.get("stackFrames", [])
        ]

    @property
    def evaluation_kind(self) -> Optional[str]:
        """str: Indicates the type of child job.

        Possible values include ``STATEMENT`` and ``EXPRESSION``.
        """
        return self._properties.get("evaluationKind")


class SessionInfo:
    """[Preview] Information of the session if this job is part of one.

    .. versionadded:: 2.29.0

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def session_id(self) -> Optional[str]:
        """The ID of the session."""
        return self._properties.get("sessionId")


class UnknownJob(_AsyncJob):
    """A job whose type cannot be determined."""

    @classmethod
    def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
        """Construct an UnknownJob from the JSON representation.

        Args:
            resource (Dict): JSON representation of a job.
            client (google.cloud.bigquery.client.Client):
                Client connected to BigQuery API.

        Returns:
            UnknownJob: Job corresponding to the resource.
        """
        job_ref_properties = resource.get(
            "jobReference", {"projectId": client.project, "jobId": None}
        )
        job_ref = _JobReference._from_api_repr(job_ref_properties)
        job = cls(job_ref, client)
        # Populate the job reference with the project, even if it has been
        # redacted, because we know it should equal that of the request.
        resource["jobReference"] = job_ref_properties
        job._properties = resource
        return job