Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py: 43%


361 statements  

1 # Copyright 2015 Google LLC

2 #

3 # Licensed under the Apache License, Version 2.0 (the "License");

4 # you may not use this file except in compliance with the License.

5 # You may obtain a copy of the License at

6 #

7 # http://www.apache.org/licenses/LICENSE-2.0

8 #

9 # Unless required by applicable law or agreed to in writing, software

10 # distributed under the License is distributed on an "AS IS" BASIS,

11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

12 # See the License for the specific language governing permissions and

13 # limitations under the License.

14

15 """Base classes and helpers for job classes."""

16

17 from collections import namedtuple

18 import copy

19 import http

20 import threading

21 import typing

22 from typing import ClassVar, Dict, Optional, Sequence

23

24 from google.api_core import retry as retries

25 from google.api_core import exceptions

26 import google.api_core.future.polling

27

28 from google.cloud.bigquery import _helpers

29 from google.cloud.bigquery._helpers import _int_or_none

30 from google.cloud.bigquery.retry import (

31 DEFAULT_GET_JOB_TIMEOUT,

32 DEFAULT_RETRY,

33 )

34 

35 

36 _DONE_STATE = "DONE"

37 _STOPPED_REASON = "stopped"

38 _ERROR_REASON_TO_EXCEPTION = {

39 "accessDenied": http.client.FORBIDDEN, 

40 "backendError": http.client.INTERNAL_SERVER_ERROR, 

41 "billingNotEnabled": http.client.FORBIDDEN, 

42 "billingTierLimitExceeded": http.client.BAD_REQUEST, 

43 "blocked": http.client.FORBIDDEN, 

44 "duplicate": http.client.CONFLICT, 

45 "internalError": http.client.INTERNAL_SERVER_ERROR, 

46 "invalid": http.client.BAD_REQUEST, 

47 "invalidQuery": http.client.BAD_REQUEST, 

48 "notFound": http.client.NOT_FOUND, 

49 "notImplemented": http.client.NOT_IMPLEMENTED, 

50 "policyViolation": http.client.FORBIDDEN, 

51 "quotaExceeded": http.client.FORBIDDEN, 

52 "rateLimitExceeded": http.client.TOO_MANY_REQUESTS, 

53 "resourceInUse": http.client.BAD_REQUEST, 

54 "resourcesExceeded": http.client.BAD_REQUEST, 

55 "responseTooLarge": http.client.FORBIDDEN, 

56 "stopped": http.client.OK, 

57 "tableUnavailable": http.client.BAD_REQUEST, 

58 }

59 

60 

61 def _error_result_to_exception(error_result, errors=None):

62 """Maps BigQuery error reasons to an exception. 

63 

64 The reasons and their matching HTTP status codes are documented on 

65 the `troubleshooting errors`_ page. 

66 

67 .. _troubleshooting errors: https://cloud.google.com/bigquery\ 

68 /troubleshooting-errors 

69 

70 Args: 

71 error_result (Mapping[str, str]): The error result from BigQuery. 

72 errors (Union[Iterable[Mapping[str, str]], None]): The detailed error messages. 

73 

74 Returns: 

75 google.cloud.exceptions.GoogleAPICallError: The mapped exception. 

76 """ 

77 reason = error_result.get("reason") 

78 status_code = _ERROR_REASON_TO_EXCEPTION.get( 

79 reason, http.client.INTERNAL_SERVER_ERROR 

80 ) 

81 # Manually create error message to preserve both error_result and errors. 

82 # Can be removed once b/310544564 and b/318889899 are resolved. 

83 concatenated_errors = "" 

84 if errors: 

85 concatenated_errors = "; " 

86 for err in errors: 

87 concatenated_errors += ", ".join( 

88 [f"{key}: {value}" for key, value in err.items()] 

89 ) 

90 concatenated_errors += "; " 

91 

92 # strips off the last unneeded semicolon and space 

93 concatenated_errors = concatenated_errors[:-2] 

94 

95 error_message = error_result.get("message", "") + concatenated_errors 

96 

97 return exceptions.from_http_status( 

98 status_code, error_message, errors=[error_result] 

99 ) 

100 
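To make the mapping concrete, here is a minimal illustrative sketch; the error payload below is hypothetical, not taken from a real response:

error_result = {"reason": "invalidQuery", "message": "Syntax error at [1:8]"}
errors = [
    {"reason": "invalidQuery", "message": "Syntax error at [1:8]", "location": "query"},
]
exc = _error_result_to_exception(error_result, errors)
# "invalidQuery" maps to http.client.BAD_REQUEST, so `exc` is a
# google.api_core.exceptions.BadRequest whose message is the top-level
# message followed by the flattened key/value pairs from `errors`.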

101 

102 ReservationUsage = namedtuple("ReservationUsage", "name slot_ms")

103 ReservationUsage.__doc__ = "Job resource usage for a reservation."

104 ReservationUsage.name.__doc__ = (

105 'Reservation name or "unreserved" for on-demand resources usage.'

106 )

107 ReservationUsage.slot_ms.__doc__ = (

108 "Total slot milliseconds used by the reservation for a particular job."

109 )

110 
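As a quick illustration (the values are made up), each entry behaves like an ordinary namedtuple:

usage = ReservationUsage(name="unreserved", slot_ms=1234)
usage.name     # "unreserved"
usage.slot_ms  # 1234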

111 

112 class TransactionInfo(typing.NamedTuple):

113 """[Alpha] Information of a multi-statement transaction. 

114 

115 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo 

116 

117 .. versionadded:: 2.24.0 

118 """ 

119 

120 transaction_id: str 

121 """Output only. ID of the transaction.""" 

122 

123 @classmethod 

124 def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo": 

125 return cls(transaction_info["transactionId"]) 

126 

127 

128 class _JobReference(object):

129 """A reference to a job. 

130 

131 Args: 

132 job_id (str): ID of the job to run. 

133 project (str): ID of the project where the job runs. 

134 location (str): Location of where the job runs. 

135 """ 

136 

137 def __init__(self, job_id, project, location): 

138 self._properties = {"jobId": job_id, "projectId": project} 

139 # The location field must not be populated if it is None. 

140 if location: 

141 self._properties["location"] = location 

142 

143 @property 

144 def job_id(self): 

145 """str: ID of the job.""" 

146 return self._properties.get("jobId") 

147 

148 @property 

149 def project(self): 

150 """str: ID of the project where the job runs.""" 

151 return self._properties.get("projectId") 

152 

153 @property 

154 def location(self): 

155 """str: Location where the job runs.""" 

156 return self._properties.get("location") 

157 

158 def _to_api_repr(self): 

159 """Returns the API resource representation of the job reference.""" 

160 return copy.deepcopy(self._properties) 

161 

162 @classmethod 

163 def _from_api_repr(cls, resource): 

164 """Returns a job reference for an API resource representation.""" 

165 job_id = resource.get("jobId") 

166 project = resource.get("projectId") 

167 location = resource.get("location") 

168 job_ref = cls(job_id, project, location) 

169 return job_ref 

170 
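A brief sketch of the round trip through the API representation; the project, job ID, and location values are placeholders:

ref = _JobReference("job-123", "my-project", "US")
ref._to_api_repr()
# {'jobId': 'job-123', 'projectId': 'my-project', 'location': 'US'}

# When the resource omits "location", the property simply returns None.
_JobReference._from_api_repr({"jobId": "job-123", "projectId": "my-project"}).location
# None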

171 

172 class _JobConfig(object):

173 """Abstract base class for job configuration objects. 

174 

175 Args: 

176 job_type (str): The key to use for the job configuration. 

177 """ 

178 

179 def __init__(self, job_type, **kwargs): 

180 self._job_type = job_type 

181 self._properties = {job_type: {}} 

182 for prop, val in kwargs.items(): 

183 setattr(self, prop, val) 

184 

185 def __setattr__(self, name, value): 

186 """Override to be able to raise error if an unknown property is being set""" 

187 if not name.startswith("_") and not hasattr(type(self), name): 

188 raise AttributeError( 

189 "Property {} is unknown for {}.".format(name, type(self)) 

190 ) 

191 super(_JobConfig, self).__setattr__(name, value) 

192 

193 @property 

194 def job_timeout_ms(self): 

195 """Optional parameter. Job timeout in milliseconds. If this time limit is exceeded, BigQuery might attempt to stop the job. 

196 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms 

197 e.g. 

198 

199 job_config = bigquery.QueryJobConfig(job_timeout_ms=5000) 

200 or 

201 job_config.job_timeout_ms = 5000 

202 

203 Raises: 

204 ValueError: If ``value`` type is invalid. 

205 """ 

206 

207 # None as this is an optional parameter. 

208 if self._properties.get("jobTimeoutMs"): 

209 return self._properties["jobTimeoutMs"] 

210 return None 

211 

212 @job_timeout_ms.setter 

213 def job_timeout_ms(self, value): 

214 try: 

215 value = _int_or_none(value) 

216 except ValueError as err: 

217 raise ValueError("Pass an int for jobTimeoutMs, e.g. 5000").with_traceback( 

218 err.__traceback__ 

219 ) 

220 

221 if value is not None: 

222 # docs indicate a string is expected by the API 

223 self._properties["jobTimeoutMs"] = str(value) 

224 else: 

225 self._properties.pop("jobTimeoutMs", None) 

226 

227 @property 

228 def reservation(self): 

229 """str: Optional. The reservation that job would use. 

230 

231 A user can specify a reservation to execute the job. If reservation is 

232 not set, reservation is determined based on the rules defined by the 

233 reservation assignments. The expected format is 

234 projects/{project}/locations/{location}/reservations/{reservation}. 

235 

236 Raises: 

237 ValueError: If ``value`` type is not None or of string type. 

238 """ 

239 return self._properties.setdefault("reservation", None) 

240 

241 @reservation.setter 

242 def reservation(self, value): 

243 if value and not isinstance(value, str): 

244 raise ValueError("Reservation must be None or a string.") 

245 self._properties["reservation"] = value 

246 
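For example (the project, location, and reservation names are placeholders), the expected value looks like:

job_config.reservation = (
    "projects/my-project/locations/us-central1/reservations/my-reservation"
)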

247 @property 

248 def labels(self): 

249 """Dict[str, str]: Labels for the job. 

250 

251 This method always returns a dict. Once a job has been created on the 

252 server, its labels cannot be modified anymore. 

253 

254 Raises: 

255 ValueError: If ``value`` type is invalid. 

256 """ 

257 return self._properties.setdefault("labels", {}) 

258 

259 @labels.setter 

260 def labels(self, value): 

261 if not isinstance(value, dict): 

262 raise ValueError("Pass a dict") 

263 self._properties["labels"] = value 

264 
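For instance (the label keys and values below are placeholders):

job_config.labels = {"team": "analytics", "env": "dev"}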

265 def _get_sub_prop(self, key, default=None): 

266 """Get a value in the ``self._properties[self._job_type]`` dictionary. 

267 

268 Most job properties are inside the dictionary related to the job type 

269 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access 

270 those properties:: 

271 

272 self._get_sub_prop('destinationTable') 

273 

274 This is equivalent to using the ``_helpers._get_sub_prop`` function:: 

275 

276 _helpers._get_sub_prop( 

277 self._properties, ['query', 'destinationTable']) 

278 

279 Args: 

280 key (str): 

281 Key for the value to get in the 

282 ``self._properties[self._job_type]`` dictionary. 

283 default (Optional[object]): 

284 Default value to return if the key is not found. 

285 Defaults to :data:`None`. 

286 

287 Returns: 

288 object: The value if present or the default. 

289 """ 

290 return _helpers._get_sub_prop( 

291 self._properties, [self._job_type, key], default=default 

292 ) 

293 

294 def _set_sub_prop(self, key, value): 

295 """Set a value in the ``self._properties[self._job_type]`` dictionary. 

296 

297 Most job properties are inside the dictionary related to the job type 

298 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set 

299 those properties:: 

300 

301 self._set_sub_prop('useLegacySql', False) 

302 

303 This is equivalent to using the ``_helpers._set_sub_prop`` function:: 

304 

305 _helpers._set_sub_prop( 

306 self._properties, ['query', 'useLegacySql'], False) 

307 

308 Args: 

309 key (str): 

310 Key to set in the ``self._properties[self._job_type]`` 

311 dictionary. 

312 value (object): Value to set. 

313 """ 

314 _helpers._set_sub_prop(self._properties, [self._job_type, key], value) 

315 

316 def _del_sub_prop(self, key): 

317 """Remove ``key`` from the ``self._properties[self._job_type]`` dict. 

318 

319 Most job properties are inside the dictionary related to the job type 

320 (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear 

321 those properties:: 

322 

323 self._del_sub_prop('useLegacySql') 

324 

325 This is equivalent to using the ``_helpers._del_sub_prop`` function:: 

326 

327 _helpers._del_sub_prop( 

328 self._properties, ['query', 'useLegacySql']) 

329 

330 Args: 

331 key (str): 

332 Key to remove in the ``self._properties[self._job_type]`` 

333 dictionary. 

334 """ 

335 _helpers._del_sub_prop(self._properties, [self._job_type, key]) 

336 
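To make the nested layout concrete, a rough sketch using the ``QueryJobConfig`` subclass (defined elsewhere in this package), which passes ``job_type="query"`` to this base class:

from google.cloud.bigquery import QueryJobConfig

config = QueryJobConfig()
config._set_sub_prop("useLegacySql", False)
config._properties                    # {'query': {'useLegacySql': False}}
config._get_sub_prop("useLegacySql")  # False
config._del_sub_prop("useLegacySql")
config._properties                    # {'query': {}}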

337 def to_api_repr(self) -> dict: 

338 """Build an API representation of the job config. 

339 

340 Returns: 

341 Dict: A dictionary in the format used by the BigQuery API. 

342 """ 

343 return copy.deepcopy(self._properties) 

344 

345 def _fill_from_default(self, default_job_config=None): 

346 """Merge this job config with a default job config. 

347 

348 The keys in this object take precedence over the keys in the default 

349 config. The merge is done at the top-level as well as for keys one 

350 level below the job type. 

351 

352 Args: 

353 default_job_config (google.cloud.bigquery.job._JobConfig): 

354 The default job config that will be used to fill in self. 

355 

356 Returns: 

357 google.cloud.bigquery.job._JobConfig: A new (merged) job config. 

358 """ 

359 if not default_job_config: 

360 new_job_config = copy.deepcopy(self) 

361 return new_job_config 

362 

363 if self._job_type != default_job_config._job_type: 

364 raise TypeError( 

365 "attempted to merge two incompatible job types: " 

366 + repr(self._job_type) 

367 + ", " 

368 + repr(default_job_config._job_type) 

369 ) 

370 

371 # cls is one of the job config subclasses that provides the job_type argument to 

372 # this base class on instantiation, thus missing-parameter warning is a false 

373 # positive here. 

374 new_job_config = self.__class__() # pytype: disable=missing-parameter 

375 

376 default_job_properties = copy.deepcopy(default_job_config._properties) 

377 for key in self._properties: 

378 if key != self._job_type: 

379 default_job_properties[key] = self._properties[key] 

380 

381 default_job_properties[self._job_type].update(self._properties[self._job_type]) 

382 new_job_config._properties = default_job_properties 

383 

384 return new_job_config 

385 
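A small sketch of the precedence rules, again assuming the ``QueryJobConfig`` subclass; the property values are arbitrary:

from google.cloud.bigquery import QueryJobConfig

default = QueryJobConfig(use_query_cache=True, maximum_bytes_billed=10**9)
override = QueryJobConfig(maximum_bytes_billed=10**6)
merged = override._fill_from_default(default)
merged.use_query_cache       # True     -- filled in from the default config
merged.maximum_bytes_billed  # 1000000  -- the explicit value takes precedence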

386 @classmethod 

387 def from_api_repr(cls, resource: dict) -> "_JobConfig": 

388 """Factory: construct a job configuration given its API representation 

389 

390 Args: 

391 resource (Dict): 

392 A job configuration in the same representation as is returned 

393 from the API. 

394 

395 Returns: 

396 google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``. 

397 """ 

398 # cls is one of the job config subclasses that provides the job_type argument to 

399 # this base class on instantiation, thus missing-parameter warning is a false 

400 # positive here. 

401 job_config = cls() # type: ignore # pytype: disable=missing-parameter 

402 job_config._properties = resource 

403 return job_config 

404 

405 

406 class _AsyncJob(google.api_core.future.polling.PollingFuture):

407 """Base class for asynchronous jobs. 

408 

409 Args: 

410 job_id (Union[str, _JobReference]): 

411 Job's ID in the project associated with the client or a 

412 fully-qualified job reference. 

413 client (google.cloud.bigquery.client.Client): 

414 Client which holds credentials and project configuration. 

415 """ 

416 

417 _JOB_TYPE = "unknown" 

418 _CONFIG_CLASS: ClassVar 

419 

420 def __init__(self, job_id, client): 

421 super(_AsyncJob, self).__init__() 

422 

423 # The job reference can be either a plain job ID or the full resource. 

424 # Populate the properties dictionary consistently depending on what has 

425 # been passed in. 

426 job_ref = job_id 

427 if not isinstance(job_id, _JobReference): 

428 job_ref = _JobReference(job_id, client.project, None) 

429 self._properties = {"jobReference": job_ref._to_api_repr()} 

430 

431 self._client = client 

432 self._result_set = False 

433 self._completion_lock = threading.Lock() 

434 

435 @property 

436 def configuration(self) -> _JobConfig: 

437 """Job-type specific configurtion.""" 

438 configuration: _JobConfig = self._CONFIG_CLASS() # pytype: disable=not-callable 

439 configuration._properties = self._properties.setdefault("configuration", {}) 

440 return configuration 

441 

442 @property 

443 def job_id(self): 

444 """str: ID of the job.""" 

445 return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"]) 

446 

447 @property 

448 def parent_job_id(self): 

449 """Return the ID of the parent job. 

450 

451 See: 

452 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id 

453 

454 Returns: 

455 Optional[str]: parent job id. 

456 """ 

457 return _helpers._get_sub_prop(self._properties, ["statistics", "parentJobId"]) 

458 

459 @property 

460 def script_statistics(self) -> Optional["ScriptStatistics"]: 

461 """Statistics for a child job of a script.""" 

462 resource = _helpers._get_sub_prop( 

463 self._properties, ["statistics", "scriptStatistics"] 

464 ) 

465 if resource is None: 

466 return None 

467 return ScriptStatistics(resource) 

468 

469 @property 

470 def session_info(self) -> Optional["SessionInfo"]: 

471 """[Preview] Information of the session if this job is part of one. 

472 

473 .. versionadded:: 2.29.0 

474 """ 

475 resource = _helpers._get_sub_prop( 

476 self._properties, ["statistics", "sessionInfo"] 

477 ) 

478 if resource is None: 

479 return None 

480 return SessionInfo(resource) 

481 

482 @property 

483 def num_child_jobs(self): 

484 """The number of child jobs executed. 

485 

486 See: 

487 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs 

488 

489 Returns: 

490 int 

491 """ 

492 count = _helpers._get_sub_prop(self._properties, ["statistics", "numChildJobs"]) 

493 return int(count) if count is not None else 0 

494 

495 @property 

496 def project(self): 

497 """Project bound to the job. 

498 

499 Returns: 

500 str: the project (derived from the client). 

501 """ 

502 return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"]) 

503 

504 @property 

505 def location(self): 

506 """str: Location where the job runs.""" 

507 return _helpers._get_sub_prop(self._properties, ["jobReference", "location"]) 

508 

509 @property 

510 def reservation_id(self): 

511 """str: Name of the primary reservation assigned to this job. 

512 

513 Note that this could be different from the reservations reported in 

514 the reservation field if parent reservations were used to execute 

515 this job. 

516 """ 

517 return _helpers._get_sub_prop( 

518 self._properties, ["statistics", "reservation_id"] 

519 ) 

520 

521 def _require_client(self, client): 

522 """Check client or verify over-ride. 

523 

524 Args: 

525 client (Optional[google.cloud.bigquery.client.Client]): 

526 the client to use. If not passed, falls back to the 

527 ``client`` stored on the current dataset. 

528 

529 Returns: 

530 google.cloud.bigquery.client.Client: 

531 The client passed in or the currently bound client. 

532 """ 

533 if client is None: 

534 client = self._client 

535 return client 

536 

537 @property 

538 def job_type(self): 

539 """Type of job. 

540 

541 Returns: 

542 str: one of 'load', 'copy', 'extract', 'query'. 

543 """ 

544 return self._JOB_TYPE 

545 

546 @property 

547 def path(self): 

548 """URL path for the job's APIs. 

549 

550 Returns: 

551 str: the path based on project and job ID. 

552 """ 

553 return "/projects/%s/jobs/%s" % (self.project, self.job_id) 

554 

555 @property 

556 def labels(self): 

557 """Dict[str, str]: Labels for the job.""" 

558 return self._properties.setdefault("configuration", {}).setdefault("labels", {}) 

559 

560 @property 

561 def etag(self): 

562 """ETag for the job resource. 

563 

564 Returns: 

565 Optional[str]: the ETag (None until set from the server). 

566 """ 

567 return self._properties.get("etag") 

568 

569 @property 

570 def self_link(self): 

571 """URL for the job resource. 

572 

573 Returns: 

574 Optional[str]: the URL (None until set from the server). 

575 """ 

576 return self._properties.get("selfLink") 

577 

578 @property 

579 def user_email(self): 

580 """E-mail address of user who submitted the job. 

581 

582 Returns: 

583 Optional[str]: the e-mail address (None until set from the server). 

584 """ 

585 return self._properties.get("user_email") 

586 

587 @property 

588 def created(self): 

589 """Datetime at which the job was created. 

590 

591 Returns: 

592 Optional[datetime.datetime]: 

593 the creation time (None until set from the server). 

594 """ 

595 millis = _helpers._get_sub_prop( 

596 self._properties, ["statistics", "creationTime"] 

597 ) 

598 if millis is not None: 

599 return _helpers._datetime_from_microseconds(millis * 1000.0) 

600 

601 @property 

602 def started(self): 

603 """Datetime at which the job was started. 

604 

605 Returns: 

606 Optional[datetime.datetime]: 

607 the start time (None until set from the server). 

608 """ 

609 millis = _helpers._get_sub_prop(self._properties, ["statistics", "startTime"]) 

610 if millis is not None: 

611 return _helpers._datetime_from_microseconds(millis * 1000.0) 

612 

613 @property 

614 def ended(self): 

615 """Datetime at which the job finished. 

616 

617 Returns: 

618 Optional[datetime.datetime]: 

619 the end time (None until set from the server). 

620 """ 

621 millis = _helpers._get_sub_prop(self._properties, ["statistics", "endTime"]) 

622 if millis is not None: 

623 return _helpers._datetime_from_microseconds(millis * 1000.0) 

624 
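The three timestamp properties above share the same conversion: the API reports epoch milliseconds, which are scaled to microseconds before building a timezone-aware UTC datetime. Roughly equivalent to the following, where the millisecond value is hypothetical:

import datetime

millis = 1700000000000.0  # e.g. statistics["creationTime"] after _set_properties()
datetime.datetime.fromtimestamp(millis / 1000.0, tz=datetime.timezone.utc)
# datetime.datetime(2023, 11, 14, 22, 13, 20, tzinfo=datetime.timezone.utc)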

625 def _job_statistics(self): 

626 """Helper for job-type specific statistics-based properties.""" 

627 statistics = self._properties.get("statistics", {}) 

628 return statistics.get(self._JOB_TYPE, {}) 

629 

630 @property 

631 def reservation_usage(self): 

632 """Job resource usage breakdown by reservation. 

633 

634 Returns: 

635 List[google.cloud.bigquery.job.ReservationUsage]: 

636 Reservation usage stats. Can be empty if not set from the server. 

637 """ 

638 usage_stats_raw = _helpers._get_sub_prop( 

639 self._properties, ["statistics", "reservationUsage"], default=() 

640 ) 

641 return [ 

642 ReservationUsage(name=usage["name"], slot_ms=int(usage["slotMs"])) 

643 for usage in usage_stats_raw 

644 ] 

645 

646 @property 

647 def transaction_info(self) -> Optional[TransactionInfo]: 

648 """Information of the multi-statement transaction if this job is part of one. 

649 

650 Since a scripting query job can execute multiple transactions, this 

651 property is only expected on child jobs. Use the 

652 :meth:`google.cloud.bigquery.client.Client.list_jobs` method with the 

653 ``parent_job`` parameter to iterate over child jobs. 

654 

655 .. versionadded:: 2.24.0 

656 """ 

657 info = self._properties.get("statistics", {}).get("transactionInfo") 

658 if info is None: 

659 return None 

660 else: 

661 return TransactionInfo.from_api_repr(info) 

662 

663 @property 

664 def error_result(self): 

665 """Error information about the job as a whole. 

666 

667 Returns: 

668 Optional[Mapping]: the error information (None until set from the server). 

669 """ 

670 status = self._properties.get("status") 

671 if status is not None: 

672 return status.get("errorResult") 

673 

674 @property 

675 def errors(self): 

676 """Information about individual errors generated by the job. 

677 

678 Returns: 

679 Optional[List[Mapping]]: 

680 the error information (None until set from the server). 

681 """ 

682 status = self._properties.get("status") 

683 if status is not None: 

684 return status.get("errors") 

685 

686 @property 

687 def state(self): 

688 """Status of the job. 

689 

690 Returns: 

691 Optional[str]: 

692 the state (None until set from the server). 

693 """ 

694 status = self._properties.get("status", {}) 

695 return status.get("state") 

696 

697 def _set_properties(self, api_response): 

698 """Update properties from resource in body of ``api_response`` 

699 

700 Args: 

701 api_response (Dict): response returned from an API call. 

702 """ 

703 cleaned = api_response.copy() 

704 statistics = cleaned.setdefault("statistics", {}) 

705 if "creationTime" in statistics: 

706 statistics["creationTime"] = float(statistics["creationTime"]) 

707 if "startTime" in statistics: 

708 statistics["startTime"] = float(statistics["startTime"]) 

709 if "endTime" in statistics: 

710 statistics["endTime"] = float(statistics["endTime"]) 

711 

712 self._properties = cleaned 

713 

714 # For Future interface 

715 self._set_future_result() 

716 

717 @classmethod 

718 def _check_resource_config(cls, resource): 

719 """Helper for :meth:`from_api_repr` 

720 

721 Args: 

722 resource (Dict): resource for the job. 

723 

724 Raises: 

725 KeyError: 

726 If the resource has no identifier, or 

727 is missing the appropriate configuration. 

728 """ 

729 if "jobReference" not in resource or "jobId" not in resource["jobReference"]: 

730 raise KeyError( 

731 "Resource lacks required identity information: " 

732 '["jobReference"]["jobId"]' 

733 ) 

734 if ( 

735 "configuration" not in resource 

736 or cls._JOB_TYPE not in resource["configuration"] 

737 ): 

738 raise KeyError( 

739 "Resource lacks required configuration: " 

740 '["configuration"]["%s"]' % cls._JOB_TYPE 

741 ) 

742 

743 def to_api_repr(self): 

744 """Generate a resource for the job.""" 

745 return copy.deepcopy(self._properties) 

746 

747 _build_resource = to_api_repr # backward-compatibility alias 

748 

749 def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): 

750 """API call: begin the job via a POST request 

751 

752 See 

753 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert 

754 

755 Args: 

756 client (Optional[google.cloud.bigquery.client.Client]): 

757 The client to use. If not passed, falls back to the ``client`` 

758 associated with the job object, or ``NoneType``. 

759 retry (Optional[google.api_core.retry.Retry]): 

760 How to retry the RPC. 

761 timeout (Optional[float]): 

762 The number of seconds to wait for the underlying HTTP transport 

763 before using ``retry``. 

764 

765 Raises: 

766 ValueError: 

767 If the job has already begun. 

768 """ 

769 if self.state is not None: 

770 raise ValueError("Job already begun.") 

771 

772 client = self._require_client(client) 

773 path = "/projects/%s/jobs" % (self.project,) 

774 

775 # jobs.insert is idempotent because we ensure that every new 

776 # job has an ID. 

777 span_attributes = {"path": path} 

778 api_response = client._call_api( 

779 retry, 

780 span_name="BigQuery.job.begin", 

781 span_attributes=span_attributes, 

782 job_ref=self, 

783 method="POST", 

784 path=path, 

785 data=self.to_api_repr(), 

786 timeout=timeout, 

787 ) 

788 self._set_properties(api_response) 

789 

790 def exists( 

791 self, 

792 client=None, 

793 retry: "retries.Retry" = DEFAULT_RETRY, 

794 timeout: Optional[float] = None, 

795 ) -> bool: 

796 """API call: test for the existence of the job via a GET request 

797 

798 See 

799 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get 

800 

801 Args: 

802 client (Optional[google.cloud.bigquery.client.Client]): 

803 the client to use. If not passed, falls back to the 

804 ``client`` stored on the current dataset. 

805 

806 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. 

807 timeout (Optional[float]): 

808 The number of seconds to wait for the underlying HTTP transport 

809 before using ``retry``. 

810 

811 Returns: 

812 bool: Boolean indicating existence of the job. 

813 """ 

814 client = self._require_client(client) 

815 

816 extra_params = {"fields": "id"} 

817 if self.location: 

818 extra_params["location"] = self.location 

819 

820 try: 

821 span_attributes = {"path": self.path} 

822 

823 client._call_api( 

824 retry, 

825 span_name="BigQuery.job.exists", 

826 span_attributes=span_attributes, 

827 job_ref=self, 

828 method="GET", 

829 path=self.path, 

830 query_params=extra_params, 

831 timeout=timeout, 

832 ) 

833 except exceptions.NotFound: 

834 return False 

835 else: 

836 return True 

837 

838 def reload( 

839 self, 

840 client=None, 

841 retry: "retries.Retry" = DEFAULT_RETRY, 

842 timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT, 

843 ): 

844 """API call: refresh job properties via a GET request. 

845 

846 See 

847 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get 

848 

849 Args: 

850 client (Optional[google.cloud.bigquery.client.Client]): 

851 the client to use. If not passed, falls back to the 

852 ``client`` stored on the current dataset. 

853 

854 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. 

855 timeout (Optional[float]): 

856 The number of seconds to wait for the underlying HTTP transport 

857 before using ``retry``. 

858 """ 

859 client = self._require_client(client) 

860 

861 got_job = client.get_job( 

862 self, 

863 project=self.project, 

864 location=self.location, 

865 retry=retry, 

866 timeout=timeout, 

867 ) 

868 self._set_properties(got_job._properties) 

869 

870 def cancel( 

871 self, 

872 client=None, 

873 retry: Optional[retries.Retry] = DEFAULT_RETRY, 

874 timeout: Optional[float] = None, 

875 ) -> bool: 

876 """API call: cancel job via a POST request 

877 

878 See 

879 https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel 

880 

881 Args: 

882 client (Optional[google.cloud.bigquery.client.Client]): 

883 the client to use. If not passed, falls back to the 

884 ``client`` stored on the current dataset. 

885 retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. 

886 timeout (Optional[float]): 

887 The number of seconds to wait for the underlying HTTP transport 

888 before using ``retry`` 

889 

890 Returns: 

891 bool: Boolean indicating that the cancel request was sent. 

892 """ 

893 client = self._require_client(client) 

894 

895 extra_params = {} 

896 if self.location: 

897 extra_params["location"] = self.location 

898 

899 path = "{}/cancel".format(self.path) 

900 span_attributes = {"path": path} 

901 

902 api_response = client._call_api( 

903 retry, 

904 span_name="BigQuery.job.cancel", 

905 span_attributes=span_attributes, 

906 job_ref=self, 

907 method="POST", 

908 path=path, 

909 query_params=extra_params, 

910 timeout=timeout, 

911 ) 

912 self._set_properties(api_response["job"]) 

913 # The Future interface requires that we return True if the *attempt* 

914 # to cancel was successful. 

915 return True 

916 

917 # The following methods implement the PollingFuture interface. Note that 

918 # the methods above are from the pre-Future interface and are left for 

919 # compatibility. The only "overloaded" method is :meth:`cancel`, which 

920 # satisfies both interfaces. 

921 

922 def _set_future_result(self): 

923 """Set the result or exception from the job if it is complete.""" 

924 # This must be done in a lock to prevent the polling thread 

925 # and main thread from both executing the completion logic 

926 # at the same time. 

927 with self._completion_lock: 

928 # If the operation isn't complete or if the result has already been 

929 # set, do not call set_result/set_exception again. 

930 # Note: self._result_set is set to True in set_result and 

931 # set_exception, in case those methods are invoked directly. 

932 if not self.done(reload=False) or self._result_set: 

933 return 

934 

935 if self.error_result is not None: 

936 exception = _error_result_to_exception( 

937 self.error_result, self.errors or () 

938 ) 

939 self.set_exception(exception) 

940 else: 

941 self.set_result(self) 

942 

943 def done( 

944 self, 

945 retry: "retries.Retry" = DEFAULT_RETRY, 

946 timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT, 

947 reload: bool = True, 

948 ) -> bool: 

949 """Checks if the job is complete. 

950 

951 Args: 

952 retry (Optional[google.api_core.retry.Retry]): 

953 How to retry the RPC. If the job state is ``DONE``, retrying is aborted 

954 early, as the job will not change anymore. 

955 timeout (Optional[float]): 

956 The number of seconds to wait for the underlying HTTP transport 

957 before using ``retry``. 

958 reload (Optional[bool]): 

959 If ``True``, make an API call to refresh the job state of 

960 unfinished jobs before checking. Default ``True``. 

961 

962 Returns: 

963 bool: True if the job is complete, False otherwise. 

964 """ 

965 # Do not refresh if the state is already done, as the job will not 

966 # change once complete. 

967 if self.state != _DONE_STATE and reload: 

968 self.reload(retry=retry, timeout=timeout) 

969 return self.state == _DONE_STATE 

970 

971 def result( # type: ignore # (incompatible with supertype) 

972 self, 

973 retry: Optional[retries.Retry] = DEFAULT_RETRY, 

974 timeout: Optional[float] = None, 

975 ) -> "_AsyncJob": 

976 """Start the job and wait for it to complete and get the result. 

977 

978 Args: 

979 retry (Optional[google.api_core.retry.Retry]): 

980 How to retry the RPC. If the job state is ``DONE``, retrying is aborted 

981 early, as the job will not change anymore. 

982 timeout (Optional[float]): 

983 The number of seconds to wait for the underlying HTTP transport 

984 before using ``retry``. 

985 If multiple requests are made under the hood, ``timeout`` 

986 applies to each individual request. 

987 

988 Returns: 

989 _AsyncJob: This instance. 

990 

991 Raises: 

992 google.cloud.exceptions.GoogleAPICallError: 

993 if the job failed. 

994 concurrent.futures.TimeoutError: 

995 if the job did not complete in the given timeout. 

996 """ 

997 if self.state is None: 

998 self._begin(retry=retry, timeout=timeout) 

999 

1000 kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry} 

1001 return super(_AsyncJob, self).result(timeout=timeout, **kwargs) 

1002 
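Typical usage from client code via one of the concrete subclasses, given an existing ``google.cloud.bigquery.Client`` instance named ``client`` (the query text and variable names are illustrative only):

job = client.query("SELECT 1")  # QueryJob, a subclass of _AsyncJob
if not job.done():
    job.result(timeout=300)     # blocks until completion; raises on failure or timeout
job.state                       # "DONE"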

1003 def cancelled(self): 

1004 """Check if the job has been cancelled. 

1005 

1006 This does not call the API. It returns ``True`` only if the job's error 

1007 result indicates that the job was stopped. This method is here to satisfy 

1008 the interface for :class:`google.api_core.future.Future`. 

1009 

1010 Returns: 

1011 bool: False 

1012 """ 

1013 return ( 

1014 self.error_result is not None 

1015 and self.error_result.get("reason") == _STOPPED_REASON 

1016 ) 

1017 

1018 def __repr__(self): 

1019 result = ( 

1020 f"{self.__class__.__name__}<" 

1021 f"project={self.project}, location={self.location}, id={self.job_id}" 

1022 ">" 

1023 ) 

1024 return result 

1025 

1026 

1027 class ScriptStackFrame(object):

1028 """Stack frame showing the line/column/procedure name where the current 

1029 evaluation happened. 

1030 

1031 Args: 

1032 resource (Map[str, Any]): JSON representation of object. 

1033 """ 

1034 

1035 def __init__(self, resource): 

1036 self._properties = resource 

1037 

1038 @property 

1039 def procedure_id(self): 

1040 """Optional[str]: Name of the active procedure. 

1041 

1042 Omitted if in a top-level script. 

1043 """ 

1044 return self._properties.get("procedureId") 

1045 

1046 @property 

1047 def text(self): 

1048 """str: Text of the current statement/expression.""" 

1049 return self._properties.get("text") 

1050 

1051 @property 

1052 def start_line(self): 

1053 """int: One-based start line.""" 

1054 return _helpers._int_or_none(self._properties.get("startLine")) 

1055 

1056 @property 

1057 def start_column(self): 

1058 """int: One-based start column.""" 

1059 return _helpers._int_or_none(self._properties.get("startColumn")) 

1060 

1061 @property 

1062 def end_line(self): 

1063 """int: One-based end line.""" 

1064 return _helpers._int_or_none(self._properties.get("endLine")) 

1065 

1066 @property 

1067 def end_column(self): 

1068 """int: One-based end column.""" 

1069 return _helpers._int_or_none(self._properties.get("endColumn")) 

1070 

1071 

1072 class ScriptStatistics(object):

1073 """Statistics for a child job of a script. 

1074 

1075 Args: 

1076 resource (Map[str, Any]): JSON representation of object. 

1077 """ 

1078 

1079 def __init__(self, resource): 

1080 self._properties = resource 

1081 

1082 @property 

1083 def stack_frames(self) -> Sequence[ScriptStackFrame]: 

1084 """Stack trace where the current evaluation happened. 

1085 

1086 Shows line/column/procedure name of each frame on the stack at the 

1087 point where the current evaluation happened. 

1088 

1089 The leaf frame is first, the primary script is last. 

1090 """ 

1091 return [ 

1092 ScriptStackFrame(frame) for frame in self._properties.get("stackFrames", []) 

1093 ] 

1094 

1095 @property 

1096 def evaluation_kind(self) -> Optional[str]: 

1097 """str: Indicates the type of child job. 

1098 

1099 Possible values include ``STATEMENT`` and ``EXPRESSION``. 

1100 """ 

1101 return self._properties.get("evaluationKind") 

1102 

1103 

1104 class SessionInfo:

1105 """[Preview] Information of the session if this job is part of one. 

1106 

1107 .. versionadded:: 2.29.0 

1108 

1109 Args: 

1110 resource (Map[str, Any]): JSON representation of object. 

1111 """ 

1112 

1113 def __init__(self, resource): 

1114 self._properties = resource 

1115 

1116 @property 

1117 def session_id(self) -> Optional[str]: 

1118 """The ID of the session.""" 

1119 return self._properties.get("sessionId") 

1120 

1121 

1122 class UnknownJob(_AsyncJob):

1123 """A job whose type cannot be determined.""" 

1124 

1125 @classmethod 

1126 def from_api_repr(cls, resource: dict, client) -> "UnknownJob": 

1127 """Construct an UnknownJob from the JSON representation. 

1128 

1129 Args: 

1130 resource (Dict): JSON representation of a job. 

1131 client (google.cloud.bigquery.client.Client): 

1132 Client connected to BigQuery API. 

1133 

1134 Returns: 

1135 UnknownJob: Job corresponding to the resource. 

1136 """ 

1137 job_ref_properties = resource.get( 

1138 "jobReference", {"projectId": client.project, "jobId": None} 

1139 ) 

1140 job_ref = _JobReference._from_api_repr(job_ref_properties) 

1141 job = cls(job_ref, client) 

1142 # Populate the job reference with the project, even if it has been 

1143 # redacted, because we know it should equal that of the request. 

1144 resource["jobReference"] = job_ref_properties 

1145 job._properties = resource 

1146 return job