1# Copyright 2015 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Base classes and helpers for job classes."""
16
17from collections import namedtuple
18import copy
19import http
20import threading
21import typing
22from typing import ClassVar, Dict, Optional, Sequence
23
24from google.api_core import retry as retries
25from google.api_core import exceptions
26import google.api_core.future.polling
27
28from google.cloud.bigquery import _helpers
29from google.cloud.bigquery._helpers import _int_or_none
30from google.cloud.bigquery.retry import (
31 DEFAULT_GET_JOB_TIMEOUT,
32 DEFAULT_RETRY,
33)
34
35
36_DONE_STATE = "DONE"
37_STOPPED_REASON = "stopped"
# Mapping of BigQuery error "reason" strings to the HTTP status codes used to
# build the corresponding ``google.api_core`` exceptions.
_ERROR_REASON_TO_EXCEPTION = {
39 "accessDenied": http.client.FORBIDDEN,
40 "backendError": http.client.INTERNAL_SERVER_ERROR,
41 "billingNotEnabled": http.client.FORBIDDEN,
42 "billingTierLimitExceeded": http.client.BAD_REQUEST,
43 "blocked": http.client.FORBIDDEN,
44 "duplicate": http.client.CONFLICT,
45 "internalError": http.client.INTERNAL_SERVER_ERROR,
46 "invalid": http.client.BAD_REQUEST,
47 "invalidQuery": http.client.BAD_REQUEST,
48 "notFound": http.client.NOT_FOUND,
49 "notImplemented": http.client.NOT_IMPLEMENTED,
50 "policyViolation": http.client.FORBIDDEN,
51 "quotaExceeded": http.client.FORBIDDEN,
52 "rateLimitExceeded": http.client.TOO_MANY_REQUESTS,
53 "resourceInUse": http.client.BAD_REQUEST,
54 "resourcesExceeded": http.client.BAD_REQUEST,
55 "responseTooLarge": http.client.FORBIDDEN,
56 "stopped": http.client.OK,
57 "tableUnavailable": http.client.BAD_REQUEST,
58}
59
60
61def _error_result_to_exception(error_result, errors=None):
62 """Maps BigQuery error reasons to an exception.
63
64 The reasons and their matching HTTP status codes are documented on
65 the `troubleshooting errors`_ page.
66
67 .. _troubleshooting errors: https://cloud.google.com/bigquery\
68 /troubleshooting-errors
69
70 Args:
71 error_result (Mapping[str, str]): The error result from BigQuery.
        errors (Union[Iterable[Mapping[str, str]], None]): The detailed error messages.
73
74 Returns:
75 google.cloud.exceptions.GoogleAPICallError: The mapped exception.
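
    Example (illustrative; the error payload shown is a hypothetical sample)::

        exc = _error_result_to_exception(
            {"reason": "notFound", "message": "Table not found"}
        )
        # ``exc`` is an instance of ``google.api_core.exceptions.NotFound``.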
76 """
77 reason = error_result.get("reason")
78 status_code = _ERROR_REASON_TO_EXCEPTION.get(
79 reason, http.client.INTERNAL_SERVER_ERROR
80 )
81 # Manually create error message to preserve both error_result and errors.
82 # Can be removed once b/310544564 and b/318889899 are resolved.
83 concatenated_errors = ""
84 if errors:
85 concatenated_errors = "; "
86 for err in errors:
87 concatenated_errors += ", ".join(
88 [f"{key}: {value}" for key, value in err.items()]
89 )
90 concatenated_errors += "; "
91
92 # strips off the last unneeded semicolon and space
93 concatenated_errors = concatenated_errors[:-2]
94
95 error_message = error_result.get("message", "") + concatenated_errors
96
97 return exceptions.from_http_status(
98 status_code, error_message, errors=[error_result]
99 )
100
101
102ReservationUsage = namedtuple("ReservationUsage", "name slot_ms")
103ReservationUsage.__doc__ = "Job resource usage for a reservation."
104ReservationUsage.name.__doc__ = (
    'Reservation name or "unreserved" for on-demand resource usage.'
106)
107ReservationUsage.slot_ms.__doc__ = (
108 "Total slot milliseconds used by the reservation for a particular job."
109)
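
# Illustrative sketch of how ``ReservationUsage`` values look; the reservation
# name and slot time below are made up:
#
#     usage = ReservationUsage(name="unreserved", slot_ms=12345)
#     usage.name     # "unreserved"
#     usage.slot_ms  # 12345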
110
111
112class TransactionInfo(typing.NamedTuple):
    """[Alpha] Information about a multi-statement transaction.
114
115 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo
116
117 .. versionadded:: 2.24.0
118 """
119
120 transaction_id: str
121 """Output only. ID of the transaction."""
122
123 @classmethod
124 def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
125 return cls(transaction_info["transactionId"])
126
127
128class _JobReference(object):
129 """A reference to a job.
130
131 Args:
132 job_id (str): ID of the job to run.
133 project (str): ID of the project where the job runs.
        location (str): Location where the job runs.
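
    Example (illustrative sketch; the IDs below are made up)::

        ref = _JobReference("my-job", "my-project", "US")
        ref._to_api_repr()
        # {'jobId': 'my-job', 'projectId': 'my-project', 'location': 'US'}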
135 """
136
137 def __init__(self, job_id, project, location):
138 self._properties = {"jobId": job_id, "projectId": project}
139 # The location field must not be populated if it is None.
140 if location:
141 self._properties["location"] = location
142
143 @property
144 def job_id(self):
145 """str: ID of the job."""
146 return self._properties.get("jobId")
147
148 @property
149 def project(self):
150 """str: ID of the project where the job runs."""
151 return self._properties.get("projectId")
152
153 @property
154 def location(self):
155 """str: Location where the job runs."""
156 return self._properties.get("location")
157
158 def _to_api_repr(self):
159 """Returns the API resource representation of the job reference."""
160 return copy.deepcopy(self._properties)
161
162 @classmethod
163 def _from_api_repr(cls, resource):
164 """Returns a job reference for an API resource representation."""
165 job_id = resource.get("jobId")
166 project = resource.get("projectId")
167 location = resource.get("location")
168 job_ref = cls(job_id, project, location)
169 return job_ref
170
171
172class _JobConfig(object):
173 """Abstract base class for job configuration objects.
174
175 Args:
176 job_type (str): The key to use for the job configuration.
177 """
178
179 def __init__(self, job_type, **kwargs):
180 self._job_type = job_type
181 self._properties = {job_type: {}}
182 for prop, val in kwargs.items():
183 setattr(self, prop, val)
184
185 def __setattr__(self, name, value):
        """Override to raise an error when an unknown property is set."""
187 if not name.startswith("_") and not hasattr(type(self), name):
188 raise AttributeError(
189 "Property {} is unknown for {}.".format(name, type(self))
190 )
191 super(_JobConfig, self).__setattr__(name, value)
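
    # Because ``__setattr__`` is overridden above, assigning an unknown
    # property fails loudly instead of being silently ignored. Illustrative
    # sketch (``QueryJobConfig`` is one of the concrete subclasses):
    #
    #     job_config = QueryJobConfig()
    #     job_config.use_legacy_sql = False   # known property, accepted
    #     job_config.use_legacy_skl = False   # typo, raises AttributeError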
192
193 @property
194 def job_timeout_ms(self):
        """Optional[int]: Job timeout in milliseconds. If this time limit is
        exceeded, BigQuery might attempt to stop the job.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms

        Example::

            job_config = bigquery.QueryJobConfig(job_timeout_ms=5000)
            # or, on an existing config:
            job_config.job_timeout_ms = 5000

        Raises:
            ValueError: If ``value`` type is invalid.
        """
206
207 # None as this is an optional parameter.
208 if self._properties.get("jobTimeoutMs"):
209 return self._properties["jobTimeoutMs"]
210 return None
211
212 @job_timeout_ms.setter
213 def job_timeout_ms(self, value):
214 try:
215 value = _int_or_none(value)
216 except ValueError as err:
217 raise ValueError("Pass an int for jobTimeoutMs, e.g. 5000").with_traceback(
218 err.__traceback__
219 )
220
221 if value is not None:
222 # docs indicate a string is expected by the API
223 self._properties["jobTimeoutMs"] = str(value)
224 else:
225 self._properties.pop("jobTimeoutMs", None)
226
227 @property
228 def max_slots(self) -> Optional[int]:
229 """The maximum rate of slot consumption to allow for this job.
230
231 If set, the number of slots used to execute the job will be throttled
232 to try and keep its slot consumption below the requested rate.
233 This feature is not generally available.
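
        Example (illustrative; assumes access to the preview feature)::

            job_config.max_slots = 100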
234 """
235
236 max_slots = self._properties.get("maxSlots")
237 if max_slots is not None:
238 if isinstance(max_slots, str):
239 return int(max_slots)
240 if isinstance(max_slots, int):
241 return max_slots
242 return None
243
244 @max_slots.setter
245 def max_slots(self, value):
246 try:
247 value = _int_or_none(value)
248 except ValueError as err:
249 raise ValueError("Pass an int for max slots, e.g. 100").with_traceback(
250 err.__traceback__
251 )
252
253 if value is not None:
254 self._properties["maxSlots"] = str(value)
255 else:
256 self._properties.pop("maxSlots", None)
257
258 @property
259 def reservation(self):
        """str: Optional. The reservation that the job would use.

        Users can specify a reservation to execute the job. If a reservation is
        not set, it is determined based on the rules defined by the
        reservation assignments. The expected format is
        ``projects/{project}/locations/{location}/reservations/{reservation}``.

        Raises:
            ValueError: If ``value`` is neither ``None`` nor a string.
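
        Example (illustrative; the project and reservation names below are
        made up)::

            job_config.reservation = (
                "projects/my-project/locations/US/reservations/my-reservation"
            )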
269 """
270 return self._properties.setdefault("reservation", None)
271
272 @reservation.setter
273 def reservation(self, value):
274 if value and not isinstance(value, str):
275 raise ValueError("Reservation must be None or a string.")
276 self._properties["reservation"] = value
277
278 @property
279 def labels(self):
280 """Dict[str, str]: Labels for the job.
281
        This property always returns a dict. Once a job has been created on the
        server, its labels can no longer be modified.
284
285 Raises:
286 ValueError: If ``value`` type is invalid.
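
        Example (illustrative label values)::

            job_config.labels = {"team": "analytics", "env": "dev"}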
287 """
288 return self._properties.setdefault("labels", {})
289
290 @labels.setter
291 def labels(self, value):
292 if not isinstance(value, dict):
293 raise ValueError("Pass a dict")
294 self._properties["labels"] = value
295
296 def _get_sub_prop(self, key, default=None):
297 """Get a value in the ``self._properties[self._job_type]`` dictionary.
298
299 Most job properties are inside the dictionary related to the job type
300 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access
301 those properties::
302
303 self._get_sub_prop('destinationTable')
304
305 This is equivalent to using the ``_helpers._get_sub_prop`` function::
306
307 _helpers._get_sub_prop(
308 self._properties, ['query', 'destinationTable'])
309
310 Args:
311 key (str):
312 Key for the value to get in the
313 ``self._properties[self._job_type]`` dictionary.
314 default (Optional[object]):
315 Default value to return if the key is not found.
316 Defaults to :data:`None`.
317
318 Returns:
319 object: The value if present or the default.
320 """
321 return _helpers._get_sub_prop(
322 self._properties, [self._job_type, key], default=default
323 )
324
325 def _set_sub_prop(self, key, value):
326 """Set a value in the ``self._properties[self._job_type]`` dictionary.
327
328 Most job properties are inside the dictionary related to the job type
329 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set
330 those properties::
331
332 self._set_sub_prop('useLegacySql', False)
333
        This is equivalent to using the ``_helpers._set_sub_prop`` function::

            _helpers._set_sub_prop(
                self._properties, ['query', 'useLegacySql'], False)
338
339 Args:
340 key (str):
341 Key to set in the ``self._properties[self._job_type]``
342 dictionary.
343 value (object): Value to set.
344 """
345 _helpers._set_sub_prop(self._properties, [self._job_type, key], value)
346
347 def _del_sub_prop(self, key):
348 """Remove ``key`` from the ``self._properties[self._job_type]`` dict.
349
350 Most job properties are inside the dictionary related to the job type
351 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear
352 those properties::
353
354 self._del_sub_prop('useLegacySql')
355
        This is equivalent to using the ``_helpers._del_sub_prop`` function::

            _helpers._del_sub_prop(
                self._properties, ['query', 'useLegacySql'])
360
361 Args:
362 key (str):
363 Key to remove in the ``self._properties[self._job_type]``
364 dictionary.
365 """
366 _helpers._del_sub_prop(self._properties, [self._job_type, key])
367
368 def to_api_repr(self) -> dict:
369 """Build an API representation of the job config.
370
371 Returns:
372 Dict: A dictionary in the format used by the BigQuery API.
373 """
374 return copy.deepcopy(self._properties)
375
376 def _fill_from_default(self, default_job_config=None):
377 """Merge this job config with a default job config.
378
379 The keys in this object take precedence over the keys in the default
380 config. The merge is done at the top-level as well as for keys one
381 level below the job type.
382
383 Args:
384 default_job_config (google.cloud.bigquery.job._JobConfig):
385 The default job config that will be used to fill in self.
386
387 Returns:
388 google.cloud.bigquery.job._JobConfig: A new (merged) job config.
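
        Example (illustrative sketch using ``QueryJobConfig``, one of the
        concrete subclasses)::

            default = QueryJobConfig(use_legacy_sql=False, labels={"team": "a"})
            override = QueryJobConfig(labels={"team": "b"})
            merged = override._fill_from_default(default)
            # merged.use_legacy_sql is False (taken from the default config)
            # merged.labels == {"team": "b"}  (the override wins)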
389 """
390 if not default_job_config:
391 new_job_config = copy.deepcopy(self)
392 return new_job_config
393
394 if self._job_type != default_job_config._job_type:
395 raise TypeError(
396 "attempted to merge two incompatible job types: "
397 + repr(self._job_type)
398 + ", "
399 + repr(default_job_config._job_type)
400 )
401
402 # cls is one of the job config subclasses that provides the job_type argument to
403 # this base class on instantiation, thus missing-parameter warning is a false
404 # positive here.
405 new_job_config = self.__class__() # pytype: disable=missing-parameter
406
407 default_job_properties = copy.deepcopy(default_job_config._properties)
408 for key in self._properties:
409 if key != self._job_type:
410 default_job_properties[key] = self._properties[key]
411
412 default_job_properties[self._job_type].update(self._properties[self._job_type])
413 new_job_config._properties = default_job_properties
414
415 return new_job_config
416
417 @classmethod
418 def from_api_repr(cls, resource: dict) -> "_JobConfig":
419 """Factory: construct a job configuration given its API representation
420
421 Args:
422 resource (Dict):
423 A job configuration in the same representation as is returned
424 from the API.
425
426 Returns:
427 google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``.
428 """
429 # cls is one of the job config subclasses that provides the job_type argument to
430 # this base class on instantiation, thus missing-parameter warning is a false
431 # positive here.
432 job_config = cls() # type: ignore # pytype: disable=missing-parameter
433 job_config._properties = resource
434 return job_config
435
436
437class _AsyncJob(google.api_core.future.polling.PollingFuture):
438 """Base class for asynchronous jobs.
439
440 Args:
441 job_id (Union[str, _JobReference]):
442 Job's ID in the project associated with the client or a
443 fully-qualified job reference.
444 client (google.cloud.bigquery.client.Client):
445 Client which holds credentials and project configuration.
446 """
447
448 _JOB_TYPE = "unknown"
449 _CONFIG_CLASS: ClassVar
450
451 def __init__(self, job_id, client):
452 super(_AsyncJob, self).__init__()
453
454 # The job reference can be either a plain job ID or the full resource.
455 # Populate the properties dictionary consistently depending on what has
456 # been passed in.
457 job_ref = job_id
458 if not isinstance(job_id, _JobReference):
459 job_ref = _JobReference(job_id, client.project, None)
460 self._properties = {"jobReference": job_ref._to_api_repr()}
461
462 self._client = client
463 self._result_set = False
464 self._completion_lock = threading.Lock()
465
466 @property
467 def configuration(self) -> _JobConfig:
        """Job-type specific configuration."""
469 configuration: _JobConfig = self._CONFIG_CLASS() # pytype: disable=not-callable
470 configuration._properties = self._properties.setdefault("configuration", {})
471 return configuration
472
473 @property
474 def job_id(self):
475 """str: ID of the job."""
476 return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"])
477
478 @property
479 def parent_job_id(self):
480 """Return the ID of the parent job.
481
482 See:
483 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id
484
485 Returns:
486 Optional[str]: parent job id.
487 """
488 return _helpers._get_sub_prop(self._properties, ["statistics", "parentJobId"])
489
490 @property
491 def script_statistics(self) -> Optional["ScriptStatistics"]:
492 """Statistics for a child job of a script."""
493 resource = _helpers._get_sub_prop(
494 self._properties, ["statistics", "scriptStatistics"]
495 )
496 if resource is None:
497 return None
498 return ScriptStatistics(resource)
499
500 @property
501 def session_info(self) -> Optional["SessionInfo"]:
        """[Preview] Information about the session if this job is part of one.
503
504 .. versionadded:: 2.29.0
505 """
506 resource = _helpers._get_sub_prop(
507 self._properties, ["statistics", "sessionInfo"]
508 )
509 if resource is None:
510 return None
511 return SessionInfo(resource)
512
513 @property
514 def num_child_jobs(self):
515 """The number of child jobs executed.
516
517 See:
518 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs
519
520 Returns:
521 int
522 """
523 count = _helpers._get_sub_prop(self._properties, ["statistics", "numChildJobs"])
524 return int(count) if count is not None else 0
525
526 @property
527 def project(self):
528 """Project bound to the job.
529
530 Returns:
531 str: the project (derived from the client).
532 """
533 return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"])
534
535 @property
536 def location(self):
537 """str: Location where the job runs."""
538 return _helpers._get_sub_prop(self._properties, ["jobReference", "location"])
539
540 @property
541 def reservation_id(self):
542 """str: Name of the primary reservation assigned to this job.
543
        Note that this could be different from the reservations reported in
545 the reservation field if parent reservations were used to execute
546 this job.
547 """
548 return _helpers._get_sub_prop(
549 self._properties, ["statistics", "reservation_id"]
550 )
551
552 def _require_client(self, client):
        """Check client or verify override.

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the
                ``client`` stored on the current job.
559
560 Returns:
561 google.cloud.bigquery.client.Client:
562 The client passed in or the currently bound client.
563 """
564 if client is None:
565 client = self._client
566 return client
567
568 @property
569 def job_type(self):
570 """Type of job.
571
572 Returns:
573 str: one of 'load', 'copy', 'extract', 'query'.
574 """
575 return self._JOB_TYPE
576
577 @property
578 def path(self):
579 """URL path for the job's APIs.
580
581 Returns:
582 str: the path based on project and job ID.
583 """
584 return "/projects/%s/jobs/%s" % (self.project, self.job_id)
585
586 @property
587 def labels(self):
588 """Dict[str, str]: Labels for the job."""
        return self._properties.setdefault("configuration", {}).setdefault(
            "labels", {}
        )
590
591 @property
592 def etag(self):
593 """ETag for the job resource.
594
595 Returns:
596 Optional[str]: the ETag (None until set from the server).
597 """
598 return self._properties.get("etag")
599
600 @property
601 def self_link(self):
602 """URL for the job resource.
603
604 Returns:
605 Optional[str]: the URL (None until set from the server).
606 """
607 return self._properties.get("selfLink")
608
609 @property
610 def user_email(self):
611 """E-mail address of user who submitted the job.
612
613 Returns:
            Optional[str]: the e-mail address (None until set from the server).
615 """
616 return self._properties.get("user_email")
617
618 @property
619 def created(self):
620 """Datetime at which the job was created.
621
622 Returns:
623 Optional[datetime.datetime]:
624 the creation time (None until set from the server).
625 """
626 millis = _helpers._get_sub_prop(
627 self._properties, ["statistics", "creationTime"]
628 )
629 if millis is not None:
630 return _helpers._datetime_from_microseconds(millis * 1000.0)
631
632 @property
633 def started(self):
634 """Datetime at which the job was started.
635
636 Returns:
637 Optional[datetime.datetime]:
638 the start time (None until set from the server).
639 """
640 millis = _helpers._get_sub_prop(self._properties, ["statistics", "startTime"])
641 if millis is not None:
642 return _helpers._datetime_from_microseconds(millis * 1000.0)
643
644 @property
645 def ended(self):
646 """Datetime at which the job finished.
647
648 Returns:
649 Optional[datetime.datetime]:
650 the end time (None until set from the server).
651 """
652 millis = _helpers._get_sub_prop(self._properties, ["statistics", "endTime"])
653 if millis is not None:
654 return _helpers._datetime_from_microseconds(millis * 1000.0)
655
656 def _job_statistics(self):
657 """Helper for job-type specific statistics-based properties."""
658 statistics = self._properties.get("statistics", {})
659 return statistics.get(self._JOB_TYPE, {})
660
661 @property
662 def reservation_usage(self):
663 """Job resource usage breakdown by reservation.
664
665 Returns:
666 List[google.cloud.bigquery.job.ReservationUsage]:
667 Reservation usage stats. Can be empty if not set from the server.
668 """
669 usage_stats_raw = _helpers._get_sub_prop(
670 self._properties, ["statistics", "reservationUsage"], default=()
671 )
672 return [
673 ReservationUsage(name=usage["name"], slot_ms=int(usage["slotMs"]))
674 for usage in usage_stats_raw
675 ]
676
677 @property
678 def transaction_info(self) -> Optional[TransactionInfo]:
        """Information about the multi-statement transaction if this job is part of one.
680
681 Since a scripting query job can execute multiple transactions, this
682 property is only expected on child jobs. Use the
683 :meth:`google.cloud.bigquery.client.Client.list_jobs` method with the
684 ``parent_job`` parameter to iterate over child jobs.
685
686 .. versionadded:: 2.24.0
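
        Example (illustrative sketch; assumes an authenticated ``client`` and
        a parent job ID obtained elsewhere)::

            for child in client.list_jobs(parent_job=parent_job_id):
                if child.transaction_info is not None:
                    print(child.transaction_info.transaction_id)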
687 """
688 info = self._properties.get("statistics", {}).get("transactionInfo")
689 if info is None:
690 return None
691 else:
692 return TransactionInfo.from_api_repr(info)
693
694 @property
695 def error_result(self):
696 """Error information about the job as a whole.
697
698 Returns:
699 Optional[Mapping]: the error information (None until set from the server).
700 """
701 status = self._properties.get("status")
702 if status is not None:
703 return status.get("errorResult")
704
705 @property
706 def errors(self):
707 """Information about individual errors generated by the job.
708
709 Returns:
710 Optional[List[Mapping]]:
711 the error information (None until set from the server).
712 """
713 status = self._properties.get("status")
714 if status is not None:
715 return status.get("errors")
716
717 @property
718 def state(self):
719 """Status of the job.
720
721 Returns:
722 Optional[str]:
723 the state (None until set from the server).
724 """
725 status = self._properties.get("status", {})
726 return status.get("state")
727
728 def _set_properties(self, api_response):
729 """Update properties from resource in body of ``api_response``
730
731 Args:
732 api_response (Dict): response returned from an API call.
733 """
734 cleaned = api_response.copy()
735 statistics = cleaned.setdefault("statistics", {})
736 if "creationTime" in statistics:
737 statistics["creationTime"] = float(statistics["creationTime"])
738 if "startTime" in statistics:
739 statistics["startTime"] = float(statistics["startTime"])
740 if "endTime" in statistics:
741 statistics["endTime"] = float(statistics["endTime"])
742
743 self._properties = cleaned
744
745 # For Future interface
746 self._set_future_result()
747
748 @classmethod
749 def _check_resource_config(cls, resource):
750 """Helper for :meth:`from_api_repr`
751
752 Args:
753 resource (Dict): resource for the job.
754
755 Raises:
756 KeyError:
757 If the resource has no identifier, or
758 is missing the appropriate configuration.
759 """
760 if "jobReference" not in resource or "jobId" not in resource["jobReference"]:
761 raise KeyError(
762 "Resource lacks required identity information: "
763 '["jobReference"]["jobId"]'
764 )
765 if (
766 "configuration" not in resource
767 or cls._JOB_TYPE not in resource["configuration"]
768 ):
769 raise KeyError(
770 "Resource lacks required configuration: "
771 '["configuration"]["%s"]' % cls._JOB_TYPE
772 )
773
774 def to_api_repr(self):
775 """Generate a resource for the job."""
776 return copy.deepcopy(self._properties)
777
778 _build_resource = to_api_repr # backward-compatibility alias
779
780 def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
781 """API call: begin the job via a POST request
782
783 See
784 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
785
786 Args:
787 client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the ``client``
                associated with the job object.
790 retry (Optional[google.api_core.retry.Retry]):
791 How to retry the RPC.
792 timeout (Optional[float]):
793 The number of seconds to wait for the underlying HTTP transport
794 before using ``retry``.
795
796 Raises:
797 ValueError:
798 If the job has already begun.
799 """
800 if self.state is not None:
801 raise ValueError("Job already begun.")
802
803 client = self._require_client(client)
804 path = "/projects/%s/jobs" % (self.project,)
805
806 # jobs.insert is idempotent because we ensure that every new
807 # job has an ID.
808 span_attributes = {"path": path}
809 api_response = client._call_api(
810 retry,
811 span_name="BigQuery.job.begin",
812 span_attributes=span_attributes,
813 job_ref=self,
814 method="POST",
815 path=path,
816 data=self.to_api_repr(),
817 timeout=timeout,
818 )
819 self._set_properties(api_response)
820
821 def exists(
822 self,
823 client=None,
824 retry: "retries.Retry" = DEFAULT_RETRY,
825 timeout: Optional[float] = None,
826 ) -> bool:
827 """API call: test for the existence of the job via a GET request
828
829 See
830 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
831
832 Args:
833 client (Optional[google.cloud.bigquery.client.Client]):
834 the client to use. If not passed, falls back to the
                ``client`` stored on the current job.
836
837 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
838 timeout (Optional[float]):
839 The number of seconds to wait for the underlying HTTP transport
840 before using ``retry``.
841
842 Returns:
843 bool: Boolean indicating existence of the job.
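
        Example (illustrative sketch; ``job`` is assumed to be a concrete job
        instance)::

            if job.exists():
                job.reload()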
844 """
845 client = self._require_client(client)
846
847 extra_params = {"fields": "id"}
848 if self.location:
849 extra_params["location"] = self.location
850
851 try:
852 span_attributes = {"path": self.path}
853
854 client._call_api(
855 retry,
856 span_name="BigQuery.job.exists",
857 span_attributes=span_attributes,
858 job_ref=self,
859 method="GET",
860 path=self.path,
861 query_params=extra_params,
862 timeout=timeout,
863 )
864 except exceptions.NotFound:
865 return False
866 else:
867 return True
868
869 def reload(
870 self,
871 client=None,
872 retry: "retries.Retry" = DEFAULT_RETRY,
873 timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
874 ):
875 """API call: refresh job properties via a GET request.
876
877 See
878 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
879
880 Args:
881 client (Optional[google.cloud.bigquery.client.Client]):
882 the client to use. If not passed, falls back to the
                ``client`` stored on the current job.
884
885 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
886 timeout (Optional[float]):
887 The number of seconds to wait for the underlying HTTP transport
888 before using ``retry``.
889 """
890 client = self._require_client(client)
891
892 got_job = client.get_job(
893 self,
894 project=self.project,
895 location=self.location,
896 retry=retry,
897 timeout=timeout,
898 )
899 self._set_properties(got_job._properties)
900
901 def cancel(
902 self,
903 client=None,
904 retry: Optional[retries.Retry] = DEFAULT_RETRY,
905 timeout: Optional[float] = None,
906 ) -> bool:
907 """API call: cancel job via a POST request
908
909 See
910 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
911
912 Args:
913 client (Optional[google.cloud.bigquery.client.Client]):
914 the client to use. If not passed, falls back to the
                ``client`` stored on the current job.
916 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
917 timeout (Optional[float]):
918 The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
920
921 Returns:
922 bool: Boolean indicating that the cancel request was sent.
923 """
924 client = self._require_client(client)
925
926 extra_params = {}
927 if self.location:
928 extra_params["location"] = self.location
929
930 path = "{}/cancel".format(self.path)
931 span_attributes = {"path": path}
932
933 api_response = client._call_api(
934 retry,
935 span_name="BigQuery.job.cancel",
936 span_attributes=span_attributes,
937 job_ref=self,
938 method="POST",
939 path=path,
940 query_params=extra_params,
941 timeout=timeout,
942 )
943 self._set_properties(api_response["job"])
944 # The Future interface requires that we return True if the *attempt*
945 # to cancel was successful.
946 return True
947
948 # The following methods implement the PollingFuture interface. Note that
949 # the methods above are from the pre-Future interface and are left for
950 # compatibility. The only "overloaded" method is :meth:`cancel`, which
951 # satisfies both interfaces.
952
953 def _set_future_result(self):
954 """Set the result or exception from the job if it is complete."""
955 # This must be done in a lock to prevent the polling thread
956 # and main thread from both executing the completion logic
957 # at the same time.
958 with self._completion_lock:
959 # If the operation isn't complete or if the result has already been
960 # set, do not call set_result/set_exception again.
961 # Note: self._result_set is set to True in set_result and
962 # set_exception, in case those methods are invoked directly.
963 if not self.done(reload=False) or self._result_set:
964 return
965
966 if self.error_result is not None:
967 exception = _error_result_to_exception(
968 self.error_result, self.errors or ()
969 )
970 self.set_exception(exception)
971 else:
972 self.set_result(self)
973
974 def done(
975 self,
976 retry: "retries.Retry" = DEFAULT_RETRY,
977 timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
978 reload: bool = True,
979 ) -> bool:
980 """Checks if the job is complete.
981
982 Args:
983 retry (Optional[google.api_core.retry.Retry]):
984 How to retry the RPC. If the job state is ``DONE``, retrying is aborted
985 early, as the job will not change anymore.
986 timeout (Optional[float]):
987 The number of seconds to wait for the underlying HTTP transport
988 before using ``retry``.
989 reload (Optional[bool]):
990 If ``True``, make an API call to refresh the job state of
991 unfinished jobs before checking. Default ``True``.
992
993 Returns:
994 bool: True if the job is complete, False otherwise.
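
        Example (illustrative polling sketch; ``job`` is assumed to be a
        concrete job instance)::

            import time

            while not job.done():
                time.sleep(1)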
995 """
        # Do not refresh if the state is already done, as the job will not
        # change once complete.
998 if self.state != _DONE_STATE and reload:
999 self.reload(retry=retry, timeout=timeout)
1000 return self.state == _DONE_STATE
1001
1002 def result( # type: ignore # (incompatible with supertype)
1003 self,
1004 retry: Optional[retries.Retry] = DEFAULT_RETRY,
1005 timeout: Optional[float] = None,
1006 ) -> "_AsyncJob":
1007 """Start the job and wait for it to complete and get the result.
1008
1009 Args:
1010 retry (Optional[google.api_core.retry.Retry]):
1011 How to retry the RPC. If the job state is ``DONE``, retrying is aborted
1012 early, as the job will not change anymore.
1013 timeout (Optional[float]):
1014 The number of seconds to wait for the underlying HTTP transport
1015 before using ``retry``.
1016 If multiple requests are made under the hood, ``timeout``
1017 applies to each individual request.
1018
1019 Returns:
1020 _AsyncJob: This instance.
1021
1022 Raises:
1023 google.cloud.exceptions.GoogleAPICallError:
1024 if the job failed.
1025 concurrent.futures.TimeoutError:
1026 if the job did not complete in the given timeout.
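
        Example (illustrative sketch; assumes an authenticated ``Client``)::

            from google.cloud import bigquery

            client = bigquery.Client()
            job = client.query("SELECT 1")
            job.result(timeout=300)  # Blocks until the job finishes.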
1027 """
1028 if self.state is None:
1029 self._begin(retry=retry, timeout=timeout)
1030
1031 kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
1032 return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
1033
1034 def cancelled(self):
        """Check if the job has been cancelled.

        The BigQuery API does not expose a direct "cancelled" state. This
        method only reports ``True`` when the job's error result indicates
        that it was stopped; it exists chiefly to satisfy the interface of
        :class:`google.api_core.future.Future`.

        Returns:
            bool: True if the job's error result has the ``stopped`` reason.
        """
1044 return (
1045 self.error_result is not None
1046 and self.error_result.get("reason") == _STOPPED_REASON
1047 )
1048
1049 def __repr__(self):
1050 result = (
1051 f"{self.__class__.__name__}<"
1052 f"project={self.project}, location={self.location}, id={self.job_id}"
1053 ">"
1054 )
1055 return result
1056
1057
1058class ScriptStackFrame(object):
1059 """Stack frame showing the line/column/procedure name where the current
1060 evaluation happened.
1061
1062 Args:
1063 resource (Map[str, Any]): JSON representation of object.
1064 """
1065
1066 def __init__(self, resource):
1067 self._properties = resource
1068
1069 @property
1070 def procedure_id(self):
1071 """Optional[str]: Name of the active procedure.
1072
1073 Omitted if in a top-level script.
1074 """
1075 return self._properties.get("procedureId")
1076
1077 @property
1078 def text(self):
1079 """str: Text of the current statement/expression."""
1080 return self._properties.get("text")
1081
1082 @property
1083 def start_line(self):
1084 """int: One-based start line."""
1085 return _helpers._int_or_none(self._properties.get("startLine"))
1086
1087 @property
1088 def start_column(self):
1089 """int: One-based start column."""
1090 return _helpers._int_or_none(self._properties.get("startColumn"))
1091
1092 @property
1093 def end_line(self):
1094 """int: One-based end line."""
1095 return _helpers._int_or_none(self._properties.get("endLine"))
1096
1097 @property
1098 def end_column(self):
1099 """int: One-based end column."""
1100 return _helpers._int_or_none(self._properties.get("endColumn"))
1101
1102
1103class ScriptStatistics(object):
1104 """Statistics for a child job of a script.
1105
1106 Args:
1107 resource (Map[str, Any]): JSON representation of object.
1108 """
1109
1110 def __init__(self, resource):
1111 self._properties = resource
1112
1113 @property
1114 def stack_frames(self) -> Sequence[ScriptStackFrame]:
1115 """Stack trace where the current evaluation happened.
1116
1117 Shows line/column/procedure name of each frame on the stack at the
1118 point where the current evaluation happened.
1119
1120 The leaf frame is first, the primary script is last.
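
        Example (illustrative sketch; ``child_job`` is assumed to be a child
        job of a script)::

            stats = child_job.script_statistics
            if stats is not None:
                for frame in stats.stack_frames:
                    print(frame.start_line, frame.start_column, frame.text)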
1121 """
1122 return [
1123 ScriptStackFrame(frame) for frame in self._properties.get("stackFrames", [])
1124 ]
1125
1126 @property
1127 def evaluation_kind(self) -> Optional[str]:
1128 """str: Indicates the type of child job.
1129
1130 Possible values include ``STATEMENT`` and ``EXPRESSION``.
1131 """
1132 return self._properties.get("evaluationKind")
1133
1134
1135class SessionInfo:
    """[Preview] Information about the session if this job is part of one.
1137
1138 .. versionadded:: 2.29.0
1139
1140 Args:
1141 resource (Map[str, Any]): JSON representation of object.
1142 """
1143
1144 def __init__(self, resource):
1145 self._properties = resource
1146
1147 @property
1148 def session_id(self) -> Optional[str]:
1149 """The ID of the session."""
1150 return self._properties.get("sessionId")
1151
1152
1153class UnknownJob(_AsyncJob):
1154 """A job whose type cannot be determined."""
1155
1156 @classmethod
1157 def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
1158 """Construct an UnknownJob from the JSON representation.
1159
1160 Args:
1161 resource (Dict): JSON representation of a job.
1162 client (google.cloud.bigquery.client.Client):
1163 Client connected to BigQuery API.
1164
1165 Returns:
1166 UnknownJob: Job corresponding to the resource.
1167 """
1168 job_ref_properties = resource.get(
1169 "jobReference", {"projectId": client.project, "jobId": None}
1170 )
1171 job_ref = _JobReference._from_api_repr(job_ref_properties)
1172 job = cls(job_ref, client)
1173 # Populate the job reference with the project, even if it has been
1174 # redacted, because we know it should equal that of the request.
1175 resource["jobReference"] = job_ref_properties
1176 job._properties = resource
1177 return job