Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py: 44%

328 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1# Copyright 2015 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Base classes and helpers for job classes.""" 

16 

17from collections import namedtuple 

18import copy 

19import http 

20import threading 

21import typing 

22from typing import ClassVar, Dict, Optional, Sequence 

23 

24from google.api_core import exceptions 

25import google.api_core.future.polling 

26 

27from google.cloud.bigquery import _helpers 

28from google.cloud.bigquery.retry import DEFAULT_RETRY 

29 

30if typing.TYPE_CHECKING: # pragma: NO COVER 

31 from google.api_core import retry as retries 

32 

33 

# Value of the job ``status.state`` field that is compared against to decide
# whether a job has finished.
_DONE_STATE = "DONE"

# Value of the ``reason`` field of an error result that marks a stopped job.
_STOPPED_REASON = "stopped"

# Lookup table from the ``reason`` string of a BigQuery error result to the
# HTTP status code used to build the matching exception. The members of
# ``http.HTTPStatus`` are the very same objects that ``http.client`` exposes
# as module-level constants.
_ERROR_REASON_TO_EXCEPTION = {
    "accessDenied": http.HTTPStatus.FORBIDDEN,
    "backendError": http.HTTPStatus.INTERNAL_SERVER_ERROR,
    "billingNotEnabled": http.HTTPStatus.FORBIDDEN,
    "billingTierLimitExceeded": http.HTTPStatus.BAD_REQUEST,
    "blocked": http.HTTPStatus.FORBIDDEN,
    "duplicate": http.HTTPStatus.CONFLICT,
    "internalError": http.HTTPStatus.INTERNAL_SERVER_ERROR,
    "invalid": http.HTTPStatus.BAD_REQUEST,
    "invalidQuery": http.HTTPStatus.BAD_REQUEST,
    "notFound": http.HTTPStatus.NOT_FOUND,
    "notImplemented": http.HTTPStatus.NOT_IMPLEMENTED,
    "policyViolation": http.HTTPStatus.FORBIDDEN,
    "quotaExceeded": http.HTTPStatus.FORBIDDEN,
    "rateLimitExceeded": http.HTTPStatus.FORBIDDEN,
    "resourceInUse": http.HTTPStatus.BAD_REQUEST,
    "resourcesExceeded": http.HTTPStatus.BAD_REQUEST,
    "responseTooLarge": http.HTTPStatus.FORBIDDEN,
    "stopped": http.HTTPStatus.OK,
    "tableUnavailable": http.HTTPStatus.BAD_REQUEST,
}

57 

58 

def _error_result_to_exception(error_result):
    """Build the API exception that corresponds to a BigQuery error result.

    The ``reason`` entry of ``error_result`` is looked up in
    ``_ERROR_REASON_TO_EXCEPTION`` to obtain an HTTP status code; a reason
    that is missing from the table is treated as an internal server error.
    The reasons and their matching HTTP status codes are documented on the
    `troubleshooting errors`_ page.

    .. _troubleshooting errors: https://cloud.google.com/bigquery/troubleshooting-errors

    Args:
        error_result (Mapping[str, str]): The error result from BigQuery.

    Returns:
        google.cloud.exceptions.GoogleAPICallError: The mapped exception.
    """
    status_code = _ERROR_REASON_TO_EXCEPTION.get(
        error_result.get("reason"), http.client.INTERNAL_SERVER_ERROR
    )
    # A missing message becomes an empty string; the raw error result is
    # passed along as the single entry of ``errors``.
    message = error_result.get("message", "")
    return exceptions.from_http_status(status_code, message, errors=[error_result])

81 

82 

# Lightweight record type; the docstrings are attached after creation because
# ``namedtuple`` generates generic ones.
ReservationUsage = namedtuple("ReservationUsage", ["name", "slot_ms"])
ReservationUsage.__doc__ = "Job resource usage for a reservation."
ReservationUsage.name.__doc__ = (
    'Reservation name or "unreserved" for on-demand resources usage.'
)
ReservationUsage.slot_ms.__doc__ = (
    "Total slot milliseconds used by the reservation for a particular job."
)

91 

92 

