# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base classes and helpers for job classes."""

from collections import namedtuple
import copy
import http.client
import threading
import typing
from typing import ClassVar, Dict, Optional, Sequence

from google.api_core import retry as retries
from google.api_core import exceptions
import google.api_core.future.polling

from google.cloud.bigquery import _helpers
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery.retry import (
    DEFAULT_GET_JOB_TIMEOUT,
    DEFAULT_RETRY,
)


_DONE_STATE = "DONE"
_STOPPED_REASON = "stopped"
_ERROR_REASON_TO_EXCEPTION = {
    "accessDenied": http.client.FORBIDDEN,
    "backendError": http.client.INTERNAL_SERVER_ERROR,
    "billingNotEnabled": http.client.FORBIDDEN,
    "billingTierLimitExceeded": http.client.BAD_REQUEST,
    "blocked": http.client.FORBIDDEN,
    "duplicate": http.client.CONFLICT,
    "internalError": http.client.INTERNAL_SERVER_ERROR,
    "invalid": http.client.BAD_REQUEST,
    "invalidQuery": http.client.BAD_REQUEST,
    "notFound": http.client.NOT_FOUND,
    "notImplemented": http.client.NOT_IMPLEMENTED,
    "policyViolation": http.client.FORBIDDEN,
    "quotaExceeded": http.client.FORBIDDEN,
    "rateLimitExceeded": http.client.TOO_MANY_REQUESTS,
    "resourceInUse": http.client.BAD_REQUEST,
    "resourcesExceeded": http.client.BAD_REQUEST,
    "responseTooLarge": http.client.FORBIDDEN,
    "stopped": http.client.OK,
    "tableUnavailable": http.client.BAD_REQUEST,
}


def _error_result_to_exception(error_result, errors=None):
    """Maps BigQuery error reasons to an exception.

    The reasons and their matching HTTP status codes are documented on
    the `troubleshooting errors`_ page.

    .. _troubleshooting errors: https://cloud.google.com/bigquery\
        /troubleshooting-errors

    Args:
        error_result (Mapping[str, str]): The error result from BigQuery.
        errors (Union[Iterable[str], None]): The detailed error messages.

    Returns:
        google.cloud.exceptions.GoogleAPICallError: The mapped exception.
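
    Example (a minimal sketch with an illustrative error payload)::

        error_result = {"reason": "rateLimitExceeded", "message": "Slow down"}
        exc = _error_result_to_exception(error_result)
        # exc is a google.api_core.exceptions.TooManyRequests instance.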
    """
    reason = error_result.get("reason")
    status_code = _ERROR_REASON_TO_EXCEPTION.get(
        reason, http.client.INTERNAL_SERVER_ERROR
    )
    # Manually create error message to preserve both error_result and errors.
    # Can be removed once b/310544564 and b/318889899 are resolved.
    concatenated_errors = ""
    if errors:
        concatenated_errors = "; "
        for err in errors:
            concatenated_errors += ", ".join(
                [f"{key}: {value}" for key, value in err.items()]
            )
            concatenated_errors += "; "

        # strips off the last unneeded semicolon and space
        concatenated_errors = concatenated_errors[:-2]

    error_message = error_result.get("message", "") + concatenated_errors

    return exceptions.from_http_status(
        status_code, error_message, errors=[error_result]
    )


ReservationUsage = namedtuple("ReservationUsage", "name slot_ms")
ReservationUsage.__doc__ = "Job resource usage for a reservation."
ReservationUsage.name.__doc__ = (
    'Reservation name or "unreserved" for on-demand resource usage.'
)
ReservationUsage.slot_ms.__doc__ = (
    "Total slot milliseconds used by the reservation for a particular job."
)


class TransactionInfo(typing.NamedTuple):
    """[Alpha] Information of a multi-statement transaction.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo

    .. versionadded:: 2.24.0
    """

    transaction_id: str
    """Output only. ID of the transaction."""

    @classmethod
    def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
        return cls(transaction_info["transactionId"])


class _JobReference(object):
    """A reference to a job.

    Args:
        job_id (str): ID of the job to run.
        project (str): ID of the project where the job runs.
        location (str): Location where the job runs.
    """

    def __init__(self, job_id, project, location):
        self._properties = {"jobId": job_id, "projectId": project}
        # The location field must not be populated if it is None.
        if location:
            self._properties["location"] = location

    @property
    def job_id(self):
        """str: ID of the job."""
        return self._properties.get("jobId")

    @property
    def project(self):
        """str: ID of the project where the job runs."""
        return self._properties.get("projectId")

    @property
    def location(self):
        """str: Location where the job runs."""
        return self._properties.get("location")

    def _to_api_repr(self):
        """Returns the API resource representation of the job reference."""
        return copy.deepcopy(self._properties)

    @classmethod
    def _from_api_repr(cls, resource):
        """Returns a job reference for an API resource representation."""
        job_id = resource.get("jobId")
        project = resource.get("projectId")
        location = resource.get("location")
        job_ref = cls(job_id, project, location)
        return job_ref


class _JobConfig(object):
    """Abstract base class for job configuration objects.

    Args:
        job_type (str): The key to use for the job configuration.
    """

    def __init__(self, job_type, **kwargs):
        self._job_type = job_type
        self._properties = {job_type: {}}
        for prop, val in kwargs.items():
            setattr(self, prop, val)

    def __setattr__(self, name, value):
        """Override to raise an error if an unknown property is set."""
        if not name.startswith("_") and not hasattr(type(self), name):
            raise AttributeError(
                "Property {} is unknown for {}.".format(name, type(self))
            )
        super(_JobConfig, self).__setattr__(name, value)

    @property
    def job_timeout_ms(self):
        """Optional parameter. Job timeout in milliseconds. If this time limit
        is exceeded, BigQuery might attempt to stop the job.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms

        For example::

            job_config = bigquery.QueryJobConfig(job_timeout_ms=5000)
            # or, on an existing config:
            job_config.job_timeout_ms = 5000

        Raises:
            ValueError: If ``value`` type is invalid.
        """

        # None as this is an optional parameter.
        if self._properties.get("jobTimeoutMs"):
            return self._properties["jobTimeoutMs"]
        return None

    @job_timeout_ms.setter
    def job_timeout_ms(self, value):
        try:
            value = _int_or_none(value)
        except ValueError as err:
            raise ValueError("Pass an int for jobTimeoutMs, e.g. 5000").with_traceback(
                err.__traceback__
            )

        if value is not None:
            # docs indicate a string is expected by the API
            self._properties["jobTimeoutMs"] = str(value)
        else:
            self._properties.pop("jobTimeoutMs", None)

    @property
    def reservation(self):
        """str: Optional. The reservation that the job would use.

        Users can specify a reservation to execute the job. If a reservation is
        not set, it is determined based on the rules defined by the reservation
        assignments. The expected format is
        ``projects/{project}/locations/{location}/reservations/{reservation}``.

        Raises:
            ValueError: If ``value`` is neither ``None`` nor a string.
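
        Example (a sketch; assumes a config object such as ``QueryJobConfig``
        and an illustrative reservation path)::

            job_config.reservation = (
                "projects/my-project/locations/us/reservations/my-reservation"
            )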
        """
        return self._properties.setdefault("reservation", None)

    @reservation.setter
    def reservation(self, value):
        if value and not isinstance(value, str):
            raise ValueError("Reservation must be None or a string.")
        self._properties["reservation"] = value

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job.

        This method always returns a dict. Once a job has been created on the
        server, its labels cannot be modified anymore.

        Raises:
            ValueError: If ``value`` type is invalid.
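
        Example (a sketch with illustrative label values)::

            job_config.labels = {"team": "analytics", "env": "dev"}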
        """
        return self._properties.setdefault("labels", {})

    @labels.setter
    def labels(self, value):
        if not isinstance(value, dict):
            raise ValueError("Pass a dict")
        self._properties["labels"] = value

    def _get_sub_prop(self, key, default=None):
        """Get a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access
        those properties::

            self._get_sub_prop('destinationTable')

        This is equivalent to using the ``_helpers._get_sub_prop`` function::

            _helpers._get_sub_prop(
                self._properties, ['query', 'destinationTable'])

        Args:
            key (str):
                Key for the value to get in the
                ``self._properties[self._job_type]`` dictionary.
            default (Optional[object]):
                Default value to return if the key is not found.
                Defaults to :data:`None`.

        Returns:
            object: The value if present or the default.
        """
        return _helpers._get_sub_prop(
            self._properties, [self._job_type, key], default=default
        )

    def _set_sub_prop(self, key, value):
        """Set a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set
        those properties::

            self._set_sub_prop('useLegacySql', False)

        This is equivalent to using the ``_helpers._set_sub_prop`` function::

            _helpers._set_sub_prop(
                self._properties, ['query', 'useLegacySql'], False)

        Args:
            key (str):
                Key to set in the ``self._properties[self._job_type]``
                dictionary.
            value (object): Value to set.
        """
        _helpers._set_sub_prop(self._properties, [self._job_type, key], value)

    def _del_sub_prop(self, key):
        """Remove ``key`` from the ``self._properties[self._job_type]`` dict.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear
        those properties::

            self._del_sub_prop('useLegacySql')

        This is equivalent to using the ``_helpers._del_sub_prop`` function::

            _helpers._del_sub_prop(
                self._properties, ['query', 'useLegacySql'])

        Args:
            key (str):
                Key to remove in the ``self._properties[self._job_type]``
                dictionary.
        """
        _helpers._del_sub_prop(self._properties, [self._job_type, key])

    def to_api_repr(self) -> dict:
        """Build an API representation of the job config.

        Returns:
            Dict: A dictionary in the format used by the BigQuery API.
        """
        return copy.deepcopy(self._properties)

    def _fill_from_default(self, default_job_config=None):
        """Merge this job config with a default job config.

        The keys in this object take precedence over the keys in the default
        config. The merge is done at the top-level as well as for keys one
        level below the job type.

        Args:
            default_job_config (google.cloud.bigquery.job._JobConfig):
                The default job config that will be used to fill in self.

        Returns:
            google.cloud.bigquery.job._JobConfig: A new (merged) job config.
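
        Example (a sketch using ``QueryJobConfig`` with illustrative values)::

            default_config = QueryJobConfig(maximum_bytes_billed=1000000)
            config = QueryJobConfig(use_legacy_sql=False)
            merged = config._fill_from_default(default_config)
            # ``merged`` keeps use_legacy_sql=False and also picks up
            # maximum_bytes_billed=1000000 from the default config.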
        """
        if not default_job_config:
            new_job_config = copy.deepcopy(self)
            return new_job_config

        if self._job_type != default_job_config._job_type:
            raise TypeError(
                "attempted to merge two incompatible job types: "
                + repr(self._job_type)
                + ", "
                + repr(default_job_config._job_type)
            )

        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        new_job_config = self.__class__()  # pytype: disable=missing-parameter

        default_job_properties = copy.deepcopy(default_job_config._properties)
        for key in self._properties:
            if key != self._job_type:
                default_job_properties[key] = self._properties[key]

        default_job_properties[self._job_type].update(self._properties[self._job_type])
        new_job_config._properties = default_job_properties

        return new_job_config

    @classmethod
    def from_api_repr(cls, resource: dict) -> "_JobConfig":
        """Factory: construct a job configuration given its API representation

        Args:
            resource (Dict):
                A job configuration in the same representation as is returned
                from the API.

        Returns:
            google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``.
        """
        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        job_config = cls()  # type: ignore  # pytype: disable=missing-parameter
        job_config._properties = resource
        return job_config


class _AsyncJob(google.api_core.future.polling.PollingFuture):
    """Base class for asynchronous jobs.

    Args:
        job_id (Union[str, _JobReference]):
            Job's ID in the project associated with the client or a
            fully-qualified job reference.
        client (google.cloud.bigquery.client.Client):
            Client which holds credentials and project configuration.
    """

    _JOB_TYPE = "unknown"
    _CONFIG_CLASS: ClassVar

    def __init__(self, job_id, client):
        super(_AsyncJob, self).__init__()

        # The job reference can be either a plain job ID or the full resource.
        # Populate the properties dictionary consistently depending on what has
        # been passed in.
        job_ref = job_id
        if not isinstance(job_id, _JobReference):
            job_ref = _JobReference(job_id, client.project, None)
        self._properties = {"jobReference": job_ref._to_api_repr()}

        self._client = client
        self._result_set = False
        self._completion_lock = threading.Lock()

    @property
    def configuration(self) -> _JobConfig:
        """Job-type specific configuration."""
        configuration: _JobConfig = self._CONFIG_CLASS()  # pytype: disable=not-callable
        configuration._properties = self._properties.setdefault("configuration", {})
        return configuration

    @property
    def job_id(self):
        """str: ID of the job."""
        return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"])

    @property
    def parent_job_id(self):
        """Return the ID of the parent job.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id

        Returns:
            Optional[str]: parent job id.
        """
        return _helpers._get_sub_prop(self._properties, ["statistics", "parentJobId"])

    @property
    def script_statistics(self) -> Optional["ScriptStatistics"]:
        """Statistics for a child job of a script."""
        resource = _helpers._get_sub_prop(
            self._properties, ["statistics", "scriptStatistics"]
        )
        if resource is None:
            return None
        return ScriptStatistics(resource)

    @property
    def session_info(self) -> Optional["SessionInfo"]:
        """[Preview] Information of the session if this job is part of one.

        .. versionadded:: 2.29.0
        """
        resource = _helpers._get_sub_prop(
            self._properties, ["statistics", "sessionInfo"]
        )
        if resource is None:
            return None
        return SessionInfo(resource)

    @property
    def num_child_jobs(self):
        """The number of child jobs executed.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs

        Returns:
            int
        """
        count = _helpers._get_sub_prop(self._properties, ["statistics", "numChildJobs"])
        return int(count) if count is not None else 0

    @property
    def project(self):
        """Project bound to the job.

        Returns:
            str: the project (derived from the client).
        """
        return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"])

    @property
    def location(self):
        """str: Location where the job runs."""
        return _helpers._get_sub_prop(self._properties, ["jobReference", "location"])

    @property
    def reservation_id(self):
        """str: Name of the primary reservation assigned to this job.

        Note that this could be different from the reservations reported in
        the reservation field if parent reservations were used to execute
        this job.
        """
        return _helpers._get_sub_prop(
            self._properties, ["statistics", "reservation_id"]
        )

    def _require_client(self, client):
        """Check client or verify override.

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.

        Returns:
            google.cloud.bigquery.client.Client:
                The client passed in or the currently bound client.
        """
        if client is None:
            client = self._client
        return client

    @property
    def job_type(self):
        """Type of job.

        Returns:
            str: one of 'load', 'copy', 'extract', 'query'.
        """
        return self._JOB_TYPE

    @property
    def path(self):
        """URL path for the job's APIs.

        Returns:
            str: the path based on project and job ID.
        """
        return "/projects/%s/jobs/%s" % (self.project, self.job_id)

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job."""
        return self._properties.setdefault("configuration", {}).setdefault("labels", {})

    @property
    def etag(self):
        """ETag for the job resource.

        Returns:
            Optional[str]: the ETag (None until set from the server).
        """
        return self._properties.get("etag")

    @property
    def self_link(self):
        """URL for the job resource.

        Returns:
            Optional[str]: the URL (None until set from the server).
        """
        return self._properties.get("selfLink")

    @property
    def user_email(self):
        """E-mail address of the user who submitted the job.

        Returns:
            Optional[str]: the e-mail address (None until set from the server).
        """
        return self._properties.get("user_email")

    @property
    def created(self):
        """Datetime at which the job was created.

        Returns:
            Optional[datetime.datetime]:
                the creation time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(
            self._properties, ["statistics", "creationTime"]
        )
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    @property
    def started(self):
        """Datetime at which the job was started.

        Returns:
            Optional[datetime.datetime]:
                the start time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(self._properties, ["statistics", "startTime"])
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    @property
    def ended(self):
        """Datetime at which the job finished.

        Returns:
            Optional[datetime.datetime]:
                the end time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(self._properties, ["statistics", "endTime"])
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    def _job_statistics(self):
        """Helper for job-type specific statistics-based properties."""
        statistics = self._properties.get("statistics", {})
        return statistics.get(self._JOB_TYPE, {})

    @property
    def reservation_usage(self):
        """Job resource usage breakdown by reservation.

        Returns:
            List[google.cloud.bigquery.job.ReservationUsage]:
                Reservation usage stats. Can be empty if not set from the server.
        """
        usage_stats_raw = _helpers._get_sub_prop(
            self._properties, ["statistics", "reservationUsage"], default=()
        )
        return [
            ReservationUsage(name=usage["name"], slot_ms=int(usage["slotMs"]))
            for usage in usage_stats_raw
        ]

    @property
    def transaction_info(self) -> Optional[TransactionInfo]:
        """Information of the multi-statement transaction if this job is part of one.

        Since a scripting query job can execute multiple transactions, this
        property is only expected on child jobs. Use the
        :meth:`google.cloud.bigquery.client.Client.list_jobs` method with the
        ``parent_job`` parameter to iterate over child jobs.

        .. versionadded:: 2.24.0
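
        Example (a sketch; assumes ``client`` and a script job ``parent_job``)::

            for child_job in client.list_jobs(parent_job=parent_job):
                if child_job.transaction_info is not None:
                    print(child_job.transaction_info.transaction_id)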
        """
        info = self._properties.get("statistics", {}).get("transactionInfo")
        if info is None:
            return None
        else:
            return TransactionInfo.from_api_repr(info)

    @property
    def error_result(self):
        """Error information about the job as a whole.

        Returns:
            Optional[Mapping]: the error information (None until set from the server).
        """
        status = self._properties.get("status")
        if status is not None:
            return status.get("errorResult")

    @property
    def errors(self):
        """Information about individual errors generated by the job.

        Returns:
            Optional[List[Mapping]]:
                the error information (None until set from the server).
        """
        status = self._properties.get("status")
        if status is not None:
            return status.get("errors")

    @property
    def state(self):
        """Status of the job.

        Returns:
            Optional[str]:
                the state (None until set from the server).
        """
        status = self._properties.get("status", {})
        return status.get("state")

    def _set_properties(self, api_response):
        """Update properties from resource in body of ``api_response``

        Args:
            api_response (Dict): response returned from an API call.
        """
        cleaned = api_response.copy()
        statistics = cleaned.setdefault("statistics", {})
        if "creationTime" in statistics:
            statistics["creationTime"] = float(statistics["creationTime"])
        if "startTime" in statistics:
            statistics["startTime"] = float(statistics["startTime"])
        if "endTime" in statistics:
            statistics["endTime"] = float(statistics["endTime"])

        self._properties = cleaned

        # For Future interface
        self._set_future_result()

    @classmethod
    def _check_resource_config(cls, resource):
        """Helper for :meth:`from_api_repr`

        Args:
            resource (Dict): resource for the job.

        Raises:
            KeyError:
                If the resource has no identifier, or
                is missing the appropriate configuration.
        """
        if "jobReference" not in resource or "jobId" not in resource["jobReference"]:
            raise KeyError(
                "Resource lacks required identity information: "
                '["jobReference"]["jobId"]'
            )
        if (
            "configuration" not in resource
            or cls._JOB_TYPE not in resource["configuration"]
        ):
            raise KeyError(
                "Resource lacks required configuration: "
                '["configuration"]["%s"]' % cls._JOB_TYPE
            )

    def to_api_repr(self):
        """Generate a resource for the job."""
        return copy.deepcopy(self._properties)

    _build_resource = to_api_repr  # backward-compatibility alias

    def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
        """API call: begin the job via a POST request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the ``client``
                associated with the job object.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Raises:
            ValueError:
                If the job has already begun.
        """
        if self.state is not None:
            raise ValueError("Job already begun.")

        client = self._require_client(client)
        path = "/projects/%s/jobs" % (self.project,)

        # jobs.insert is idempotent because we ensure that every new
        # job has an ID.
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.begin",
            span_attributes=span_attributes,
            job_ref=self,
            method="POST",
            path=path,
            data=self.to_api_repr(),
            timeout=timeout,
        )
        self._set_properties(api_response)

    def exists(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> bool:
        """API call: test for the existence of the job via a GET request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            bool: Boolean indicating existence of the job.
        """
        client = self._require_client(client)

        extra_params = {"fields": "id"}
        if self.location:
            extra_params["location"] = self.location

        try:
            span_attributes = {"path": self.path}

            client._call_api(
                retry,
                span_name="BigQuery.job.exists",
                span_attributes=span_attributes,
                job_ref=self,
                method="GET",
                path=self.path,
                query_params=extra_params,
                timeout=timeout,
            )
        except exceptions.NotFound:
            return False
        else:
            return True

    def reload(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
    ):
        """API call: refresh job properties via a GET request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
        """
        client = self._require_client(client)

        got_job = client.get_job(
            self,
            project=self.project,
            location=self.location,
            retry=retry,
            timeout=timeout,
        )
        self._set_properties(got_job._properties)

    def cancel(
        self,
        client=None,
        retry: Optional[retries.Retry] = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> bool:
        """API call: cancel job via a POST request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.
            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            bool: Boolean indicating that the cancel request was sent.
        """
        client = self._require_client(client)

        extra_params = {}
        if self.location:
            extra_params["location"] = self.location

        path = "{}/cancel".format(self.path)
        span_attributes = {"path": path}

        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.cancel",
            span_attributes=span_attributes,
            job_ref=self,
            method="POST",
            path=path,
            query_params=extra_params,
            timeout=timeout,
        )
        self._set_properties(api_response["job"])
        # The Future interface requires that we return True if the *attempt*
        # to cancel was successful.
        return True

    # The following methods implement the PollingFuture interface. Note that
    # the methods above are from the pre-Future interface and are left for
    # compatibility. The only "overloaded" method is :meth:`cancel`, which
    # satisfies both interfaces.

    def _set_future_result(self):
        """Set the result or exception from the job if it is complete."""
        # This must be done in a lock to prevent the polling thread
        # and main thread from both executing the completion logic
        # at the same time.
        with self._completion_lock:
            # If the operation isn't complete or if the result has already been
            # set, do not call set_result/set_exception again.
            # Note: self._result_set is set to True in set_result and
            # set_exception, in case those methods are invoked directly.
            if not self.done(reload=False) or self._result_set:
                return

            if self.error_result is not None:
                exception = _error_result_to_exception(
                    self.error_result, self.errors or ()
                )
                self.set_exception(exception)
            else:
                self.set_result(self)

    def done(
        self,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
        reload: bool = True,
    ) -> bool:
        """Checks if the job is complete.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC. If the job state is ``DONE``, retrying is aborted
                early, as the job will not change anymore.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            reload (Optional[bool]):
                If ``True``, make an API call to refresh the job state of
                unfinished jobs before checking. Default ``True``.

        Returns:
            bool: True if the job is complete, False otherwise.
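
        Example (a non-blocking check; assumes an already-started ``job``)::

            if job.done():
                print("job finished in state:", job.state)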
        """
        # Do not refresh if the state is already done, as the job will not
        # change once complete.
        if self.state != _DONE_STATE and reload:
            self.reload(retry=retry, timeout=timeout)
        return self.state == _DONE_STATE

    def result(  # type: ignore  # (incompatible with supertype)
        self,
        retry: Optional[retries.Retry] = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> "_AsyncJob":
        """Start the job and wait for it to complete and get the result.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC. If the job state is ``DONE``, retrying is aborted
                early, as the job will not change anymore.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
                If multiple requests are made under the hood, ``timeout``
                applies to each individual request.

        Returns:
            _AsyncJob: This instance.

        Raises:
            google.cloud.exceptions.GoogleAPICallError:
                if the job failed.
            concurrent.futures.TimeoutError:
                if the job did not complete in the given timeout.
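
        Example (a sketch; blocks until the job finishes, using a hypothetical
        job ID)::

            job = client.get_job("hypothetical-job-id")
            job.result(timeout=300)
            if job.errors:
                print("job finished with errors:", job.errors)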
        """
        if self.state is None:
            self._begin(retry=retry, timeout=timeout)

        kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
        return super(_AsyncJob, self).result(timeout=timeout, **kwargs)

    def cancelled(self):
        """Check if the job has been cancelled.

        The API does not expose a direct "cancelled" flag, so this only
        reports ``True`` when the job's error result has the ``stopped``
        reason. This method is here to satisfy the interface for
        :class:`google.api_core.future.Future`.

        Returns:
            bool: True if the job's error result indicates it was stopped.
        """
        return (
            self.error_result is not None
            and self.error_result.get("reason") == _STOPPED_REASON
        )

    def __repr__(self):
        result = (
            f"{self.__class__.__name__}<"
            f"project={self.project}, location={self.location}, id={self.job_id}"
            ">"
        )
        return result


class ScriptStackFrame(object):
    """Stack frame showing the line/column/procedure name where the current
    evaluation happened.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def procedure_id(self):
        """Optional[str]: Name of the active procedure.

        Omitted if in a top-level script.
        """
        return self._properties.get("procedureId")

    @property
    def text(self):
        """str: Text of the current statement/expression."""
        return self._properties.get("text")

    @property
    def start_line(self):
        """int: One-based start line."""
        return _helpers._int_or_none(self._properties.get("startLine"))

    @property
    def start_column(self):
        """int: One-based start column."""
        return _helpers._int_or_none(self._properties.get("startColumn"))

    @property
    def end_line(self):
        """int: One-based end line."""
        return _helpers._int_or_none(self._properties.get("endLine"))

    @property
    def end_column(self):
        """int: One-based end column."""
        return _helpers._int_or_none(self._properties.get("endColumn"))


class ScriptStatistics(object):
    """Statistics for a child job of a script.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def stack_frames(self) -> Sequence[ScriptStackFrame]:
        """Stack trace where the current evaluation happened.

        Shows line/column/procedure name of each frame on the stack at the
        point where the current evaluation happened.

        The leaf frame is first, the primary script is last.
        """
        return [
            ScriptStackFrame(frame) for frame in self._properties.get("stackFrames", [])
        ]

    @property
    def evaluation_kind(self) -> Optional[str]:
        """str: Indicates the type of child job.

        Possible values include ``STATEMENT`` and ``EXPRESSION``.
        """
        return self._properties.get("evaluationKind")


class SessionInfo:
    """[Preview] Information of the session if this job is part of one.

    .. versionadded:: 2.29.0

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def session_id(self) -> Optional[str]:
        """The ID of the session."""
        return self._properties.get("sessionId")


class UnknownJob(_AsyncJob):
    """A job whose type cannot be determined."""

    @classmethod
    def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
        """Construct an UnknownJob from the JSON representation.

        Args:
            resource (Dict): JSON representation of a job.
            client (google.cloud.bigquery.client.Client):
                Client connected to BigQuery API.

        Returns:
            UnknownJob: Job corresponding to the resource.
        """
        job_ref_properties = resource.get(
            "jobReference", {"projectId": client.project, "jobId": None}
        )
        job_ref = _JobReference._from_api_repr(job_ref_properties)
        job = cls(job_ref, client)
        # Populate the job reference with the project, even if it has been
        # redacted, because we know it should equal that of the request.
        resource["jobReference"] = job_ref_properties
        job._properties = resource
        return job