Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py: 43%



# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base classes and helpers for job classes."""

from collections import namedtuple
import copy
import http
import threading
import typing
from typing import ClassVar, Dict, Optional, Sequence

from google.api_core import retry as retries
from google.api_core import exceptions
import google.api_core.future.polling

from google.cloud.bigquery import _helpers
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery.retry import (
    DEFAULT_GET_JOB_TIMEOUT,
    DEFAULT_RETRY,
)


_DONE_STATE = "DONE"
_STOPPED_REASON = "stopped"
_ERROR_REASON_TO_EXCEPTION = {
    "accessDenied": http.client.FORBIDDEN,
    "backendError": http.client.INTERNAL_SERVER_ERROR,
    "billingNotEnabled": http.client.FORBIDDEN,
    "billingTierLimitExceeded": http.client.BAD_REQUEST,
    "blocked": http.client.FORBIDDEN,
    "duplicate": http.client.CONFLICT,
    "internalError": http.client.INTERNAL_SERVER_ERROR,
    "invalid": http.client.BAD_REQUEST,
    "invalidQuery": http.client.BAD_REQUEST,
    "notFound": http.client.NOT_FOUND,
    "notImplemented": http.client.NOT_IMPLEMENTED,
    "policyViolation": http.client.FORBIDDEN,
    "quotaExceeded": http.client.FORBIDDEN,
    "rateLimitExceeded": http.client.TOO_MANY_REQUESTS,
    "resourceInUse": http.client.BAD_REQUEST,
    "resourcesExceeded": http.client.BAD_REQUEST,
    "responseTooLarge": http.client.FORBIDDEN,
    "stopped": http.client.OK,
    "tableUnavailable": http.client.BAD_REQUEST,
}


def _error_result_to_exception(error_result, errors=None):
    """Maps BigQuery error reasons to an exception.

    The reasons and their matching HTTP status codes are documented on
    the `troubleshooting errors`_ page.

    .. _troubleshooting errors: https://cloud.google.com/bigquery\
        /troubleshooting-errors

    Args:
        error_result (Mapping[str, str]): The error result from BigQuery.
        errors (Union[Iterable[str], None]): The detailed error messages.

    Returns:
        google.cloud.exceptions.GoogleAPICallError: The mapped exception.
    """
    reason = error_result.get("reason")
    status_code = _ERROR_REASON_TO_EXCEPTION.get(
        reason, http.client.INTERNAL_SERVER_ERROR
    )
    # Manually create error message to preserve both error_result and errors.
    # Can be removed once b/310544564 and b/318889899 are resolved.
    concatenated_errors = ""
    if errors:
        concatenated_errors = "; "
        for err in errors:
            concatenated_errors += ", ".join(
                [f"{key}: {value}" for key, value in err.items()]
            )
            concatenated_errors += "; "

        # strips off the last unneeded semicolon and space
        concatenated_errors = concatenated_errors[:-2]

    error_message = error_result.get("message", "") + concatenated_errors

    return exceptions.from_http_status(
        status_code, error_message, errors=[error_result]
    )
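
# Illustrative sketch (editor's addition, not part of the library): how
# ``_error_result_to_exception`` maps a BigQuery error payload onto a
# ``google.api_core`` exception class. The payload below is made up; only the
# "reason" key drives the status-code lookup, and entries in ``errors`` are
# folded into the exception message.
def _example_error_result_to_exception():  # pragma: no cover - example only
    error_result = {"reason": "rateLimitExceeded", "message": "Too many concurrent jobs"}
    errors = [{"reason": "rateLimitExceeded", "location": "us-central1"}]
    exc = _error_result_to_exception(error_result, errors)
    # "rateLimitExceeded" maps to HTTP 429, i.e. exceptions.TooManyRequests.
    assert isinstance(exc, exceptions.TooManyRequests)
    return exc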


ReservationUsage = namedtuple("ReservationUsage", "name slot_ms")
ReservationUsage.__doc__ = "Job resource usage for a reservation."
ReservationUsage.name.__doc__ = (
    'Reservation name or "unreserved" for on-demand resources usage.'
)
ReservationUsage.slot_ms.__doc__ = (
    "Total slot milliseconds used by the reservation for a particular job."
)


class TransactionInfo(typing.NamedTuple):
    """[Alpha] Information of a multi-statement transaction.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo

    .. versionadded:: 2.24.0
    """

    transaction_id: str
    """Output only. ID of the transaction."""

    @classmethod
    def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
        return cls(transaction_info["transactionId"])
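
# Illustrative sketch (editor's addition): building a ``TransactionInfo`` from
# the camelCase API payload. The transaction ID below is a made-up value.
def _example_transaction_info():  # pragma: no cover - example only
    info = TransactionInfo.from_api_repr({"transactionId": "1234567890"})
    assert info.transaction_id == "1234567890"
    return info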


class _JobReference(object):
    """A reference to a job.

    Args:
        job_id (str): ID of the job to run.
        project (str): ID of the project where the job runs.
        location (str): Location where the job runs.
    """

    def __init__(self, job_id, project, location):
        self._properties = {"jobId": job_id, "projectId": project}
        # The location field must not be populated if it is None.
        if location:
            self._properties["location"] = location

    @property
    def job_id(self):
        """str: ID of the job."""
        return self._properties.get("jobId")

    @property
    def project(self):
        """str: ID of the project where the job runs."""
        return self._properties.get("projectId")

    @property
    def location(self):
        """str: Location where the job runs."""
        return self._properties.get("location")

    def _to_api_repr(self):
        """Returns the API resource representation of the job reference."""
        return copy.deepcopy(self._properties)

    @classmethod
    def _from_api_repr(cls, resource):
        """Returns a job reference for an API resource representation."""
        job_id = resource.get("jobId")
        project = resource.get("projectId")
        location = resource.get("location")
        job_ref = cls(job_id, project, location)
        return job_ref
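
# Illustrative sketch (editor's addition): a ``_JobReference`` round trip
# through its API representation. ``location`` is only included in the payload
# when it is truthy. The IDs below are placeholders.
def _example_job_reference_round_trip():  # pragma: no cover - example only
    ref = _JobReference("job-123", "my-project", "US")
    resource = ref._to_api_repr()
    assert resource == {"jobId": "job-123", "projectId": "my-project", "location": "US"}
    same_ref = _JobReference._from_api_repr(resource)
    assert same_ref.job_id == ref.job_id and same_ref.location == ref.location
    return same_ref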

class _JobConfig(object):
    """Abstract base class for job configuration objects.

    Args:
        job_type (str): The key to use for the job configuration.
    """

    def __init__(self, job_type, **kwargs):
        self._job_type = job_type
        self._properties = {job_type: {}}
        for prop, val in kwargs.items():
            setattr(self, prop, val)

    def __setattr__(self, name, value):
        """Override to raise an error if an unknown property is set."""
        if not name.startswith("_") and not hasattr(type(self), name):
            raise AttributeError(
                "Property {} is unknown for {}.".format(name, type(self))
            )
        super(_JobConfig, self).__setattr__(name, value)

    @property
    def job_timeout_ms(self):
        """Optional parameter. Job timeout in milliseconds.

        If this time limit is exceeded, BigQuery might attempt to stop the job.
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms

        For example::

            job_config = bigquery.QueryJobConfig(job_timeout_ms=5000)
            # or
            job_config.job_timeout_ms = 5000

        Raises:
            ValueError: If ``value`` type is invalid.
        """

        # None as this is an optional parameter.
        if self._properties.get("jobTimeoutMs"):
            return self._properties["jobTimeoutMs"]
        return None

    @job_timeout_ms.setter
    def job_timeout_ms(self, value):
        try:
            value = _int_or_none(value)
        except ValueError as err:
            raise ValueError("Pass an int for jobTimeoutMs, e.g. 5000").with_traceback(
                err.__traceback__
            )

        if value is not None:
            # docs indicate a string is expected by the API
            self._properties["jobTimeoutMs"] = str(value)
        else:
            self._properties.pop("jobTimeoutMs", None)

    @property
    def max_slots(self) -> Optional[int]:
        """The maximum rate of slot consumption to allow for this job.

        If set, the number of slots used to execute the job will be throttled
        to try to keep its slot consumption below the requested rate.
        This feature is not generally available.
        """

        max_slots = self._properties.get("maxSlots")
        if max_slots is not None:
            if isinstance(max_slots, str):
                return int(max_slots)
            if isinstance(max_slots, int):
                return max_slots
        return None

    @max_slots.setter
    def max_slots(self, value):
        try:
            value = _int_or_none(value)
        except ValueError as err:
            raise ValueError("Pass an int for max slots, e.g. 100").with_traceback(
                err.__traceback__
            )

        if value is not None:
            self._properties["maxSlots"] = str(value)
        else:
            self._properties.pop("maxSlots", None)

    @property
    def reservation(self):
        """str: Optional. The reservation that the job would use.

        Users can specify a reservation to execute the job. If a reservation is
        not set, the reservation is determined based on the rules defined by
        the reservation assignments. The expected format is
        projects/{project}/locations/{location}/reservations/{reservation}.

        Raises:
            ValueError: If ``value`` type is not None or of string type.
        """
        return self._properties.setdefault("reservation", None)

    @reservation.setter
    def reservation(self, value):
        if value and not isinstance(value, str):
            raise ValueError("Reservation must be None or a string.")
        self._properties["reservation"] = value

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job.

        This method always returns a dict. Once a job has been created on the
        server, its labels cannot be modified anymore.

        Raises:
            ValueError: If ``value`` type is invalid.
        """
        return self._properties.setdefault("labels", {})

    @labels.setter
    def labels(self, value):
        if not isinstance(value, dict):
            raise ValueError("Pass a dict")
        self._properties["labels"] = value

    def _get_sub_prop(self, key, default=None):
        """Get a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access
        those properties::

            self._get_sub_prop('destinationTable')

        This is equivalent to using the ``_helpers._get_sub_prop`` function::

            _helpers._get_sub_prop(
                self._properties, ['query', 'destinationTable'])

        Args:
            key (str):
                Key for the value to get in the
                ``self._properties[self._job_type]`` dictionary.
            default (Optional[object]):
                Default value to return if the key is not found.
                Defaults to :data:`None`.

        Returns:
            object: The value if present or the default.
        """
        return _helpers._get_sub_prop(
            self._properties, [self._job_type, key], default=default
        )

    def _set_sub_prop(self, key, value):
        """Set a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set
        those properties::

            self._set_sub_prop('useLegacySql', False)

        This is equivalent to using the ``_helpers._set_sub_prop`` function::

            _helpers._set_sub_prop(
                self._properties, ['query', 'useLegacySql'], False)

        Args:
            key (str):
                Key to set in the ``self._properties[self._job_type]``
                dictionary.
            value (object): Value to set.
        """
        _helpers._set_sub_prop(self._properties, [self._job_type, key], value)

    def _del_sub_prop(self, key):
        """Remove ``key`` from the ``self._properties[self._job_type]`` dict.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear
        those properties::

            self._del_sub_prop('useLegacySql')

        This is equivalent to using the ``_helpers._del_sub_prop`` function::

            _helpers._del_sub_prop(
                self._properties, ['query', 'useLegacySql'])

        Args:
            key (str):
                Key to remove in the ``self._properties[self._job_type]``
                dictionary.
        """
        _helpers._del_sub_prop(self._properties, [self._job_type, key])


    def to_api_repr(self) -> dict:
        """Build an API representation of the job config.

        Returns:
            Dict: A dictionary in the format used by the BigQuery API.
        """
        return copy.deepcopy(self._properties)

    def _fill_from_default(self, default_job_config=None):
        """Merge this job config with a default job config.

        The keys in this object take precedence over the keys in the default
        config. The merge is done at the top-level as well as for keys one
        level below the job type.

        Args:
            default_job_config (google.cloud.bigquery.job._JobConfig):
                The default job config that will be used to fill in self.

        Returns:
            google.cloud.bigquery.job._JobConfig: A new (merged) job config.
        """
        if not default_job_config:
            new_job_config = copy.deepcopy(self)
            return new_job_config

        if self._job_type != default_job_config._job_type:
            raise TypeError(
                "attempted to merge two incompatible job types: "
                + repr(self._job_type)
                + ", "
                + repr(default_job_config._job_type)
            )

        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        new_job_config = self.__class__()  # pytype: disable=missing-parameter

        default_job_properties = copy.deepcopy(default_job_config._properties)
        for key in self._properties:
            if key != self._job_type:
                default_job_properties[key] = self._properties[key]

        default_job_properties[self._job_type].update(self._properties[self._job_type])
        new_job_config._properties = default_job_properties

        return new_job_config

    @classmethod
    def from_api_repr(cls, resource: dict) -> "_JobConfig":
        """Factory: construct a job configuration given its API representation.

        Args:
            resource (Dict):
                A job configuration in the same representation as is returned
                from the API.

        Returns:
            google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``.
        """
        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        job_config = cls()  # type: ignore  # pytype: disable=missing-parameter
        job_config._properties = resource
        return job_config
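
# Illustrative sketch (editor's addition): how ``_fill_from_default`` layers a
# per-job config over a default config. Keys set on ``specific`` win; keys only
# present on ``default`` are carried over. ``_QueryishConfig`` is a throwaway
# stand-in for a real subclass such as ``QueryJobConfig``, which normally
# supplies the ``"query"`` job type.
def _example_fill_from_default():  # pragma: no cover - example only
    class _QueryishConfig(_JobConfig):
        def __init__(self, **kwargs):
            super().__init__("query", **kwargs)

    default = _QueryishConfig()
    default.labels = {"team": "data"}
    default._set_sub_prop("useLegacySql", False)

    specific = _QueryishConfig()
    specific._set_sub_prop("useQueryCache", True)

    merged = specific._fill_from_default(default)
    assert merged.labels == {"team": "data"}
    assert merged._get_sub_prop("useLegacySql") is False
    assert merged._get_sub_prop("useQueryCache") is True
    return merged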

class _AsyncJob(google.api_core.future.polling.PollingFuture):
    """Base class for asynchronous jobs.

    Args:
        job_id (Union[str, _JobReference]):
            Job's ID in the project associated with the client or a
            fully-qualified job reference.
        client (google.cloud.bigquery.client.Client):
            Client which holds credentials and project configuration.
    """

    _JOB_TYPE = "unknown"
    _CONFIG_CLASS: ClassVar

    def __init__(self, job_id, client):
        super(_AsyncJob, self).__init__()

        # The job reference can be either a plain job ID or the full resource.
        # Populate the properties dictionary consistently depending on what has
        # been passed in.
        job_ref = job_id
        if not isinstance(job_id, _JobReference):
            job_ref = _JobReference(job_id, client.project, None)
        self._properties = {"jobReference": job_ref._to_api_repr()}

        self._client = client
        self._result_set = False
        self._completion_lock = threading.Lock()

    @property
    def configuration(self) -> _JobConfig:
        """Job-type specific configuration."""
        configuration: _JobConfig = self._CONFIG_CLASS()  # pytype: disable=not-callable
        configuration._properties = self._properties.setdefault("configuration", {})
        return configuration

    @property
    def job_id(self):
        """str: ID of the job."""
        return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"])

    @property
    def parent_job_id(self):
        """Return the ID of the parent job.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id

        Returns:
            Optional[str]: parent job id.
        """
        return _helpers._get_sub_prop(self._properties, ["statistics", "parentJobId"])

    @property
    def script_statistics(self) -> Optional["ScriptStatistics"]:
        """Statistics for a child job of a script."""
        resource = _helpers._get_sub_prop(
            self._properties, ["statistics", "scriptStatistics"]
        )
        if resource is None:
            return None
        return ScriptStatistics(resource)

    @property
    def session_info(self) -> Optional["SessionInfo"]:
        """[Preview] Information of the session if this job is part of one.

        .. versionadded:: 2.29.0
        """
        resource = _helpers._get_sub_prop(
            self._properties, ["statistics", "sessionInfo"]
        )
        if resource is None:
            return None
        return SessionInfo(resource)

    @property
    def num_child_jobs(self):
        """The number of child jobs executed.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs

        Returns:
            int
        """
        count = _helpers._get_sub_prop(self._properties, ["statistics", "numChildJobs"])
        return int(count) if count is not None else 0

    @property
    def project(self):
        """Project bound to the job.

        Returns:
            str: the project (derived from the client).
        """
        return _helpers._get_sub_prop(self._properties, ["jobReference", "projectId"])

    @property
    def location(self):
        """str: Location where the job runs."""
        return _helpers._get_sub_prop(self._properties, ["jobReference", "location"])

    @property
    def reservation_id(self):
        """str: Name of the primary reservation assigned to this job.

        Note that this could be different from the reservations reported in
        the reservation field if parent reservations were used to execute
        this job.
        """
        return _helpers._get_sub_prop(
            self._properties, ["statistics", "reservation_id"]
        )

    def _require_client(self, client):
        """Check client or verify override.

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                the client to use. If not passed, falls back to the
                ``client`` stored on the current job.

        Returns:
            google.cloud.bigquery.client.Client:
                The client passed in or the currently bound client.
        """
        if client is None:
            client = self._client
        return client

    @property
    def job_type(self):
        """Type of job.

        Returns:
            str: one of 'load', 'copy', 'extract', 'query'.
        """
        return self._JOB_TYPE

    @property
    def path(self):
        """URL path for the job's APIs.

        Returns:
            str: the path based on project and job ID.
        """
        return "/projects/%s/jobs/%s" % (self.project, self.job_id)

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job."""
        return self._properties.setdefault("configuration", {}).setdefault("labels", {})

    @property
    def etag(self):
        """ETag for the job resource.

        Returns:
            Optional[str]: the ETag (None until set from the server).
        """
        return self._properties.get("etag")

    @property
    def self_link(self):
        """URL for the job resource.

        Returns:
            Optional[str]: the URL (None until set from the server).
        """
        return self._properties.get("selfLink")

    @property
    def user_email(self):
        """E-mail address of the user who submitted the job.

        Returns:
            Optional[str]: the e-mail address (None until set from the server).
        """
        return self._properties.get("user_email")

    @property
    def created(self):
        """Datetime at which the job was created.

        Returns:
            Optional[datetime.datetime]:
                the creation time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(
            self._properties, ["statistics", "creationTime"]
        )
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    @property
    def started(self):
        """Datetime at which the job was started.

        Returns:
            Optional[datetime.datetime]:
                the start time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(self._properties, ["statistics", "startTime"])
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    @property
    def ended(self):
        """Datetime at which the job finished.

        Returns:
            Optional[datetime.datetime]:
                the end time (None until set from the server).
        """
        millis = _helpers._get_sub_prop(self._properties, ["statistics", "endTime"])
        if millis is not None:
            return _helpers._datetime_from_microseconds(millis * 1000.0)

    def _job_statistics(self):
        """Helper for job-type specific statistics-based properties."""
        statistics = self._properties.get("statistics", {})
        return statistics.get(self._JOB_TYPE, {})

    @property
    def reservation_usage(self):
        """Job resource usage breakdown by reservation.

        Returns:
            List[google.cloud.bigquery.job.ReservationUsage]:
                Reservation usage stats. Can be empty if not set from the server.
        """
        usage_stats_raw = _helpers._get_sub_prop(
            self._properties, ["statistics", "reservationUsage"], default=()
        )
        return [
            ReservationUsage(name=usage["name"], slot_ms=int(usage["slotMs"]))
            for usage in usage_stats_raw
        ]

    @property
    def transaction_info(self) -> Optional[TransactionInfo]:
        """Information of the multi-statement transaction if this job is part of one.

        Since a scripting query job can execute multiple transactions, this
        property is only expected on child jobs. Use the
        :meth:`google.cloud.bigquery.client.Client.list_jobs` method with the
        ``parent_job`` parameter to iterate over child jobs.

        .. versionadded:: 2.24.0
        """
        info = self._properties.get("statistics", {}).get("transactionInfo")
        if info is None:
            return None
        else:
            return TransactionInfo.from_api_repr(info)

    @property
    def error_result(self):
        """Output only. Final error result of the job.

        If present, indicates that the job has completed and was unsuccessful.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result

        Returns:
            Optional[Mapping]: the error information (None until set from the server).
        """
        status = self._properties.get("status")
        if status is not None:
            return status.get("errorResult")

    @property
    def errors(self):
        """Output only. The first errors encountered during the running of the job.

        The final message includes the number of errors that caused the process to stop.
        Errors here do not necessarily mean that the job has not completed or was unsuccessful.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.errors

        Returns:
            Optional[List[Mapping]]:
                the error information (None until set from the server).
        """
        status = self._properties.get("status")
        if status is not None:
            return status.get("errors")

    @property
    def state(self):
        """Output only. Running state of the job.

        Valid states include 'PENDING', 'RUNNING', and 'DONE'.

        See:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.state

        Returns:
            Optional[str]:
                the state (None until set from the server).
        """
        status = self._properties.get("status", {})
        return status.get("state")

    def _set_properties(self, api_response):
        """Update properties from the resource in the body of ``api_response``.

        Args:
            api_response (Dict): response returned from an API call.
        """
        cleaned = api_response.copy()
        statistics = cleaned.setdefault("statistics", {})
        if "creationTime" in statistics:
            statistics["creationTime"] = float(statistics["creationTime"])
        if "startTime" in statistics:
            statistics["startTime"] = float(statistics["startTime"])
        if "endTime" in statistics:
            statistics["endTime"] = float(statistics["endTime"])

        self._properties = cleaned

        # For Future interface
        self._set_future_result()

    @classmethod
    def _check_resource_config(cls, resource):
        """Helper for :meth:`from_api_repr`.

        Args:
            resource (Dict): resource for the job.

        Raises:
            KeyError:
                If the resource has no identifier, or
                is missing the appropriate configuration.
        """
        if "jobReference" not in resource or "jobId" not in resource["jobReference"]:
            raise KeyError(
                "Resource lacks required identity information: "
                '["jobReference"]["jobId"]'
            )
        if (
            "configuration" not in resource
            or cls._JOB_TYPE not in resource["configuration"]
        ):
            raise KeyError(
                "Resource lacks required configuration: "
                '["configuration"]["%s"]' % cls._JOB_TYPE
            )

    def to_api_repr(self):
        """Generate a resource for the job."""
        return copy.deepcopy(self._properties)

    _build_resource = to_api_repr  # backward-compatibility alias

    def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
        """API call: begin the job via a POST request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                The client to use. If not passed, falls back to the ``client``
                associated with the job object, or ``NoneType``.
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Raises:
            ValueError:
                If the job has already begun.
        """
        if self.state is not None:
            raise ValueError("Job already begun.")

        client = self._require_client(client)
        path = "/projects/%s/jobs" % (self.project,)

        # jobs.insert is idempotent because we ensure that every new
        # job has an ID.
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.begin",
            span_attributes=span_attributes,
            job_ref=self,
            method="POST",
            path=path,
            data=self.to_api_repr(),
            timeout=timeout,
        )
        self._set_properties(api_response)

    def exists(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> bool:
        """API call: test for the existence of the job via a GET request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                the client to use. If not passed, falls back to the
                ``client`` stored on the current job.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            bool: Boolean indicating existence of the job.
        """
        client = self._require_client(client)

        extra_params = {"fields": "id"}
        if self.location:
            extra_params["location"] = self.location

        try:
            span_attributes = {"path": self.path}

            client._call_api(
                retry,
                span_name="BigQuery.job.exists",
                span_attributes=span_attributes,
                job_ref=self,
                method="GET",
                path=self.path,
                query_params=extra_params,
                timeout=timeout,
            )
        except exceptions.NotFound:
            return False
        else:
            return True

    def reload(
        self,
        client=None,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
    ):
        """API call: refresh job properties via a GET request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                the client to use. If not passed, falls back to the
                ``client`` stored on the current job.

            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
        """
        client = self._require_client(client)

        got_job = client.get_job(
            self,
            project=self.project,
            location=self.location,
            retry=retry,
            timeout=timeout,
        )
        self._set_properties(got_job._properties)

    def cancel(
        self,
        client=None,
        retry: Optional[retries.Retry] = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> bool:
        """API call: cancel the job via a POST request.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel

        Args:
            client (Optional[google.cloud.bigquery.client.Client]):
                the client to use. If not passed, falls back to the
                ``client`` stored on the current job.
            retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            bool: Boolean indicating that the cancel request was sent.
        """
        client = self._require_client(client)

        extra_params = {}
        if self.location:
            extra_params["location"] = self.location

        path = "{}/cancel".format(self.path)
        span_attributes = {"path": path}

        api_response = client._call_api(
            retry,
            span_name="BigQuery.job.cancel",
            span_attributes=span_attributes,
            job_ref=self,
            method="POST",
            path=path,
            query_params=extra_params,
            timeout=timeout,
        )
        self._set_properties(api_response["job"])
        # The Future interface requires that we return True if the *attempt*
        # to cancel was successful.
        return True

    # The following methods implement the PollingFuture interface. Note that
    # the methods above are from the pre-Future interface and are left for
    # compatibility. The only "overloaded" method is :meth:`cancel`, which
    # satisfies both interfaces.

    def _set_future_result(self):
        """Set the result or exception from the job if it is complete."""
        # This must be done in a lock to prevent the polling thread
        # and main thread from both executing the completion logic
        # at the same time.
        with self._completion_lock:
            # If the operation isn't complete or if the result has already been
            # set, do not call set_result/set_exception again.
            # Note: self._result_set is set to True in set_result and
            # set_exception, in case those methods are invoked directly.
            if not self.done(reload=False) or self._result_set:
                return

            if self.error_result is not None:
                exception = _error_result_to_exception(
                    self.error_result, self.errors or ()
                )
                self.set_exception(exception)
            else:
                self.set_result(self)

    def done(
        self,
        retry: "retries.Retry" = DEFAULT_RETRY,
        timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
        reload: bool = True,
    ) -> bool:
        """Checks if the job is complete.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC. If the job state is ``DONE``, retrying is aborted
                early, as the job will not change anymore.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
            reload (Optional[bool]):
                If ``True``, make an API call to refresh the job state of
                unfinished jobs before checking. Default ``True``.

        Returns:
            bool: True if the job is complete, False otherwise.
        """
        # Do not refresh if the state is already done, as the job will not
        # change once complete.
        if self.state != _DONE_STATE and reload:
            self.reload(retry=retry, timeout=timeout)
        return self.state == _DONE_STATE

    def result(  # type: ignore  # (incompatible with supertype)
        self,
        retry: Optional[retries.Retry] = DEFAULT_RETRY,
        timeout: Optional[float] = None,
    ) -> "_AsyncJob":
        """Start the job and wait for it to complete and get the result.

        Args:
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC. If the job state is ``DONE``, retrying is aborted
                early, as the job will not change anymore.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.
                If multiple requests are made under the hood, ``timeout``
                applies to each individual request.

        Returns:
            _AsyncJob: This instance.

        Raises:
            google.cloud.exceptions.GoogleAPICallError:
                if the job failed.
            concurrent.futures.TimeoutError:
                if the job did not complete in the given timeout.
        """
        if self.state is None:
            self._begin(retry=retry, timeout=timeout)

        return super(_AsyncJob, self).result(timeout=timeout, retry=retry)

    def cancelled(self):
        """Check if the job has been cancelled.

        The API does not expose a dedicated cancellation status, so this only
        reports ``True`` when the job's error result carries the ``stopped``
        reason. This method is here to satisfy the interface for
        :class:`google.api_core.future.Future`.

        Returns:
            bool: True if the job was stopped, False otherwise.
        """
        return (
            self.error_result is not None
            and self.error_result.get("reason") == _STOPPED_REASON
        )

    def __repr__(self):
        result = (
            f"{self.__class__.__name__}<"
            f"project={self.project}, location={self.location}, id={self.job_id}"
            ">"
        )
        return result
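
# Illustrative sketch (editor's addition, not part of the library): the usual
# Future-style lifecycle of an ``_AsyncJob`` subclass. The project ID and query
# text are placeholders, and ``bigquery.Client``/``client.query`` are assumed to
# be available in the environment; in practice you work with a concrete subclass
# such as ``QueryJob`` returned by ``Client.query``.
def _example_job_polling():  # pragma: no cover - example only
    # Imported lazily to avoid a circular import at module load time.
    from google.cloud import bigquery

    client = bigquery.Client(project="your-project")
    job = client.query("SELECT 1")  # QueryJob, an _AsyncJob subclass

    if not job.done():  # one jobs.get call that refreshes the cached state
        # Blocks until the state is DONE and raises if error_result is set.
        # (The base class returns the job itself; QueryJob also returns the rows.)
        job.result()
    print(job.state, job.created, job.ended)
    return job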

class ScriptStackFrame(object):
    """Stack frame showing the line/column/procedure name where the current
    evaluation happened.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def procedure_id(self):
        """Optional[str]: Name of the active procedure.

        Omitted if in a top-level script.
        """
        return self._properties.get("procedureId")

    @property
    def text(self):
        """str: Text of the current statement/expression."""
        return self._properties.get("text")

    @property
    def start_line(self):
        """int: One-based start line."""
        return _helpers._int_or_none(self._properties.get("startLine"))

    @property
    def start_column(self):
        """int: One-based start column."""
        return _helpers._int_or_none(self._properties.get("startColumn"))

    @property
    def end_line(self):
        """int: One-based end line."""
        return _helpers._int_or_none(self._properties.get("endLine"))

    @property
    def end_column(self):
        """int: One-based end column."""
        return _helpers._int_or_none(self._properties.get("endColumn"))


class ScriptStatistics(object):
    """Statistics for a child job of a script.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def stack_frames(self) -> Sequence[ScriptStackFrame]:
        """Stack trace where the current evaluation happened.

        Shows line/column/procedure name of each frame on the stack at the
        point where the current evaluation happened.

        The leaf frame is first, the primary script is last.
        """
        return [
            ScriptStackFrame(frame) for frame in self._properties.get("stackFrames", [])
        ]

    @property
    def evaluation_kind(self) -> Optional[str]:
        """str: Indicates the type of child job.

        Possible values include ``STATEMENT`` and ``EXPRESSION``.
        """
        return self._properties.get("evaluationKind")
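
# Illustrative sketch (editor's addition): reading ``ScriptStatistics`` from a
# raw ``statistics.scriptStatistics`` payload. The frame values are made up;
# note that the API reports line/column positions as strings.
def _example_script_statistics():  # pragma: no cover - example only
    resource = {
        "evaluationKind": "STATEMENT",
        "stackFrames": [
            {
                "startLine": "3",
                "startColumn": "1",
                "endLine": "3",
                "endColumn": "42",
                "text": "SELECT COUNT(*) FROM dataset.table",
            }
        ],
    }
    stats = ScriptStatistics(resource)
    frame = stats.stack_frames[0]
    assert stats.evaluation_kind == "STATEMENT"
    assert frame.start_line == 3 and frame.end_column == 42
    return stats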

class SessionInfo:
    """[Preview] Information of the session if this job is part of one.

    .. versionadded:: 2.29.0

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def session_id(self) -> Optional[str]:
        """The ID of the session."""
        return self._properties.get("sessionId")


class UnknownJob(_AsyncJob):
    """A job whose type cannot be determined."""

    @classmethod
    def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
        """Construct an UnknownJob from the JSON representation.

        Args:
            resource (Dict): JSON representation of a job.
            client (google.cloud.bigquery.client.Client):
                Client connected to BigQuery API.

        Returns:
            UnknownJob: Job corresponding to the resource.
        """
        job_ref_properties = resource.get(
            "jobReference", {"projectId": client.project, "jobId": None}
        )
        job_ref = _JobReference._from_api_repr(job_ref_properties)
        job = cls(job_ref, client)
        # Populate the job reference with the project, even if it has been
        # redacted, because we know it should equal that of the request.
        resource["jobReference"] = job_ref_properties
        job._properties = resource
        return job
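

# Illustrative sketch (editor's addition): ``UnknownJob.from_api_repr`` only
# needs the client for its project ID, so a minimal stand-in object is enough
# to show how a (possibly redacted) job resource gets wrapped. The project ID
# and resource payload below are made up.
def _example_unknown_job():  # pragma: no cover - example only
    class _FakeClient:
        project = "your-project"

    resource = {"kind": "bigquery#job", "statistics": {"numChildJobs": "2"}}
    job = UnknownJob.from_api_repr(resource, _FakeClient())
    assert job.project == "your-project"
    assert job.num_child_jobs == 2
    return job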