class TransactionInfo(typing.NamedTuple):
    """[Alpha] Information of a multi-statement transaction.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo

    .. versionadded:: 2.24.0
    """

    transaction_id: str
    """Output only. ID of the transaction."""

    @classmethod
    def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
        """Create an instance from the API representation.

        Args:
            transaction_info (Dict[str, str]):
                Mapping that must contain a ``"transactionId"`` entry.

        Returns:
            TransactionInfo: Tuple holding the transaction ID.

        Raises:
            KeyError: If ``"transactionId"`` is missing from the mapping.
        """
        return cls(transaction_id=transaction_info["transactionId"])

107 

108 

class _JobReference(object):
    """A reference to a job.

    The reference is stored as a dictionary using the API field names
    (``jobId``, ``projectId`` and, optionally, ``location``).

    Args:
        job_id (str): ID of the job to run.
        project (str): ID of the project where the job runs.
        location (str): Location of where the job runs.
    """

    def __init__(self, job_id, project, location):
        properties = {"jobId": job_id, "projectId": project}
        # The location field must not be populated if it is None (any falsy
        # location is left out).
        if location:
            properties["location"] = location
        self._properties = properties

    @property
    def job_id(self):
        """str: ID of the job."""
        return self._properties.get("jobId")

    @property
    def project(self):
        """str: ID of the project where the job runs."""
        return self._properties.get("projectId")

    @property
    def location(self):
        """str: Location where the job runs (``None`` when not set)."""
        return self._properties.get("location")

    def _to_api_repr(self):
        """Returns the API resource representation of the job reference.

        The returned dictionary is a deep copy, so changing it does not
        affect this reference.
        """
        return copy.deepcopy(self._properties)

    @classmethod
    def _from_api_repr(cls, resource):
        """Returns a job reference for an API resource representation.

        Entries missing from ``resource`` are passed on as ``None``.
        """
        return cls(
            resource.get("jobId"),
            resource.get("projectId"),
            resource.get("location"),
        )

151 

152 

