Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py: 44%
328 statements
coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Base classes and helpers for job classes."""
17from collections import namedtuple
18import copy
19import http
20import threading
21import typing
22from typing import ClassVar, Dict, Optional, Sequence
24from google.api_core import exceptions
25import google.api_core.future.polling
27from google.cloud.bigquery import _helpers
28from google.cloud.bigquery.retry import DEFAULT_RETRY
30if typing.TYPE_CHECKING: # pragma: NO COVER
31 from google.api_core import retry as retries
34_DONE_STATE = "DONE"
35_STOPPED_REASON = "stopped"
36_ERROR_REASON_TO_EXCEPTION = {
37 "accessDenied": http.client.FORBIDDEN,
38 "backendError": http.client.INTERNAL_SERVER_ERROR,
39 "billingNotEnabled": http.client.FORBIDDEN,
40 "billingTierLimitExceeded": http.client.BAD_REQUEST,
41 "blocked": http.client.FORBIDDEN,
42 "duplicate": http.client.CONFLICT,
43 "internalError": http.client.INTERNAL_SERVER_ERROR,
44 "invalid": http.client.BAD_REQUEST,
45 "invalidQuery": http.client.BAD_REQUEST,
46 "notFound": http.client.NOT_FOUND,
47 "notImplemented": http.client.NOT_IMPLEMENTED,
48 "policyViolation": http.client.FORBIDDEN,
49 "quotaExceeded": http.client.FORBIDDEN,
50 "rateLimitExceeded": http.client.FORBIDDEN,
51 "resourceInUse": http.client.BAD_REQUEST,
52 "resourcesExceeded": http.client.BAD_REQUEST,
53 "responseTooLarge": http.client.FORBIDDEN,
54 "stopped": http.client.OK,
55 "tableUnavailable": http.client.BAD_REQUEST,
56}
59def _error_result_to_exception(error_result):
60 """Maps BigQuery error reasons to an exception.
62 The reasons and their matching HTTP status codes are documented on
63 the `troubleshooting errors`_ page.
65 .. _troubleshooting errors: https://cloud.google.com/bigquery\
66 /troubleshooting-errors
68 Args:
69 error_result (Mapping[str, str]): The error result from BigQuery.
71 Returns:
72 google.cloud.exceptions.GoogleAPICallError: The mapped exception.
73 """
74 reason = error_result.get("reason")
75 status_code = _ERROR_REASON_TO_EXCEPTION.get(
76 reason, http.client.INTERNAL_SERVER_ERROR
77 )
78 return exceptions.from_http_status(
79 status_code, error_result.get("message", ""), errors=[error_result]
80 )
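# Editorial sketch (not part of this module): how the reason-to-status mapping
# behaves. The error dicts and values here are illustrative only; a known
# reason maps to its documented HTTP status, while an unknown reason falls
# back to a 500-style InternalServerError.
exc = _error_result_to_exception({"reason": "notFound", "message": "Not found: Table my_table"})
isinstance(exc, exceptions.NotFound)                     # True (HTTP 404)
_error_result_to_exception({"reason": "mystery"}).code   # 500 -- fallback status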
83ReservationUsage = namedtuple("ReservationUsage", "name slot_ms")
84ReservationUsage.__doc__ = "Job resource usage for a reservation."
85ReservationUsage.name.__doc__ = (
86 'Reservation name or "unreserved" for on-demand resource usage.'
87)
88ReservationUsage.slot_ms.__doc__ = (
89 "Total slot milliseconds used by the reservation for a particular job."
90)
93class TransactionInfo(typing.NamedTuple):
94 """[Alpha] Information of a multi-statement transaction.
96 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo
98 .. versionadded:: 2.24.0
99 """
101 transaction_id: str
102 """Output only. ID of the transaction."""
104 @classmethod
105 def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
106 return cls(transaction_info["transactionId"])
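# Editorial sketch (not part of this module): the API payload uses the
# camelCase key "transactionId" (value illustrative), which from_api_repr
# unpacks into the tuple.
info = TransactionInfo.from_api_repr({"transactionId": "01a2b3c4"})
info.transaction_id   # '01a2b3c4'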
109class _JobReference(object):
110 """A reference to a job.
112 Args:
113 job_id (str): ID of the job to run.
114 project (str): ID of the project where the job runs.
115 location (str): Location of where the job runs.
116 """
118 def __init__(self, job_id, project, location):
119 self._properties = {"jobId": job_id, "projectId": project}
120 # The location field must not be populated if it is None.
121 if location:
122 self._properties["location"] = location
124 @property
125 def job_id(self):
126 """str: ID of the job."""
127 return self._properties.get("jobId")
129 @property
130 def project(self):
131 """str: ID of the project where the job runs."""
132 return self._properties.get("projectId")
134 @property
135 def location(self):
136 """str: Location where the job runs."""
137 return self._properties.get("location")
139 def _to_api_repr(self):
140 """Returns the API resource representation of the job reference."""
141 return copy.deepcopy(self._properties)
143 @classmethod
144 def _from_api_repr(cls, resource):
145 """Returns a job reference for an API resource representation."""
146 job_id = resource.get("jobId")
147 project = resource.get("projectId")
148 location = resource.get("location")
149 job_ref = cls(job_id, project, location)
150 return job_ref
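# Editorial sketch (not part of this module): round-tripping a job reference
# with illustrative IDs. The "location" key is only present when a location
# was supplied.
ref = _JobReference("job-123", "my-project", "US")
ref._to_api_repr()   # {'jobId': 'job-123', 'projectId': 'my-project', 'location': 'US'}
same = _JobReference._from_api_repr(ref._to_api_repr())
(same.job_id, same.project, same.location)                    # ('job-123', 'my-project', 'US')
_JobReference("job-456", "my-project", None)._to_api_repr()   # no 'location' key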
153class _JobConfig(object):
154 """Abstract base class for job configuration objects.
156 Args:
157 job_type (str): The key to use for the job configuration.
158 """
160 def __init__(self, job_type, **kwargs):
161 self._job_type = job_type
162 self._properties = {job_type: {}}
163 for prop, val in kwargs.items():
164 setattr(self, prop, val)
166 def __setattr__(self, name, value):
167 """Override to be able to raise error if an unknown property is being set"""
168 if not name.startswith("_") and not hasattr(type(self), name):
169 raise AttributeError(
170 "Property {} is unknown for {}.".format(name, type(self))
171 )
172 super(_JobConfig, self).__setattr__(name, value)
174 @property
175 def labels(self):
176 """Dict[str, str]: Labels for the job.
178 This method always returns a dict. Once a job has been created on the
179 server, its labels cannot be modified anymore.
181 Raises:
182 ValueError: If the assigned value is not a dictionary.
183 """
184 return self._properties.setdefault("labels", {})
186 @labels.setter
187 def labels(self, value):
188 if not isinstance(value, dict):
189 raise ValueError("Pass a dict")
190 self._properties["labels"] = value
192 def _get_sub_prop(self, key, default=None):
193 """Get a value in the ``self._properties[self._job_type]`` dictionary.
195 Most job properties are inside the dictionary related to the job type
196 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access
197 those properties::
199 self._get_sub_prop('destinationTable')
201 This is equivalent to using the ``_helpers._get_sub_prop`` function::
203 _helpers._get_sub_prop(
204 self._properties, ['query', 'destinationTable'])
206 Args:
207 key (str):
208 Key for the value to get in the
209 ``self._properties[self._job_type]`` dictionary.
210 default (Optional[object]):
211 Default value to return if the key is not found.
212 Defaults to :data:`None`.
214 Returns:
215 object: The value if present or the default.
216 """
217 return _helpers._get_sub_prop(
218 self._properties, [self._job_type, key], default=default
219 )
221 def _set_sub_prop(self, key, value):
222 """Set a value in the ``self._properties[self._job_type]`` dictionary.
224 Most job properties are inside the dictionary related to the job type
225 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set
226 those properties::
228 self._set_sub_prop('useLegacySql', False)
230 This is equivalent to using the ``_helpers._set_sub_prop`` function::
232 _helpers._set_sub_prop(
233 self._properties, ['query', 'useLegacySql'], False)
235 Args:
236 key (str):
237 Key to set in the ``self._properties[self._job_type]``
238 dictionary.
239 value (object): Value to set.
240 """
241 _helpers._set_sub_prop(self._properties, [self._job_type, key], value)
243 def _del_sub_prop(self, key):
244 """Remove ``key`` from the ``self._properties[self._job_type]`` dict.
246 Most job properties are inside the dictionary related to the job type
247 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear
248 those properties::
250 self._del_sub_prop('useLegacySql')
252 This is equivalent to using the ``_helpers._del_sub_prop`` function::
254 _helpers._del_sub_prop(
255 self._properties, ['query', 'useLegacySql'])
257 Args:
258 key (str):
259 Key to remove in the ``self._properties[self._job_type]``
260 dictionary.
261 """
262 _helpers._del_sub_prop(self._properties, [self._job_type, key])
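# Editorial sketch (not part of this module): the three helpers above all
# operate on the nested dictionary keyed by the job type passed to __init__
# (keys and values here are illustrative).
cfg = _JobConfig("query")
cfg._set_sub_prop("useLegacySql", False)
cfg._get_sub_prop("useLegacySql")         # False
cfg._get_sub_prop("priority", "BATCH")    # 'BATCH' -- key absent, default returned
cfg._del_sub_prop("useLegacySql")
cfg._properties                           # {'query': {}}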
264 def to_api_repr(self) -> dict:
265 """Build an API representation of the job config.
267 Returns:
268 Dict: A dictionary in the format used by the BigQuery API.
269 """
270 return copy.deepcopy(self._properties)
272 def _fill_from_default(self, default_job_config=None):
273 """Merge this job config with a default job config.
275 The keys in this object take precedence over the keys in the default
276 config. The merge is done at the top-level as well as for keys one
277 level below the job type.
279 Args:
280 default_job_config (google.cloud.bigquery.job._JobConfig):
281 The default job config that will be used to fill in self.
283 Returns:
284 google.cloud.bigquery.job._JobConfig: A new (merged) job config.
285 """
286 if not default_job_config:
287 new_job_config = copy.deepcopy(self)
288 return new_job_config
290 if self._job_type != default_job_config._job_type:
291 raise TypeError(
292 "attempted to merge two incompatible job types: "
293 + repr(self._job_type)
294 + ", "
295 + repr(default_job_config._job_type)
296 )
298 # cls is one of the job config subclasses that provides the job_type argument to
299 # this base class on instantiation, thus missing-parameter warning is a false
300 # positive here.
301 new_job_config = self.__class__() # pytype: disable=missing-parameter
303 default_job_properties = copy.deepcopy(default_job_config._properties)
304 for key in self._properties:
305 if key != self._job_type:
306 default_job_properties[key] = self._properties[key]
308 default_job_properties[self._job_type].update(self._properties[self._job_type])
309 new_job_config._properties = default_job_properties
311 return new_job_config
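# Editorial sketch (not part of this module): merge precedence of
# _fill_from_default, shown with a hypothetical no-argument subclass because
# the merge re-instantiates self.__class__() without arguments.
class _SketchQueryConfig(_JobConfig):
    def __init__(self, **kwargs):
        super().__init__("query", **kwargs)

default = _SketchQueryConfig()
default._set_sub_prop("useLegacySql", True)
default.labels = {"team": "data"}

explicit = _SketchQueryConfig()
explicit._set_sub_prop("useLegacySql", False)

merged = explicit._fill_from_default(default)
merged._get_sub_prop("useLegacySql")   # False -- the explicit value wins
merged.labels                          # {'team': 'data'} -- filled in from the default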
313 @classmethod
314 def from_api_repr(cls, resource: dict) -> "_JobConfig":
315 """Factory: construct a job configuration given its API representation
317 Args:
318 resource (Dict):
319 A job configuration in the same representation as is returned
320 from the API.
322 Returns:
323 google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``.
324 """
325 # cls is one of the job config subclasses that provides the job_type argument to
326 # this base class on instantiation, thus missing-parameter warning is a false
327 # positive here.
328 job_config = cls() # type: ignore # pytype: disable=missing-parameter
329 job_config._properties = resource
330 return job_config
333class _AsyncJob(google.api_core.future.polling.PollingFuture):
334 """Base class for asynchronous jobs.
336 Args:
337 job_id (Union[str, _JobReference]):
338 Job's ID in the project associated with the client or a
339 fully-qualified job reference.
340 client (google.cloud.bigquery.client.Client):
341 Client which holds credentials and project configuration.
342 """
344 _JOB_TYPE = "unknown"
345 _CONFIG_CLASS: ClassVar
347 def __init__(self, job_id, client):
348 super(_AsyncJob, self).__init__()
350 # The job reference can be either a plain job ID or the full resource.
351 # Populate the properties dictionary consistently depending on what has
352 # been passed in.
353 job_ref = job_id
354 if not isinstance(job_id, _JobReference):
355 job_ref = _JobReference(job_id, client.project, None)
356 self._properties = {"jobReference": job_ref._to_api_repr()}
358 self._client = client
359 self._result_set = False
360 self._completion_lock = threading.Lock()
362 @property
363 def configuration(self) -> _JobConfig:
364 """Job-type specific configurtion."""
365 configuration = self._CONFIG_CLASS()
366 configuration._properties = self._properties.setdefault("configuration", {})
367 return configuration
369 @property
370 def job_id(self):
371 """str: ID of the job."""
372 return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"])
374 @property
375 def parent_job_id(self):
376 """Return the ID of the parent job.
378 See:
379 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id
381 Returns:
382 Optional[str]: parent job id.
383 """
384 return _helpers._get_sub_prop(self._properties, ["statistics", "parentJobId"])
386 @property
387 def script_statistics(self) -> Optional["ScriptStatistics"]:
388 """Statistics for a child job of a script."""
389 resource = _helpers._get_sub_prop(
390 self._properties, ["statistics", "scriptStatistics"]
391 )
392 if resource is None:
393 return None
394 return ScriptStatistics(resource)
396 @property
397 def session_info(self) -> Optional["SessionInfo"]:
398 """[Preview] Information of the session if this job is part of one.
400 .. versionadded:: 2.29.0
401 """
402 resource = _helpers._get_sub_prop(
403 self._properties, ["statistics", "sessionInfo"]
404 )
405 if resource is None:
406 return None
407 return SessionInfo(resource)
409 @property
410 def num_child_jobs(self):
411 """The number of child jobs executed.
413 See:
414 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs
416 Returns:
417 int
418 """
419 count = _helpers._get_sub_prop(self._properties, ["statistics", "numChildJobs"])
420 return int(count) if count is not None else 0
422 @property
423 def project(self):
424 """Project bound to the job.
426 Returns:
427 str: the project (derived from the client).
428 """
429 return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"])
431 @property
432 def location(self):
433 """str: Location where the job runs."""
434 return _helpers._get_sub_prop(self._properties, ["jobReference", "location"])
436 def _require_client(self, client):
437 """Check client or verify over-ride.
439 Args:
440 client (Optional[google.cloud.bigquery.client.Client]):
441 the client to use. If not passed, falls back to the
442 ``client`` stored on the current job.
444 Returns:
445 google.cloud.bigquery.client.Client:
446 The client passed in or the currently bound client.
447 """
448 if client is None:
449 client = self._client
450 return client
452 @property
453 def job_type(self):
454 """Type of job.
456 Returns:
457 str: one of 'load', 'copy', 'extract', 'query'.
458 """
459 return self._JOB_TYPE
461 @property
462 def path(self):
463 """URL path for the job's APIs.
465 Returns:
466 str: the path based on project and job ID.
467 """
468 return "/projects/%s/jobs/%s" % (self.project, self.job_id)
470 @property
471 def labels(self):
472 """Dict[str, str]: Labels for the job."""
473 return self._properties.setdefault("configuration", {}).setdefault("labels", {})
475 @property
476 def etag(self):
477 """ETag for the job resource.
479 Returns:
480 Optional[str]: the ETag (None until set from the server).
481 """
482 return self._properties.get("etag")
484 @property
485 def self_link(self):
486 """URL for the job resource.
488 Returns:
489 Optional[str]: the URL (None until set from the server).
490 """
491 return self._properties.get("selfLink")
493 @property
494 def user_email(self):
495 """E-mail address of user who submitted the job.
497 Returns:
498 Optional[str]: the e-mail address (None until set from the server).
499 """
500 return self._properties.get("user_email")
502 @property
503 def created(self):
504 """Datetime at which the job was created.
506 Returns:
507 Optional[datetime.datetime]:
508 the creation time (None until set from the server).
509 """
510 millis = _helpers._get_sub_prop(
511 self._properties, ["statistics", "creationTime"]
512 )
513 if millis is not None:
514 return _helpers._datetime_from_microseconds(millis * 1000.0)
516 @property
517 def started(self):
518 """Datetime at which the job was started.
520 Returns:
521 Optional[datetime.datetime]:
522 the start time (None until set from the server).
523 """
524 millis = _helpers._get_sub_prop(self._properties, ["statistics", "startTime"])
525 if millis is not None:
526 return _helpers._datetime_from_microseconds(millis * 1000.0)
528 @property
529 def ended(self):
530 """Datetime at which the job finished.
532 Returns:
533 Optional[datetime.datetime]:
534 the end time (None until set from the server).
535 """
536 millis = _helpers._get_sub_prop(self._properties, ["statistics", "endTime"])
537 if millis is not None:
538 return _helpers._datetime_from_microseconds(millis * 1000.0)
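# Editorial sketch (not part of this module): created/started/ended all follow
# the same pattern -- a millisecond epoch value (normalized to float by
# _set_properties) converted to a timezone-aware datetime.
millis = 1679812800000.0   # illustrative "creationTime" value
_helpers._datetime_from_microseconds(millis * 1000.0)   # 2023-03-26 06:40:00 UTC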
540 def _job_statistics(self):
541 """Helper for job-type specific statistics-based properties."""
542 statistics = self._properties.get("statistics", {})
543 return statistics.get(self._JOB_TYPE, {})
545 @property
546 def reservation_usage(self):
547 """Job resource usage breakdown by reservation.
549 Returns:
550 List[google.cloud.bigquery.job.ReservationUsage]:
551 Reservation usage stats. Can be empty if not set from the server.
552 """
553 usage_stats_raw = _helpers._get_sub_prop(
554 self._properties, ["statistics", "reservationUsage"], default=()
555 )
556 return [
557 ReservationUsage(name=usage["name"], slot_ms=int(usage["slotMs"]))
558 for usage in usage_stats_raw
559 ]
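# Editorial sketch (not part of this module): an illustrative statistics
# payload and the coercion performed by the property above ("slotMs" is
# passed through int(), so string payloads are handled).
raw = {"statistics": {"reservationUsage": [{"name": "unreserved", "slotMs": "132000"}]}}
[
    ReservationUsage(name=u["name"], slot_ms=int(u["slotMs"]))
    for u in raw["statistics"]["reservationUsage"]
]
# -> [ReservationUsage(name='unreserved', slot_ms=132000)]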
561 @property
562 def transaction_info(self) -> Optional[TransactionInfo]:
563 """Information of the multi-statement transaction if this job is part of one.
565 Since a scripting query job can execute multiple transactions, this
566 property is only expected on child jobs. Use the
567 :meth:`google.cloud.bigquery.client.Client.list_jobs` method with the
568 ``parent_job`` parameter to iterate over child jobs.
570 .. versionadded:: 2.24.0
571 """
572 info = self._properties.get("statistics", {}).get("transactionInfo")
573 if info is None:
574 return None
575 else:
576 return TransactionInfo.from_api_repr(info)
578 @property
579 def error_result(self):
580 """Error information about the job as a whole.
582 Returns:
583 Optional[Mapping]: the error information (None until set from the server).
584 """
585 status = self._properties.get("status")
586 if status is not None:
587 return status.get("errorResult")
589 @property
590 def errors(self):
591 """Information about individual errors generated by the job.
593 Returns:
594 Optional[List[Mapping]]:
595 the error information (None until set from the server).
596 """
597 status = self._properties.get("status")
598 if status is not None:
599 return status.get("errors")
601 @property
602 def state(self):
603 """Status of the job.
605 Returns:
606 Optional[str]:
607 the state (None until set from the server).
608 """
609 status = self._properties.get("status", {})
610 return status.get("state")
612 def _set_properties(self, api_response):
613 """Update properties from resource in body of ``api_response``
615 Args:
616 api_response (Dict): response returned from an API call.
617 """
618 cleaned = api_response.copy()
619 statistics = cleaned.setdefault("statistics", {})
620 if "creationTime" in statistics:
621 statistics["creationTime"] = float(statistics["creationTime"])
622 if "startTime" in statistics:
623 statistics["startTime"] = float(statistics["startTime"])
624 if "endTime" in statistics:
625 statistics["endTime"] = float(statistics["endTime"])
627 self._properties = cleaned
629 # For Future interface
630 self._set_future_result()
632 @classmethod
633 def _check_resource_config(cls, resource):
634 """Helper for :meth:`from_api_repr`
636 Args:
637 resource (Dict): resource for the job.
639 Raises:
640 KeyError:
641 If the resource has no identifier, or
642 is missing the appropriate configuration.
643 """
644 if "jobReference" not in resource or "jobId" not in resource["jobReference"]:
645 raise KeyError(
646 "Resource lacks required identity information: "
647 '["jobReference"]["jobId"]'
648 )
649 if (
650 "configuration" not in resource
651 or cls._JOB_TYPE not in resource["configuration"]
652 ):
653 raise KeyError(
654 "Resource lacks required configuration: "
655 '["configuration"]["%s"]' % cls._JOB_TYPE
656 )
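# Editorial sketch (not part of this module): resource shapes that pass or
# fail _check_resource_config for a hypothetical subclass whose _JOB_TYPE is
# "query" (IDs are illustrative).
ok = {"jobReference": {"jobId": "job-123"}, "configuration": {"query": {}}}
missing_id = {"configuration": {"query": {}}}        # KeyError: ["jobReference"]["jobId"]
wrong_config = {"jobReference": {"jobId": "job-123"},
                "configuration": {"load": {}}}       # KeyError: ["configuration"]["query"]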
658 def to_api_repr(self):
659 """Generate a resource for the job."""
660 return copy.deepcopy(self._properties)
662 _build_resource = to_api_repr # backward-compatibility alias
664 def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
665 """API call: begin the job via a POST request
667 See
668 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
670 Args:
671 client (Optional[google.cloud.bigquery.client.Client]):
672 The client to use. If not passed, falls back to the ``client``
673 associated with the job object.
674 retry (Optional[google.api_core.retry.Retry]):
675 How to retry the RPC.
676 timeout (Optional[float]):
677 The number of seconds to wait for the underlying HTTP transport
678 before using ``retry``.
680 Raises:
681 ValueError:
682 If the job has already begun.
683 """
684 if self.state is not None:
685 raise ValueError("Job already begun.")
687 client = self._require_client(client)
688 path = "/projects/%s/jobs" % (self.project,)
690 # jobs.insert is idempotent because we ensure that every new
691 # job has an ID.
692 span_attributes = {"path": path}
693 api_response = client._call_api(
694 retry,
695 span_name="BigQuery.job.begin",
696 span_attributes=span_attributes,
697 job_ref=self,
698 method="POST",
699 path=path,
700 data=self.to_api_repr(),
701 timeout=timeout,
702 )
703 self._set_properties(api_response)
705 def exists(
706 self, client=None, retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None
707 ) -> bool:
708 """API call: test for the existence of the job via a GET request
710 See
711 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
713 Args:
714 client (Optional[google.cloud.bigquery.client.Client]):
715 the client to use. If not passed, falls back to the
716 ``client`` stored on the current job.
718 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
719 timeout (Optional[float]):
720 The number of seconds to wait for the underlying HTTP transport
721 before using ``retry``.
723 Returns:
724 bool: Boolean indicating existence of the job.
725 """
726 client = self._require_client(client)
728 extra_params = {"fields": "id"}
729 if self.location:
730 extra_params["location"] = self.location
732 try:
733 span_attributes = {"path": self.path}
735 client._call_api(
736 retry,
737 span_name="BigQuery.job.exists",
738 span_attributes=span_attributes,
739 job_ref=self,
740 method="GET",
741 path=self.path,
742 query_params=extra_params,
743 timeout=timeout,
744 )
745 except exceptions.NotFound:
746 return False
747 else:
748 return True
750 def reload(
751 self, client=None, retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None
752 ):
753 """API call: refresh job properties via a GET request.
755 See
756 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
758 Args:
759 client (Optional[google.cloud.bigquery.client.Client]):
760 the client to use. If not passed, falls back to the
761 ``client`` stored on the current job.
763 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
764 timeout (Optional[float]):
765 The number of seconds to wait for the underlying HTTP transport
766 before using ``retry``.
767 """
768 client = self._require_client(client)
770 extra_params = {}
771 if self.location:
772 extra_params["location"] = self.location
773 span_attributes = {"path": self.path}
775 api_response = client._call_api(
776 retry,
777 span_name="BigQuery.job.reload",
778 span_attributes=span_attributes,
779 job_ref=self,
780 method="GET",
781 path=self.path,
782 query_params=extra_params,
783 timeout=timeout,
784 )
785 self._set_properties(api_response)
787 def cancel(
788 self, client=None, retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None
789 ) -> bool:
790 """API call: cancel job via a POST request
792 See
793 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
795 Args:
796 client (Optional[google.cloud.bigquery.client.Client]):
797 the client to use. If not passed, falls back to the
798 ``client`` stored on the current job.
799 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
800 timeout (Optional[float]):
801 The number of seconds to wait for the underlying HTTP transport
802 before using ``retry``
804 Returns:
805 bool: Boolean indicating that the cancel request was sent.
806 """
807 client = self._require_client(client)
809 extra_params = {}
810 if self.location:
811 extra_params["location"] = self.location
813 path = "{}/cancel".format(self.path)
814 span_attributes = {"path": path}
816 api_response = client._call_api(
817 retry,
818 span_name="BigQuery.job.cancel",
819 span_attributes=span_attributes,
820 job_ref=self,
821 method="POST",
822 path=path,
823 query_params=extra_params,
824 timeout=timeout,
825 )
826 self._set_properties(api_response["job"])
827 # The Future interface requires that we return True if the *attempt*
828 # to cancel was successful.
829 return True
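# Editorial sketch (not part of this module): the request methods above, tied
# together in a hypothetical helper. It assumes ``job`` is a concrete job
# bound to an authenticated google.cloud.bigquery.Client.
def _sketch_lifecycle(job):
    if job.state is None:
        job._begin()        # POST /projects/{project}/jobs
    if job.exists():        # GET {job.path}?fields=id -> bool
        job.reload()        # GET {job.path} -> refresh cached properties
    return job.cancel()     # POST {job.path}/cancel -> True once the request is sent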
831 # The following methods implement the PollingFuture interface. Note that
832 # the methods above are from the pre-Future interface and are left for
833 # compatibility. The only "overloaded" method is :meth:`cancel`, which
834 # satisfies both interfaces.
836 def _set_future_result(self):
837 """Set the result or exception from the job if it is complete."""
838 # This must be done in a lock to prevent the polling thread
839 # and main thread from both executing the completion logic
840 # at the same time.
841 with self._completion_lock:
842 # If the operation isn't complete or if the result has already been
843 # set, do not call set_result/set_exception again.
844 # Note: self._result_set is set to True in set_result and
845 # set_exception, in case those methods are invoked directly.
846 if not self.done(reload=False) or self._result_set:
847 return
849 if self.error_result is not None:
850 exception = _error_result_to_exception(self.error_result)
851 self.set_exception(exception)
852 else:
853 self.set_result(self)
855 def done(
856 self,
857 retry: "retries.Retry" = DEFAULT_RETRY,
858 timeout: float = None,
859 reload: bool = True,
860 ) -> bool:
861 """Checks if the job is complete.
863 Args:
864 retry (Optional[google.api_core.retry.Retry]):
865 How to retry the RPC. If the job state is ``DONE``, retrying is aborted
866 early, as the job will not change anymore.
867 timeout (Optional[float]):
868 The number of seconds to wait for the underlying HTTP transport
869 before using ``retry``.
870 reload (Optional[bool]):
871 If ``True``, make an API call to refresh the job state of
872 unfinished jobs before checking. Default ``True``.
874 Returns:
875 bool: True if the job is complete, False otherwise.
876 """
877 # Do not refresh if the state is already done, as the job will not
878 # change once complete.
879 if self.state != _DONE_STATE and reload:
880 self.reload(retry=retry, timeout=timeout)
881 return self.state == _DONE_STATE
883 def result( # type: ignore # (signature complaint)
884 self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: float = None
885 ) -> "_AsyncJob":
886 """Start the job and wait for it to complete and get the result.
888 Args:
889 retry (Optional[google.api_core.retry.Retry]):
890 How to retry the RPC. If the job state is ``DONE``, retrying is aborted
891 early, as the job will not change anymore.
892 timeout (Optional[float]):
893 The number of seconds to wait for the underlying HTTP transport
894 before using ``retry``.
895 If multiple requests are made under the hood, ``timeout``
896 applies to each individual request.
898 Returns:
899 _AsyncJob: This instance.
901 Raises:
902 google.cloud.exceptions.GoogleAPICallError:
903 if the job failed.
904 concurrent.futures.TimeoutError:
905 if the job did not complete in the given timeout.
906 """
907 if self.state is None:
908 self._begin(retry=retry, timeout=timeout)
910 kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
911 return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
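# Editorial sketch (not part of this module): result() begins the job when it
# has no server-side state yet, then defers to PollingFuture.result(), which
# keeps polling done() until the state is DONE or the timeout elapses.
# ``job`` is assumed to be a concrete job bound to an authenticated client.
def _sketch_wait(job, timeout=120.0):
    outcome = job.result(timeout=timeout)   # raises GoogleAPICallError if the job failed
    assert job.done(reload=False)           # state is DONE; no extra API call needed
    return outcome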
913 def cancelled(self):
914 """Check if the job has been cancelled.
916 The API does not expose cancellation directly; this method reports
917 ``True`` only when the job's error result has the ``stopped`` reason.
918 It also satisfies the interface for :class:`google.api_core.future.Future`.
920 Returns:
921 bool: True if the job's error result indicates it was stopped.
922 """
923 return (
924 self.error_result is not None
925 and self.error_result.get("reason") == _STOPPED_REASON
926 )
928 def __repr__(self):
929 result = (
930 f"{self.__class__.__name__}<"
931 f"project={self.project}, location={self.location}, id={self.job_id}"
932 ">"
933 )
934 return result
937class ScriptStackFrame(object):
938 """Stack frame showing the line/column/procedure name where the current
939 evaluation happened.
941 Args:
942 resource (Map[str, Any]): JSON representation of object.
943 """
945 def __init__(self, resource):
946 self._properties = resource
948 @property
949 def procedure_id(self):
950 """Optional[str]: Name of the active procedure.
952 Omitted if in a top-level script.
953 """
954 return self._properties.get("procedureId")
956 @property
957 def text(self):
958 """str: Text of the current statement/expression."""
959 return self._properties.get("text")
961 @property
962 def start_line(self):
963 """int: One-based start line."""
964 return _helpers._int_or_none(self._properties.get("startLine"))
966 @property
967 def start_column(self):
968 """int: One-based start column."""
969 return _helpers._int_or_none(self._properties.get("startColumn"))
971 @property
972 def end_line(self):
973 """int: One-based end line."""
974 return _helpers._int_or_none(self._properties.get("endLine"))
976 @property
977 def end_column(self):
978 """int: One-based end column."""
979 return _helpers._int_or_none(self._properties.get("endColumn"))
982class ScriptStatistics(object):
983 """Statistics for a child job of a script.
985 Args:
986 resource (Map[str, Any]): JSON representation of object.
987 """
989 def __init__(self, resource):
990 self._properties = resource
992 @property
993 def stack_frames(self) -> Sequence[ScriptStackFrame]:
994 """Stack trace where the current evaluation happened.
996 Shows line/column/procedure name of each frame on the stack at the
997 point where the current evaluation happened.
999 The leaf frame is first, the primary script is last.
1000 """
1001 return [
1002 ScriptStackFrame(frame) for frame in self._properties.get("stackFrames", [])
1003 ]
1005 @property
1006 def evaluation_kind(self) -> Optional[str]:
1007 """str: Indicates the type of child job.
1009 Possible values include ``STATEMENT`` and ``EXPRESSION``.
1010 """
1011 return self._properties.get("evaluationKind")
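# Editorial sketch (not part of this module): an illustrative scriptStatistics
# payload. Line/column values are passed through _helpers._int_or_none, so
# string or integer representations both work.
stats = ScriptStatistics({
    "evaluationKind": "STATEMENT",
    "stackFrames": [{"text": "SELECT 1;", "startLine": "3", "startColumn": "1",
                     "endLine": "3", "endColumn": "10"}],
})
stats.evaluation_kind                  # 'STATEMENT'
frame = stats.stack_frames[0]
(frame.start_line, frame.end_column)   # (3, 10)
frame.procedure_id                     # None -- top-level script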
1014class SessionInfo:
1015 """[Preview] Information of the session if this job is part of one.
1017 .. versionadded:: 2.29.0
1019 Args:
1020 resource (Map[str, Any]): JSON representation of object.
1021 """
1023 def __init__(self, resource):
1024 self._properties = resource
1026 @property
1027 def session_id(self) -> Optional[str]:
1028 """The ID of the session."""
1029 return self._properties.get("sessionId")
1032class UnknownJob(_AsyncJob):
1033 """A job whose type cannot be determined."""
1035 @classmethod
1036 def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
1037 """Construct an UnknownJob from the JSON representation.
1039 Args:
1040 resource (Dict): JSON representation of a job.
1041 client (google.cloud.bigquery.client.Client):
1042 Client connected to BigQuery API.
1044 Returns:
1045 UnknownJob: Job corresponding to the resource.
1046 """
1047 job_ref_properties = resource.get(
1048 "jobReference", {"projectId": client.project, "jobId": None}
1049 )
1050 job_ref = _JobReference._from_api_repr(job_ref_properties)
1051 job = cls(job_ref, client)
1052 # Populate the job reference with the project, even if it has been
1053 # redacted, because we know it should equal that of the request.
1054 resource["jobReference"] = job_ref_properties
1055 job._properties = resource
1056 return job