class _JobConfig(object):
    """Abstract base class for job configuration objects.

    The configuration is kept in ``self._properties``, a dictionary in the
    API format. Properties specific to the job type live in the nested
    dictionary stored under the ``job_type`` key.

    Args:
        job_type (str): The key to use for the job configuration.
        **kwargs: Property names and values to set on the new instance; each
            one is applied with ``setattr`` and therefore must be a known
            property of the class.
    """

    def __init__(self, job_type, **kwargs):
        self._job_type = job_type
        self._properties = {job_type: {}}
        for prop, val in kwargs.items():
            setattr(self, prop, val)

    def __setattr__(self, name, value):
        """Override to be able to raise error if an unknown property is being set"""
        # Private names are always allowed; public names must already exist
        # on the class (e.g. as a property).
        if not name.startswith("_") and not hasattr(type(self), name):
            raise AttributeError(
                "Property {} is unknown for {}.".format(name, type(self))
            )
        super(_JobConfig, self).__setattr__(name, value)

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job.

        This method always returns a dict. Once a job has been created on the
        server, its labels cannot be modified anymore.

        Raises:
            ValueError: If ``value`` type is invalid (raised by the setter
                when the assigned value is not a dict).
        """
        return self._properties.setdefault("labels", {})

    @labels.setter
    def labels(self, value):
        if not isinstance(value, dict):
            raise ValueError("Pass a dict")
        self._properties["labels"] = value

    def _get_sub_prop(self, key, default=None):
        """Get a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access
        those properties::

            self._get_sub_prop('destinationTable')

        This is equivalent to using the ``_helpers._get_sub_prop`` function::

            _helpers._get_sub_prop(
                self._properties, ['query', 'destinationTable'])

        Args:
            key (str):
                Key for the value to get in the
                ``self._properties[self._job_type]`` dictionary.
            default (Optional[object]):
                Default value to return if the key is not found.
                Defaults to :data:`None`.

        Returns:
            object: The value if present or the default.
        """
        return _helpers._get_sub_prop(
            self._properties, [self._job_type, key], default=default
        )

    def _set_sub_prop(self, key, value):
        """Set a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set
        those properties::

            self._set_sub_prop('useLegacySql', False)

        This is equivalent to using the ``_helpers._set_sub_prop`` function::

            _helpers._set_sub_prop(
                self._properties, ['query', 'useLegacySql'], False)

        Args:
            key (str):
                Key to set in the ``self._properties[self._job_type]``
                dictionary.
            value (object): Value to set.
        """
        _helpers._set_sub_prop(self._properties, [self._job_type, key], value)

    def _del_sub_prop(self, key):
        """Remove ``key`` from the ``self._properties[self._job_type]`` dict.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear
        those properties::

            self._del_sub_prop('useLegacySql')

        This is equivalent to using the ``_helpers._del_sub_prop`` function::

            _helpers._del_sub_prop(
                self._properties, ['query', 'useLegacySql'])

        Args:
            key (str):
                Key to remove in the ``self._properties[self._job_type]``
                dictionary.
        """
        _helpers._del_sub_prop(self._properties, [self._job_type, key])

    def to_api_repr(self) -> dict:
        """Build an API representation of the job config.

        Returns:
            Dict: A dictionary in the format used by the BigQuery API. It is
            a deep copy of the internal properties.
        """
        return copy.deepcopy(self._properties)

    def _fill_from_default(self, default_job_config=None):
        """Merge this job config with a default job config.

        The keys in this object take precedence over the keys in the default
        config. The merge is done at the top-level as well as for keys one
        level below the job type.

        The returned config never shares mutable state with ``self`` or with
        ``default_job_config``: every value is deep-copied before merging.

        Args:
            default_job_config (google.cloud.bigquery.job._JobConfig):
                The default job config that will be used to fill in self.

        Returns:
            google.cloud.bigquery.job._JobConfig: A new (merged) job config.

        Raises:
            TypeError: If the two configs are for different job types.
        """
        if not default_job_config:
            return copy.deepcopy(self)

        if self._job_type != default_job_config._job_type:
            raise TypeError(
                "attempted to merge two incompatible job types: "
                + repr(self._job_type)
                + ", "
                + repr(default_job_config._job_type)
            )

        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        new_job_config = self.__class__()  # pytype: disable=missing-parameter

        merged = copy.deepcopy(default_job_config._properties)
        # Deep-copy our own values as well so that the merged config does not
        # alias e.g. the ``labels`` dict of ``self``.
        own = copy.deepcopy(self._properties)

        for key, value in own.items():
            if key != self._job_type:
                merged[key] = value

        # Either side may lack the job-type sub-dictionary (for instance a
        # config built from a partial API resource); treat that as empty.
        merged.setdefault(self._job_type, {}).update(own.get(self._job_type, {}))
        new_job_config._properties = merged

        return new_job_config

    @classmethod
    def from_api_repr(cls, resource: dict) -> "_JobConfig":
        """Factory: construct a job configuration given its API representation

        The new configuration stores ``resource`` itself (no copy is made).

        Args:
            resource (Dict):
                A job configuration in the same representation as is returned
                from the API.

        Returns:
            google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``.
        """
        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        job_config = cls()  # type: ignore  # pytype: disable=missing-parameter
        job_config._properties = resource
        return job_config

331 

332 

class _AsyncJob(google.api_core.future.polling.PollingFuture):
    """Base class for asynchronous jobs.

    The job resource is kept in ``self._properties`` in the API format; the
    properties of this class read from that dictionary.

    Args:
        job_id (Union[str, _JobReference]):
            Job's ID in the project associated with the client or a
            fully-qualified job reference.
        client (google.cloud.bigquery.client.Client):
            Client which holds credentials and project configuration.
    """

    _JOB_TYPE = "unknown"
    _CONFIG_CLASS: ClassVar

    def __init__(self, job_id, client):
        super(_AsyncJob, self).__init__()

        # The job reference can be either a plain job ID or the full resource.
        # Populate the properties dictionary consistently depending on what has
        # been passed in.
        job_ref = job_id
        if not isinstance(job_id, _JobReference):
            job_ref = _JobReference(job_id, client.project, None)
        self._properties = {"jobReference": job_ref._to_api_repr()}

        self._client = client
        self._result_set = False
        self._completion_lock = threading.Lock()

    @property
    def configuration(self) -> _JobConfig:
        """Job-type specific configuration.

        The returned object wraps the ``configuration`` dictionary of this
        job directly (it is created on first access if missing).
        """
        configuration = self._CONFIG_CLASS()
        configuration._properties = self._properties.setdefault("configuration", {})
        return configuration

    @property
    def job_id(self):
        """str: ID of the job."""
        return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"])

    @property
    def parent_job_id(self):
        """Return the ID of the parent job.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id

        Returns:
            Optional[str]: parent job id.
        """
        return _helpers._get_sub_prop(self._properties, ["statistics", "parentJobId"])

    @property
    def script_statistics(self) -> Optional["ScriptStatistics"]:
        """Statistics for a child job of a script.

        ``None`` when the job statistics carry no ``scriptStatistics`` entry.
        """
        resource = _helpers._get_sub_prop(
            self._properties, ["statistics", "scriptStatistics"]
        )
        if resource is None:
            return None
        return ScriptStatistics(resource)

    @property
    def session_info(self) -> Optional["SessionInfo"]:
        """[Preview] Information of the session if this job is part of one.

        ``None`` when the job statistics carry no ``sessionInfo`` entry.

        .. versionadded:: 2.29.0
        """
        resource = _helpers._get_sub_prop(
            self._properties, ["statistics", "sessionInfo"]
        )
        if resource is None:
            return None
        return SessionInfo(resource)

    @property
    def num_child_jobs(self):
        """The number of child jobs executed.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs

        Returns:
            int: The count converted to ``int``, or ``0`` when not set.
        """
        count = _helpers._get_sub_prop(self._properties, ["statistics", "numChildJobs"])
        return int(count) if count is not None else 0

    @property
    def project(self):
        """Project bound to the job.

        Returns:
            str: the project ID stored in the job reference.
        """
        return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"])

    @property
    def location(self):
        """str: Location where the job runs."""
        return _helpers._get_sub_prop(self._properties, ["jobReference", "location"])

    def _require_client(self, client):
        """Check client or verify over-ride.

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                the client to use. If not passed, falls back to the
                ``client`` stored on the current job.

        Returns:
            google.cloud.bigquery.client.Client:
                The client passed in or the currently bound client.
        """
        if client is None:
            client = self._client
        return client

    @property
    def job_type(self):
        """Type of job.

        Returns:
            str: the value of the class attribute ``_JOB_TYPE``, e.g. one of
            'load', 'copy', 'extract', 'query' ('unknown' for this base
            class).
        """
        return self._JOB_TYPE

    @property
    def path(self):
        """URL path for the job's APIs.

        Returns:
            str: the path based on project and job ID.
        """
        return "/projects/%s/jobs/%s" % (self.project, self.job_id)

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job.

        Always a dict; missing ``configuration`` / ``labels`` entries are
        created as empty dicts on access.
        """
        return self._properties.setdefault("configuration", {}).setdefault("labels", {})

    @property
    def etag(self):
        """ETag for the job resource.

        Returns:
            Optional[str]: the ETag (None until set from the server).
        """
        return self._properties.get("etag")

    @property
    def self_link(self):
        """URL for the job resource.

        Returns:
            Optional[str]: the URL (None until set from the server).
        """
        return self._properties.get("selfLink")

    @property
    def user_email(self):
        """E-mail address of user who submitted the job.

        Returns:
            Optional[str]: the e-mail address (None until set from the server).
        """
        return self._properties.get("user_email")

    @property
    def created(self):
        """Datetime at which the job was created.

        Returns:
            Optional[datetime.datetime]:
                the creation time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(
            self._properties, ["statistics", "creationTime"]
        )
        if millis is not None:
            # The stored value is in milliseconds; the helper expects
            # microseconds.
            return _helpers._datetime_from_microseconds(millis * 1000.0)
        return None

    @property
    def started(self):
        """Datetime at which the job was started.

        Returns:
            Optional[datetime.datetime]:
                the start time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(self._properties, ["statistics", "startTime"])
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)
        return None

    @property
    def ended(self):
        """Datetime at which the job finished.

        Returns:
            Optional[datetime.datetime]:
                the end time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(self._properties, ["statistics", "endTime"])
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)
        return None

    def _job_statistics(self):
        """Helper for job-type specific statistics-based properties.

        Returns the ``statistics[<job type>]`` dictionary, or an empty dict
        when it is not present.
        """
        statistics = self._properties.get("statistics", {})
        return statistics.get(self._JOB_TYPE, {})

    @property
    def reservation_usage(self):
        """Job resource usage breakdown by reservation.

        Returns:
            List[google.cloud.bigquery.job.ReservationUsage]:
                Reservation usage stats. Can be empty if not set from the server.
        """
        usage_stats_raw = _helpers._get_sub_prop(
            self._properties, ["statistics", "reservationUsage"], default=()
        )
        return [
            ReservationUsage(name=usage["name"], slot_ms=int(usage["slotMs"]))
            for usage in usage_stats_raw
        ]

    @property
    def transaction_info(self) -> Optional[TransactionInfo]:
        """Information of the multi-statement transaction if this job is part of one.

        Since a scripting query job can execute multiple transactions, this
        property is only expected on child jobs. Use the
        :meth:`google.cloud.bigquery.client.Client.list_jobs` method with the
        ``parent_job`` parameter to iterate over child jobs.

        .. versionadded:: 2.24.0
        """
        info = self._properties.get("statistics", {}).get("transactionInfo")
        if info is None:
            return None
        return TransactionInfo.from_api_repr(info)

    @property
    def error_result(self):
        """Error information about the job as a whole.

        Returns:
            Optional[Mapping]: the error information (None until set from the server).
        """
        status = self._properties.get("status")
        if status is not None:
            return status.get("errorResult")
        return None

    @property
    def errors(self):
        """Information about individual errors generated by the job.

        Returns:
            Optional[List[Mapping]]:
                the error information (None until set from the server).
        """
        status = self._properties.get("status")
        if status is not None:
            return status.get("errors")
        return None

    @property
    def state(self):
        """Status of the job.

        Returns:
            Optional[str]:
                the state (None until set from the server).
        """
        status = self._properties.get("status", {})
        return status.get("state")

    def _set_properties(self, api_response):
        """Update properties from resource in body of ``api_response``

        The timestamps ``creationTime``, ``startTime`` and ``endTime`` of the
        job statistics are converted to ``float``. The conversion is done on
        copies, so neither ``api_response`` nor its nested ``statistics``
        mapping is modified.

        Args:
            api_response (Dict): response returned from an API call.
        """
        cleaned = api_response.copy()
        # ``copy()`` is shallow: take a separate copy of the statistics so
        # that the caller's dictionary is not rewritten in place.
        statistics = dict(cleaned.get("statistics") or {})
        for field in ("creationTime", "startTime", "endTime"):
            if field in statistics:
                statistics[field] = float(statistics[field])
        cleaned["statistics"] = statistics

        self._properties = cleaned

        # For Future interface
        self._set_future_result()

    @classmethod
    def _check_resource_config(cls, resource):
        """Helper for :meth:`from_api_repr`

        Args:
            resource (Dict): resource for the job.

        Raises:
            KeyError:
                If the resource has no identifier, or
                is missing the appropriate configuration.
        """
        if "jobReference" not in resource or "jobId" not in resource["jobReference"]:
            raise KeyError(
                "Resource lacks required identity information: "
                '["jobReference"]["jobId"]'
            )
        if (
            "configuration" not in resource
            or cls._JOB_TYPE not in resource["configuration"]
        ):
            raise KeyError(
                "Resource lacks required configuration: "
                '["configuration"]["%s"]' % cls._JOB_TYPE
            )

    def to_api_repr(self):
        """Generate a resource for the job (a deep copy of its properties)."""
        return copy.deepcopy(self._properties)

    _build_resource = to_api_repr  # backward-compatibility alias

    def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
        """API call: begin the job via a POST request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the ``client``
                associated with the job object.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Raises:
            ValueError:
                If the job has already begun.
        """
        if self.state is not None:
            raise ValueError("Job already begun.")

        client = self._require_client(client)
        path = "/projects/%s/jobs" % (self.project,)

        # jobs.insert is idempotent because we ensure that every new
        # job has an ID.
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.begin",
            span_attributes=span_attributes,
            job_ref=self,
            method="POST",
            path=path,
            data=self.to_api_repr(),
            timeout=timeout,
        )
        self._set_properties(api_response)

    def exists(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> bool:
        """API call: test for the existence of the job via a GET request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                the client to use. If not passed, falls back to the
                ``client`` stored on the current job.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            bool: Boolean indicating existence of the job.
        """
        client = self._require_client(client)

        # Only the ``id`` field is requested.
        extra_params = {"fields": "id"}
        if self.location:
            extra_params["location"] = self.location

        try:
            span_attributes = {"path": self.path}

            client._call_api(
                retry,
                span_name="BigQuery.job.exists",
                span_attributes=span_attributes,
                job_ref=self,
                method="GET",
                path=self.path,
                query_params=extra_params,
                timeout=timeout,
            )
        except exceptions.NotFound:
            return False
        else:
            return True

    def reload(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ):
        """API call: refresh job properties via a GET request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                the client to use. If not passed, falls back to the
                ``client`` stored on the current job.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
        """
        client = self._require_client(client)

        extra_params = {}
        if self.location:
            extra_params["location"] = self.location
        span_attributes = {"path": self.path}

        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.reload",
            span_attributes=span_attributes,
            job_ref=self,
            method="GET",
            path=self.path,
            query_params=extra_params,
            timeout=timeout,
        )
        self._set_properties(api_response)

    def cancel(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> bool:
        """API call: cancel job via a POST request

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                the client to use. If not passed, falls back to the
                ``client`` stored on the current job.
            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``

        Returns:
            bool: Boolean indicating that the cancel request was sent.
        """
        client = self._require_client(client)

        extra_params = {}
        if self.location:
            extra_params["location"] = self.location

        path = "{}/cancel".format(self.path)
        span_attributes = {"path": path}

        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.cancel",
            span_attributes=span_attributes,
            job_ref=self,
            method="POST",
            path=path,
            query_params=extra_params,
            timeout=timeout,
        )
        # The job resource is nested under the ``job`` key of the response.
        self._set_properties(api_response["job"])
        # The Future interface requires that we return True if the *attempt*
        # to cancel was successful.
        return True

    # The following methods implement the PollingFuture interface. Note that
    # the methods above are from the pre-Future interface and are left for
    # compatibility. The only "overloaded" method is :meth:`cancel`, which
    # satisfies both interfaces.

    def _set_future_result(self):
        """Set the result or exception from the job if it is complete."""
        # This must be done in a lock to prevent the polling thread
        # and main thread from both executing the completion logic
        # at the same time.
        with self._completion_lock:
            # If the operation isn't complete or if the result has already been
            # set, do not call set_result/set_exception again.
            # Note: self._result_set is set to True in set_result and
            # set_exception, in case those methods are invoked directly.
            if not self.done(reload=False) or self._result_set:
                return

            if self.error_result is not None:
                exception = _error_result_to_exception(self.error_result)
                self.set_exception(exception)
            else:
                self.set_result(self)

    def done(
        self,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
        reload: bool = True,
    ) -> bool:
        """Checks if the job is complete.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC. If the job state is ``DONE``, retrying is aborted
                early, as the job will not change anymore.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            reload (Optional[bool]):
                If ``True``, make an API call to refresh the job state of
                unfinished jobs before checking. Default ``True``.

        Returns:
            bool: True if the job is complete, False otherwise.
        """
        # Do not refresh if the state is already done, as the job will not
        # change once complete.
        if self.state != _DONE_STATE and reload:
            self.reload(retry=retry, timeout=timeout)
        return self.state == _DONE_STATE

    def result(  # type: ignore  # (signature complaint)
        self,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> "_AsyncJob":
        """Start the job and wait for it to complete and get the result.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC. If the job state is ``DONE``, retrying is aborted
                early, as the job will not change anymore.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
                If multiple requests are made under the hood, ``timeout``
                applies to each individual request.

        Returns:
            _AsyncJob: This instance.

        Raises:
            google.cloud.exceptions.GoogleAPICallError:
                if the job failed.
            concurrent.futures.TimeoutError:
                if the job did not complete in the given timeout.
        """
        # A job without a state has not been sent to the API yet.
        if self.state is None:
            self._begin(retry=retry, timeout=timeout)

        # ``retry`` is forwarded to the parent class only when the caller
        # supplied something other than the default.
        kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
        return super(_AsyncJob, self).result(timeout=timeout, **kwargs)

    def cancelled(self):
        """Check if the job has been cancelled.

        The check is based on the error result stored on the job: it is
        considered cancelled when an error result is present and its
        ``reason`` is ``"stopped"``. This method is here to satisfy the
        interface for :class:`google.api_core.future.Future`.

        Returns:
            bool: True if the job's error result has the reason ``"stopped"``,
            False otherwise.
        """
        return (
            self.error_result is not None
            and self.error_result.get("reason") == _STOPPED_REASON
        )

    def __repr__(self):
        result = (
            f"{self.__class__.__name__}<"
            f"project={self.project}, location={self.location}, id={self.job_id}"
            ">"
        )
        return result

935 

936 

class ScriptStackFrame(object):
    """Stack frame showing the line/column/procedure name where the current
    evaluation happened.

    The instance is a thin read-only view over ``resource``; the mapping is
    stored as-is, not copied.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def procedure_id(self):
        """Optional[str]: Name of the active procedure.

        Omitted if in a top-level script.
        """
        return self._properties.get("procedureId")

    @property
    def text(self):
        """str: Text of the current statement/expression."""
        return self._properties.get("text")

    @property
    def start_line(self):
        """int: One-based start line."""
        raw = self._properties.get("startLine")
        return _helpers._int_or_none(raw)

    @property
    def start_column(self):
        """int: One-based start column."""
        raw = self._properties.get("startColumn")
        return _helpers._int_or_none(raw)

    @property
    def end_line(self):
        """int: One-based end line."""
        raw = self._properties.get("endLine")
        return _helpers._int_or_none(raw)

    @property
    def end_column(self):
        """int: One-based end column."""
        raw = self._properties.get("endColumn")
        return _helpers._int_or_none(raw)

980 

981 

class ScriptStatistics(object):
    """Statistics for a child job of a script.

    The instance is a thin read-only view over ``resource``; the mapping is
    stored as-is, not copied.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def stack_frames(self) -> Sequence[ScriptStackFrame]:
        """Stack trace where the current evaluation happened.

        Shows line/column/procedure name of each frame on the stack at the
        point where the current evaluation happened.

        The leaf frame is first, the primary script is last.

        A new list of frame objects is built on every access; it is empty
        when the resource has no ``stackFrames`` entry.
        """
        raw_frames = self._properties.get("stackFrames", [])
        return list(map(ScriptStackFrame, raw_frames))

    @property
    def evaluation_kind(self) -> Optional[str]:
        """str: Indicates the type of child job.

        Possible values include ``STATEMENT`` and ``EXPRESSION``.
        """
        return self._properties.get("evaluationKind")

1012 

1013 

class SessionInfo:
    """[Preview] Information of the session if this job is part of one.

    The instance is a thin read-only view over ``resource``; the mapping is
    stored as-is, not copied.

    .. versionadded:: 2.29.0

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def session_id(self) -> Optional[str]:
        """The ID of the session (``None`` if the resource has no ``sessionId``)."""
        session_id = self._properties.get("sessionId")
        return session_id

1030 

1031 

class UnknownJob(_AsyncJob):
    """A job whose type cannot be determined."""

    @classmethod
    def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
        """Construct an UnknownJob from the JSON representation.

        Note that ``resource`` is modified (its ``jobReference`` entry is
        set) and then stored on the job without copying.

        Args:
            resource (Dict): JSON representation of a job.
            client (google.cloud.bigquery.client.Client):
                Client connected to BigQuery API.

        Returns:
            UnknownJob: Job corresponding to the resource.
        """
        # Fallback reference for resources that carry no job reference at
        # all: use the client's project and no job ID.
        fallback_reference = {"projectId": client.project, "jobId": None}
        job_ref_properties = resource.get("jobReference", fallback_reference)

        job = cls(_JobReference._from_api_repr(job_ref_properties), client)

        # Populate the job reference with the project, even if it has been
        # redacted, because we know it should equal that of the request.
        resource["jobReference"] = job_ref_properties
        job._properties = resource
        return job