Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

379 statements  

1# Copyright 2015 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Base classes and helpers for job classes.""" 

16 

17from collections import namedtuple 

18import copy 

19import http 

20import threading 

21import typing 

22from typing import ClassVar, Dict, Optional, Sequence 

23 

24from google.api_core import retry as retries 

25from google.api_core import exceptions 

26import google.api_core.future.polling 

27 

28from google.cloud.bigquery import _helpers 

29from google.cloud.bigquery._helpers import _int_or_none 

30from google.cloud.bigquery.retry import ( 

31 DEFAULT_GET_JOB_TIMEOUT, 

32 DEFAULT_RETRY, 

33) 

34 

35 

# Job state value that marks a job as complete.
_DONE_STATE = "DONE"
# Error reason string; it has a matching entry in the table below.
_STOPPED_REASON = "stopped"
# Maps a BigQuery error ``reason`` to the HTTP status code used to pick the
# exception class. Reasons missing from this table are looked up with a
# default by the code that consumes it.
_ERROR_REASON_TO_EXCEPTION = dict(
    accessDenied=http.client.FORBIDDEN,
    backendError=http.client.INTERNAL_SERVER_ERROR,
    billingNotEnabled=http.client.FORBIDDEN,
    billingTierLimitExceeded=http.client.BAD_REQUEST,
    blocked=http.client.FORBIDDEN,
    duplicate=http.client.CONFLICT,
    internalError=http.client.INTERNAL_SERVER_ERROR,
    invalid=http.client.BAD_REQUEST,
    invalidQuery=http.client.BAD_REQUEST,
    notFound=http.client.NOT_FOUND,
    notImplemented=http.client.NOT_IMPLEMENTED,
    policyViolation=http.client.FORBIDDEN,
    quotaExceeded=http.client.FORBIDDEN,
    rateLimitExceeded=http.client.TOO_MANY_REQUESTS,
    resourceInUse=http.client.BAD_REQUEST,
    resourcesExceeded=http.client.BAD_REQUEST,
    responseTooLarge=http.client.FORBIDDEN,
    stopped=http.client.OK,
    tableUnavailable=http.client.BAD_REQUEST,
)

59 

60 

def _error_result_to_exception(error_result, errors=None):
    """Build the exception that corresponds to a BigQuery error result.

    The reasons and their matching HTTP status codes are documented on
    the `troubleshooting errors`_ page.

    .. _troubleshooting errors:
        https://cloud.google.com/bigquery/troubleshooting-errors

    Args:
        error_result (Mapping[str, str]): The error result from BigQuery.
        errors (Union[Iterable[Mapping], None]):
            The detailed errors. Each entry is rendered as
            ``key: value`` pairs and appended to the message.

    Returns:
        google.cloud.exceptions.GoogleAPICallError: The mapped exception.
    """
    # Unknown (or missing) reasons fall back to a 500 status.
    status_code = _ERROR_REASON_TO_EXCEPTION.get(
        error_result.get("reason"), http.client.INTERNAL_SERVER_ERROR
    )

    # The message is assembled by hand so that both error_result and errors
    # are preserved. Can be removed once b/310544564 and b/318889899 are
    # resolved.
    details = ""
    if errors:
        rendered = [
            ", ".join(f"{key}: {value}" for key, value in err.items()) + "; "
            for err in errors
        ]
        # Leading separator, every entry followed by "; ", then the final
        # two characters are dropped.
        details = ("; " + "".join(rendered))[:-2]

    message = error_result.get("message", "") + details
    return exceptions.from_http_status(status_code, message, errors=[error_result])

100 

101 

# Lightweight record describing how many slot milliseconds a job consumed
# from one reservation.
ReservationUsage = namedtuple("ReservationUsage", ["name", "slot_ms"])
ReservationUsage.__doc__ = "Job resource usage for a reservation."
ReservationUsage.slot_ms.__doc__ = (
    "Total slot milliseconds used by the reservation for a particular job."
)
ReservationUsage.name.__doc__ = (
    'Reservation name or "unreserved" for on-demand resources usage.'
)

110 

111 

class TransactionInfo(typing.NamedTuple):
    """[Alpha] Information of a multi-statement transaction.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo

    .. versionadded:: 2.24.0
    """

    transaction_id: str
    """Output only. ID of the transaction."""

    @classmethod
    def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
        """Create an instance from the API representation.

        Args:
            transaction_info (Dict[str, str]):
                Mapping that contains a ``"transactionId"`` entry.

        Returns:
            TransactionInfo: Record holding that transaction ID.

        Raises:
            KeyError: If ``"transactionId"`` is missing.
        """
        identifier = transaction_info["transactionId"]
        return cls(transaction_id=identifier)

126 

127 

class _JobReference(object):
    """A reference to a job.

    The reference is stored as a dictionary using the API field names
    ``jobId``, ``projectId`` and (only when given) ``location``.

    Args:
        job_id (str): ID of the job to run.
        project (str): ID of the project where the job runs.
        location (str): Location of where the job runs.
    """

    def __init__(self, job_id, project, location):
        properties = {"jobId": job_id, "projectId": project}
        # An empty/None location is left out of the resource entirely.
        if location:
            properties["location"] = location
        self._properties = properties

    @property
    def job_id(self):
        """str: ID of the job."""
        return self._properties.get("jobId")

    @property
    def project(self):
        """str: ID of the project where the job runs."""
        return self._properties.get("projectId")

    @property
    def location(self):
        """str: Location where the job runs (``None`` when not set)."""
        return self._properties.get("location")

    def _to_api_repr(self):
        """Return a deep copy of the API resource for this job reference."""
        snapshot = copy.deepcopy(self._properties)
        return snapshot

    @classmethod
    def _from_api_repr(cls, resource):
        """Build a job reference from an API resource representation.

        Args:
            resource (Mapping): Mapping read with ``.get`` for ``jobId``,
                ``projectId`` and ``location``.

        Returns:
            _JobReference: The reference described by ``resource``.
        """
        return cls(
            resource.get("jobId"),
            resource.get("projectId"),
            resource.get("location"),
        )

170 

171 

class _JobConfig(object):
    """Abstract base class for job configuration objects.

    All values are kept in ``self._properties``, the dictionary that
    :meth:`to_api_repr` deep-copies. Job-type specific values live in the
    sub-dictionary stored under the ``job_type`` key.

    Args:
        job_type (str): The key to use for the job configuration.
        **kwargs: Property names and values. Each one is applied with
            ``setattr``, so unknown names raise :class:`AttributeError`.
    """

    def __init__(self, job_type, **kwargs):
        self._job_type = job_type
        self._properties = {job_type: {}}
        for prop, val in kwargs.items():
            setattr(self, prop, val)

    def __setattr__(self, name, value):
        """Override to be able to raise error if an unknown property is being set.

        Names starting with an underscore are always accepted; any other name
        must already exist as an attribute of the class.

        Raises:
            AttributeError: If ``name`` is public and unknown to the class.
        """
        if not name.startswith("_") and not hasattr(type(self), name):
            raise AttributeError(
                "Property {} is unknown for {}.".format(name, type(self))
            )
        super(_JobConfig, self).__setattr__(name, value)

    @property
    def job_timeout_ms(self):
        """Optional parameter. Job timeout in milliseconds. If this time limit is exceeded, BigQuery might attempt to stop the job.
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms
        e.g.

            job_config = bigquery.QueryJobConfig( job_timeout_ms = 5000 )
            or
            job_config.job_timeout_ms = 5000

        Values assigned through the setter are stored as strings, so that is
        what the getter returns. ``None`` is returned when the value is
        unset (or falsy).

        Raises:
            ValueError: If ``value`` type is invalid.
        """

        # None as this is an optional parameter.
        if self._properties.get("jobTimeoutMs"):
            return self._properties["jobTimeoutMs"]
        return None

    @job_timeout_ms.setter
    def job_timeout_ms(self, value):
        try:
            value = _int_or_none(value)
        except ValueError as err:
            raise ValueError("Pass an int for jobTimeoutMs, e.g. 5000").with_traceback(
                err.__traceback__
            )

        if value is not None:
            # docs indicate a string is expected by the API
            self._properties["jobTimeoutMs"] = str(value)
        else:
            # Assigning None clears the property.
            self._properties.pop("jobTimeoutMs", None)

    @property
    def max_slots(self) -> Optional[int]:
        """The maximum rate of slot consumption to allow for this job.

        If set, the number of slots used to execute the job will be throttled
        to try and keep its slot consumption below the requested rate.
        This feature is not generally available.

        Returns:
            Optional[int]: The stored value as an int (string values are
            converted), or ``None`` when unset or of another type.

        Raises:
            ValueError: If the assigned ``value`` cannot be converted.
        """

        max_slots = self._properties.get("maxSlots")
        if max_slots is not None:
            if isinstance(max_slots, str):
                return int(max_slots)
            if isinstance(max_slots, int):
                return max_slots
        return None

    @max_slots.setter
    def max_slots(self, value):
        try:
            value = _int_or_none(value)
        except ValueError as err:
            raise ValueError("Pass an int for max slots, e.g. 100").with_traceback(
                err.__traceback__
            )

        if value is not None:
            # Stored as a string, mirroring jobTimeoutMs.
            self._properties["maxSlots"] = str(value)
        else:
            self._properties.pop("maxSlots", None)

    @property
    def reservation(self):
        """str: Optional. The reservation that job would use.

        User can specify a reservation to execute the job. If reservation is
        not set, reservation is determined based on the rules defined by the
        reservation assignments. The expected format is
        projects/{project}/locations/{location}/reservations/{reservation}.

        Raises:
            ValueError: If ``value`` type is not None or of string type.
        """
        # NOTE: reading the property stores an explicit ``None`` entry when
        # the key is absent (``setdefault``).
        return self._properties.setdefault("reservation", None)

    @reservation.setter
    def reservation(self, value):
        if value and not isinstance(value, str):
            raise ValueError("Reservation must be None or a string.")
        self._properties["reservation"] = value

    @property
    def labels(self):
        """Dict[str, str]: Labels for the job.

        This method always returns a dict. Once a job has been created on the
        server, its labels cannot be modified anymore.

        Raises:
            ValueError: If ``value`` type is invalid.
        """
        return self._properties.setdefault("labels", {})

    @labels.setter
    def labels(self, value):
        if not isinstance(value, dict):
            raise ValueError("Pass a dict")
        self._properties["labels"] = value

    def _get_sub_prop(self, key, default=None):
        """Get a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access
        those properties::

            self._get_sub_prop('destinationTable')

        This is equivalent to using the ``_helpers._get_sub_prop`` function::

            _helpers._get_sub_prop(
                self._properties, ['query', 'destinationTable'])

        Args:
            key (str):
                Key for the value to get in the
                ``self._properties[self._job_type]`` dictionary.
            default (Optional[object]):
                Default value to return if the key is not found.
                Defaults to :data:`None`.

        Returns:
            object: The value if present or the default.
        """
        return _helpers._get_sub_prop(
            self._properties, [self._job_type, key], default=default
        )

    def _set_sub_prop(self, key, value):
        """Set a value in the ``self._properties[self._job_type]`` dictionary.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set
        those properties::

            self._set_sub_prop('useLegacySql', False)

        This is equivalent to using the ``_helper._set_sub_prop`` function::

            _helper._set_sub_prop(
                self._properties, ['query', 'useLegacySql'], False)

        Args:
            key (str):
                Key to set in the ``self._properties[self._job_type]``
                dictionary.
            value (object): Value to set.
        """
        _helpers._set_sub_prop(self._properties, [self._job_type, key], value)

    def _del_sub_prop(self, key):
        """Remove ``key`` from the ``self._properties[self._job_type]`` dict.

        Most job properties are inside the dictionary related to the job type
        (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear
        those properties::

            self._del_sub_prop('useLegacySql')

        This is equivalent to using the ``_helper._del_sub_prop`` function::

            _helper._del_sub_prop(
                self._properties, ['query', 'useLegacySql'])

        Args:
            key (str):
                Key to remove in the ``self._properties[self._job_type]``
                dictionary.
        """
        _helpers._del_sub_prop(self._properties, [self._job_type, key])

    def to_api_repr(self) -> dict:
        """Build an API representation of the job config.

        Returns:
            Dict: A dictionary in the format used by the BigQuery API. It is
            a deep copy, so changing it does not affect this object.
        """
        return copy.deepcopy(self._properties)

    def _fill_from_default(self, default_job_config=None):
        """Merge this job config with a default job config.

        The keys in this object take precedence over the keys in the default
        config. The merge is done at the top-level as well as for keys one
        level below the job type.

        The returned config owns deep copies of every value, so changing it
        never changes ``self`` or ``default_job_config``.

        Args:
            default_job_config (google.cloud.bigquery.job._JobConfig):
                The default job config that will be used to fill in self.

        Returns:
            google.cloud.bigquery.job._JobConfig: A new (merged) job config.

        Raises:
            TypeError: If the two configs have different job types.
        """
        if not default_job_config:
            return copy.deepcopy(self)

        if self._job_type != default_job_config._job_type:
            raise TypeError(
                "attempted to merge two incompatible job types: "
                + repr(self._job_type)
                + ", "
                + repr(default_job_config._job_type)
            )

        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        new_job_config = self.__class__()  # pytype: disable=missing-parameter

        merged_properties = copy.deepcopy(default_job_config._properties)

        # Top-level keys: ours win. Values are deep-copied so that mutable
        # values (e.g. the labels dict) are not shared with ``self``.
        for key, value in self._properties.items():
            if key != self._job_type:
                merged_properties[key] = copy.deepcopy(value)

        # Job-type keys: merged one level deep, ours win. Either side may
        # lack the sub-dictionary (e.g. a config built from a partial API
        # resource), so it is created/defaulted rather than indexed.
        merged_properties.setdefault(self._job_type, {}).update(
            copy.deepcopy(self._properties.get(self._job_type, {}))
        )

        new_job_config._properties = merged_properties
        return new_job_config

    @classmethod
    def from_api_repr(cls, resource: dict) -> "_JobConfig":
        """Factory: construct a job configuration given its API representation

        Args:
            resource (Dict):
                A job configuration in the same representation as is returned
                from the API. The dictionary is stored as-is (not copied).

        Returns:
            google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``.
        """
        # cls is one of the job config subclasses that provides the job_type argument to
        # this base class on instantiation, thus missing-parameter warning is a false
        # positive here.
        job_config = cls()  # type: ignore  # pytype: disable=missing-parameter
        job_config._properties = resource
        return job_config

435 

436 

437class _AsyncJob(google.api_core.future.polling.PollingFuture): 

438 """Base class for asynchronous jobs. 

439 

440 Args: 

441 job_id (Union[str, _JobReference]): 

442 Job's ID in the project associated with the client or a 

443 fully-qualified job reference. 

444 client (google.cloud.bigquery.client.Client): 

445 Client which holds credentials and project configuration. 

446 """ 

447 

448 _JOB_TYPE = "unknown" 

449 _CONFIG_CLASS: ClassVar 

450 

def __init__(self, job_id, client):
    """Initialise the future and record the job reference and client.

    Args:
        job_id (Union[str, _JobReference]):
            Plain job ID, or a full job reference.
        client (google.cloud.bigquery.client.Client):
            Client bound to the job; its ``project`` is used when only a
            plain job ID is given.
    """
    super(_AsyncJob, self).__init__()

    # A plain job ID is wrapped in a reference that uses the client's
    # project and no location, so the properties always have the same shape.
    if isinstance(job_id, _JobReference):
        reference = job_id
    else:
        reference = _JobReference(job_id, client.project, None)
    self._properties = {"jobReference": reference._to_api_repr()}

    self._client = client
    self._result_set = False
    self._completion_lock = threading.Lock()

465 

@property
def configuration(self) -> _JobConfig:
    """Job-type specific configuration.

    A new ``_CONFIG_CLASS`` instance is created on each access; it is backed
    by the job's own ``"configuration"`` dictionary (created when missing).
    """
    config: _JobConfig = self._CONFIG_CLASS()  # pytype: disable=not-callable
    backing = self._properties.setdefault("configuration", {})
    config._properties = backing
    return config

472 

@property
def job_id(self):
    """str: ID of the job, read from the job reference."""
    key_path = ["jobReference", "jobId"]
    return _helpers._get_sub_prop(self._properties, key_path)

477 

@property
def parent_job_id(self):
    """Return the ID of the parent job.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id

    Returns:
        Optional[str]: parent job id.
    """
    key_path = ["statistics", "parentJobId"]
    return _helpers._get_sub_prop(self._properties, key_path)

489 

@property
def script_statistics(self) -> Optional["ScriptStatistics"]:
    """Statistics for a child job of a script.

    Returns:
        Optional[ScriptStatistics]: Wrapper around the
        ``statistics.scriptStatistics`` resource, or ``None`` when absent.
    """
    raw = _helpers._get_sub_prop(
        self._properties, ["statistics", "scriptStatistics"]
    )
    return None if raw is None else ScriptStatistics(raw)

499 

@property
def session_info(self) -> Optional["SessionInfo"]:
    """[Preview] Information of the session if this job is part of one.

    Returns ``None`` when the ``statistics.sessionInfo`` resource is absent.

    .. versionadded:: 2.29.0
    """
    raw = _helpers._get_sub_prop(self._properties, ["statistics", "sessionInfo"])
    return None if raw is None else SessionInfo(raw)

512 

@property
def num_child_jobs(self):
    """The number of child jobs executed.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs

    Returns:
        int: The count converted with ``int``; ``0`` when it is absent.
    """
    raw_count = _helpers._get_sub_prop(
        self._properties, ["statistics", "numChildJobs"]
    )
    if raw_count is None:
        return 0
    return int(raw_count)

525 

@property
def project(self):
    """Project bound to the job.

    Returns:
        str: the project ID stored in the job reference.
    """
    key_path = ["jobReference", "projectId"]
    return _helpers._get_sub_prop(self._properties, key_path)

534 

@property
def location(self):
    """str: Location where the job runs, read from the job reference."""
    key_path = ["jobReference", "location"]
    return _helpers._get_sub_prop(self._properties, key_path)

539 

@property
def reservation_id(self):
    """str: Name of the primary reservation assigned to this job.

    Note that this could be different than reservations reported in
    the reservation field if parent reservations were used to execute
    this job.
    """
    key_path = ["statistics", "reservation_id"]
    return _helpers._get_sub_prop(self._properties, key_path)

551 

def _require_client(self, client):
    """Pick the client to use for an API call.

    Args:
        client (Optional[google.cloud.bigquery.client.Client]):
            the client to use. If ``None``, falls back to the client bound
            to this job.

    Returns:
        google.cloud.bigquery.client.Client:
            The client passed in or the currently bound client.
    """
    return self._client if client is None else client

567 

@property
def job_type(self):
    """Type of job.

    Returns:
        str: the ``_JOB_TYPE`` of the job class, e.g. one of 'load',
        'copy', 'extract', 'query'.
    """
    kind = self._JOB_TYPE
    return kind

576 

@property
def path(self):
    """URL path for the job's APIs.

    Returns:
        str: the path based on project and job ID.
    """
    return f"/projects/{self.project!s}/jobs/{self.job_id!s}"

585 

@property
def labels(self):
    """Dict[str, str]: Labels for the job.

    The ``configuration`` and ``labels`` dictionaries are created on first
    access when missing, so the returned dict is always the stored one.
    """
    configuration = self._properties.setdefault("configuration", {})
    return configuration.setdefault("labels", {})

590 

@property
def etag(self):
    """ETag for the job resource.

    Returns:
        Optional[str]: the ETag (None until set from the server).
    """
    properties = self._properties
    return properties.get("etag")

599 

@property
def self_link(self):
    """URL for the job resource.

    Returns:
        Optional[str]: the URL (None until set from the server).
    """
    properties = self._properties
    return properties.get("selfLink")

608 

@property
def user_email(self):
    """E-mail address of user who submitted the job.

    Returns:
        Optional[str]: the e-mail address (None until set from the server).
    """
    properties = self._properties
    return properties.get("user_email")

617 

@property
def created(self):
    """Datetime at which the job was created.

    Returns:
        Optional[datetime.datetime]:
            the creation time (None until set from the server).
    """
    creation_ms = _helpers._get_sub_prop(
        self._properties, ["statistics", "creationTime"]
    )
    if creation_ms is None:
        return None
    # The stored value is in milliseconds; the helper takes microseconds.
    return _helpers._datetime_from_microseconds(creation_ms * 1000.0)

631 

@property
def started(self):
    """Datetime at which the job was started.

    Returns:
        Optional[datetime.datetime]:
            the start time (None until set from the server).
    """
    start_ms = _helpers._get_sub_prop(self._properties, ["statistics", "startTime"])
    if start_ms is None:
        return None
    # The stored value is in milliseconds; the helper takes microseconds.
    return _helpers._datetime_from_microseconds(start_ms * 1000.0)

643 

@property
def ended(self):
    """Datetime at which the job finished.

    Returns:
        Optional[datetime.datetime]:
            the end time (None until set from the server).
    """
    end_ms = _helpers._get_sub_prop(self._properties, ["statistics", "endTime"])
    if end_ms is None:
        return None
    # The stored value is in milliseconds; the helper takes microseconds.
    return _helpers._datetime_from_microseconds(end_ms * 1000.0)

655 

def _job_statistics(self):
    """Helper for job-type specific statistics-based properties.

    Returns:
        The ``statistics`` entry stored under this job's ``_JOB_TYPE``, or an
        empty dict when either level is missing.
    """
    all_statistics = self._properties.get("statistics", {})
    per_type = all_statistics.get(self._JOB_TYPE, {})
    return per_type

660 

@property
def reservation_usage(self):
    """Job resource usage breakdown by reservation.

    Returns:
        List[google.cloud.bigquery.job.ReservationUsage]:
            Reservation usage stats. Can be empty if not set from the server.
    """
    raw_entries = _helpers._get_sub_prop(
        self._properties, ["statistics", "reservationUsage"], default=()
    )
    usages = []
    for entry in raw_entries:
        # ``slotMs`` is converted with ``int`` before being stored.
        usages.append(
            ReservationUsage(name=entry["name"], slot_ms=int(entry["slotMs"]))
        )
    return usages

676 

@property
def transaction_info(self) -> Optional[TransactionInfo]:
    """Information of the multi-statement transaction if this job is part of one.

    Since a scripting query job can execute multiple transactions, this
    property is only expected on child jobs. Use the
    :meth:`google.cloud.bigquery.client.Client.list_jobs` method with the
    ``parent_job`` parameter to iterate over child jobs.

    Returns ``None`` when ``statistics.transactionInfo`` is absent.

    .. versionadded:: 2.24.0
    """
    statistics = self._properties.get("statistics", {})
    raw_info = statistics.get("transactionInfo")
    if raw_info is not None:
        return TransactionInfo.from_api_repr(raw_info)
    return None

693 

@property
def error_result(self):
    """Error information about the job as a whole.

    Returns:
        Optional[Mapping]: the error information (None until set from the server).
    """
    status = self._properties.get("status")
    return None if status is None else status.get("errorResult")

704 

@property
def errors(self):
    """Information about individual errors generated by the job.

    Returns:
        Optional[List[Mapping]]:
            the error information (None until set from the server).
    """
    status = self._properties.get("status")
    return None if status is None else status.get("errors")

716 

@property
def state(self):
    """Status of the job.

    Returns:
        Optional[str]:
            the state (None until set from the server).
    """
    return self._properties.get("status", {}).get("state")

727 

def _set_properties(self, api_response):
    """Update properties from resource in body of ``api_response``

    The ``creationTime``, ``startTime`` and ``endTime`` statistics are
    converted to ``float`` when present. The conversion is done on a copy
    of the ``statistics`` dictionary, so ``api_response`` itself is left
    untouched.

    Args:
        api_response (Dict): response returned from an API call.
    """
    cleaned = api_response.copy()

    # ``copy()`` above is shallow: copy the nested statistics as well before
    # rewriting values, otherwise the caller's response would be modified.
    statistics = dict(cleaned.get("statistics", {}))
    for time_key in ("creationTime", "startTime", "endTime"):
        if time_key in statistics:
            statistics[time_key] = float(statistics[time_key])
    cleaned["statistics"] = statistics

    self._properties = cleaned

    # For Future interface
    self._set_future_result()

747 

@classmethod
def _check_resource_config(cls, resource):
    """Helper for :meth:`from_api_repr`

    Args:
        resource (Dict): resource for the job.

    Raises:
        KeyError:
            If the resource has no identifier, or
            is missing the appropriate configuration.
    """
    has_identity = (
        "jobReference" in resource and "jobId" in resource["jobReference"]
    )
    if not has_identity:
        raise KeyError(
            "Resource lacks required identity information: "
            '["jobReference"]["jobId"]'
        )

    has_configuration = (
        "configuration" in resource
        and cls._JOB_TYPE in resource["configuration"]
    )
    if not has_configuration:
        raise KeyError(
            "Resource lacks required configuration: "
            '["configuration"]["%s"]' % cls._JOB_TYPE
        )

773 

def to_api_repr(self):
    """Generate a resource for the job (a deep copy of its properties)."""
    snapshot = copy.deepcopy(self._properties)
    return snapshot


_build_resource = to_api_repr  # backward-compatibility alias

779 

def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
    """API call: begin the job via a POST request

    See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert

    Args:
        client (Optional[google.cloud.bigquery.client.Client]):
            The client to use. If not passed, falls back to the ``client``
            associated with the job object.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC.
        timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.

    Raises:
        ValueError:
            If the job has already begun.
    """
    # A job that already has a state has been submitted before.
    if self.state is not None:
        raise ValueError("Job already begun.")

    client = self._require_client(client)
    insert_path = f"/projects/{self.project!s}/jobs"

    # jobs.insert is idempotent because we ensure that every new
    # job has an ID.
    response = client._call_api(
        retry,
        span_name="BigQuery.job.begin",
        span_attributes={"path": insert_path},
        job_ref=self,
        method="POST",
        path=insert_path,
        data=self.to_api_repr(),
        timeout=timeout,
    )
    self._set_properties(response)

820 

def exists(
    self,
    client=None,
    retry: "retries.Retry" = DEFAULT_RETRY,
    timeout: Optional[float] = None,
) -> bool:
    """API call: test for the existence of the job via a GET request

    See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

    Args:
        client (Optional[google.cloud.bigquery.client.Client]):
            the client to use. If not passed, falls back to the
            ``client`` bound to this job.

        retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
        timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.

    Returns:
        bool: Boolean indicating existence of the job.
    """
    client = self._require_client(client)

    # Only the ``id`` field is requested; the response body is not used.
    query_params = {"fields": "id"}
    if self.location:
        query_params["location"] = self.location

    try:
        client._call_api(
            retry,
            span_name="BigQuery.job.exists",
            span_attributes={"path": self.path},
            job_ref=self,
            method="GET",
            path=self.path,
            query_params=query_params,
            timeout=timeout,
        )
    except exceptions.NotFound:
        return False
    return True

868 

def reload(
    self,
    client=None,
    retry: "retries.Retry" = DEFAULT_RETRY,
    timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
):
    """API call: refresh job properties via a GET request.

    See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

    Args:
        client (Optional[google.cloud.bigquery.client.Client]):
            the client to use. If not passed, falls back to the
            ``client`` bound to this job.

        retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
        timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
    """
    active_client = self._require_client(client)

    # Fetch the job through the client, then adopt the fetched properties.
    refreshed = active_client.get_job(
        self,
        project=self.project,
        location=self.location,
        retry=retry,
        timeout=timeout,
    )
    self._set_properties(refreshed._properties)

900 

def cancel(
    self,
    client=None,
    retry: Optional[retries.Retry] = DEFAULT_RETRY,
    timeout: Optional[float] = None,
) -> bool:
    """API call: cancel job via a POST request

    See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel

    Args:
        client (Optional[google.cloud.bigquery.client.Client]):
            the client to use. If not passed, falls back to the
            ``client`` bound to this job.
        retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
        timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``

    Returns:
        bool: Boolean indicating that the cancel request was sent.
    """
    client = self._require_client(client)

    # The location is only sent when the job has one.
    query_params = {}
    if self.location:
        query_params["location"] = self.location

    cancel_path = "{}/cancel".format(self.path)

    response = client._call_api(
        retry,
        span_name="BigQuery.job.cancel",
        span_attributes={"path": cancel_path},
        job_ref=self,
        method="POST",
        path=cancel_path,
        query_params=query_params,
        timeout=timeout,
    )
    # The updated job resource is nested under the "job" key.
    self._set_properties(response["job"])

    # The Future interface requires that we return True if the *attempt*
    # to cancel was successful.
    return True

947 

948 # The following methods implement the PollingFuture interface. Note that 

949 # the methods above are from the pre-Future interface and are left for 

950 # compatibility. The only "overloaded" method is :meth:`cancel`, which 

951 # satisfies both interfaces. 

952 

def _set_future_result(self):
    """Set the result or exception from the job if it is complete."""
    # This must be done in a lock to prevent the polling thread
    # and main thread from both executing the completion logic
    # at the same time.
    with self._completion_lock:
        # Nothing to do while the job is unfinished, or once a result has
        # already been recorded.
        # Note: self._result_set is set to True in set_result and
        # set_exception, in case those methods are invoked directly.
        finished = self.done(reload=False)
        if not finished or self._result_set:
            return

        failure = self.error_result
        if failure is None:
            # A finished job without an error resolves to the job itself.
            self.set_result(self)
            return

        self.set_exception(_error_result_to_exception(failure, self.errors or ()))

973 

def done(
    self,
    retry: "retries.Retry" = DEFAULT_RETRY,
    timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
    reload: bool = True,
) -> bool:
    """Report whether the job has reached the ``DONE`` state.

    Args:
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. It is only used when a refresh is
            actually performed; a job that is already ``DONE`` is never
            refreshed, as it will not change anymore.
        timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
        reload (Optional[bool]):
            If ``True``, refresh the job state of an unfinished job via
            :meth:`reload` before checking. Default ``True``.

    Returns:
        bool: True if the job is complete, False otherwise.
    """
    # Do not refresh if the state is already done, as the job will not
    # change once complete.
    already_finished = self.state == _DONE_STATE
    if reload and not already_finished:
        self.reload(retry=retry, timeout=timeout)

    # Read the state again: the refresh above may have updated it.
    return self.state == _DONE_STATE

1001 

def result(  # type: ignore  # (incompatible with supertype)
    self,
    retry: Optional[retries.Retry] = DEFAULT_RETRY,
    timeout: Optional[float] = None,
) -> "_AsyncJob":
    """Begin the job if it has no state yet, then wait for it to finish.

    Args:
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. If the job state is ``DONE``, retrying is
            aborted early, as the job will not change anymore.
        timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
            If multiple requests are made under the hood, ``timeout``
            applies to each individual request.

    Returns:
        _AsyncJob: This instance.

    Raises:
        google.cloud.exceptions.GoogleAPICallError:
            if the job failed.
        concurrent.futures.TimeoutError:
            if the job did not complete in the given timeout.
    """
    # A job whose state is still None has not been started; start it first.
    if self.state is None:
        self._begin(retry=retry, timeout=timeout)

    # Forward ``retry`` to the parent implementation only when the caller
    # supplied something other than the module default.
    polling_kwargs = {"timeout": timeout}
    if retry is not DEFAULT_RETRY:
        polling_kwargs["retry"] = retry

    return super(_AsyncJob, self).result(**polling_kwargs)

1033 

def cancelled(self):
    """Check if the job has been cancelled.

    A job is reported as cancelled when it has an error result whose
    ``reason`` equals ``_STOPPED_REASON`` (``"stopped"``), the reason
    BigQuery attaches to a job that was cancelled. A job without an error
    result, or with any other reason, is not considered cancelled.

    This method also satisfies the interface for
    :class:`google.api_core.future.Future`.

    Returns:
        bool: True if the job's error result has the "stopped" reason,
        False otherwise.
    """
    error = self.error_result
    # Short-circuit on a missing error result before inspecting its reason.
    return error is not None and error.get("reason") == _STOPPED_REASON

1048 

def __repr__(self):
    """Return ``ClassName<project=..., location=..., id=...>`` for this job."""
    class_name = self.__class__.__name__
    fields = f"project={self.project}, location={self.location}, id={self.job_id}"
    return f"{class_name}<{fields}>"

1056 

1057 

class ScriptStackFrame(object):
    """One frame of a script stack trace: the line/column/procedure name
    where the current evaluation happened.

    The instance is a thin read-only view over the given resource mapping;
    the mapping is stored as-is, not copied.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def procedure_id(self):
        """Optional[str]: Name of the active procedure.

        Omitted if in a top-level script.
        """
        properties = self._properties
        return properties.get("procedureId")

    @property
    def text(self):
        """str: Text of the current statement/expression."""
        properties = self._properties
        return properties.get("text")

    @property
    def start_line(self):
        """int: One-based start line."""
        raw_value = self._properties.get("startLine")
        return _helpers._int_or_none(raw_value)

    @property
    def start_column(self):
        """int: One-based start column."""
        raw_value = self._properties.get("startColumn")
        return _helpers._int_or_none(raw_value)

    @property
    def end_line(self):
        """int: One-based end line."""
        raw_value = self._properties.get("endLine")
        return _helpers._int_or_none(raw_value)

    @property
    def end_column(self):
        """int: One-based end column."""
        raw_value = self._properties.get("endColumn")
        return _helpers._int_or_none(raw_value)

1101 

1102 

class ScriptStatistics(object):
    """Statistics for a child job of a script.

    The instance is a thin read-only view over the given resource mapping;
    the mapping is stored as-is, not copied.

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def stack_frames(self) -> Sequence[ScriptStackFrame]:
        """Stack trace where the current evaluation happened.

        Shows line/column/procedure name of each frame on the stack at the
        point where the current evaluation happened.

        The leaf frame is first, the primary script is last.

        A new list of :class:`ScriptStackFrame` wrappers is built on every
        access; it is empty when the resource has no ``stackFrames`` key.
        """
        raw_frames = self._properties.get("stackFrames", [])
        return [ScriptStackFrame(raw_frame) for raw_frame in raw_frames]

    @property
    def evaluation_kind(self) -> Optional[str]:
        """str: Indicates the type of child job.

        Possible values include ``STATEMENT`` and ``EXPRESSION``.
        """
        properties = self._properties
        return properties.get("evaluationKind")

1133 

1134 

class SessionInfo:
    """[Preview] Information of the session if this job is part of one.

    The instance is a thin read-only view over the given resource mapping;
    the mapping is stored as-is, not copied.

    .. versionadded:: 2.29.0

    Args:
        resource (Map[str, Any]): JSON representation of object.
    """

    def __init__(self, resource):
        self._properties = resource

    @property
    def session_id(self) -> Optional[str]:
        """The ID of the session, or None if the resource has no ``sessionId``."""
        properties = self._properties
        return properties.get("sessionId")

1151 

1152 

class UnknownJob(_AsyncJob):
    """A job whose type cannot be determined."""

    @classmethod
    def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
        """Build an UnknownJob from its JSON representation.

        ``resource`` is modified in place (its ``jobReference`` entry is
        set) and then stored on the returned job as its properties; it is
        not copied.

        Args:
            resource (Dict): JSON representation of a job.
            client (google.cloud.bigquery.client.Client):
                Client connected to BigQuery API.

        Returns:
            UnknownJob: Job corresponding to the resource.
        """
        # Fallback reference used when the resource carries no
        # ``jobReference``: the client's project and no job ID.
        fallback_reference = {"projectId": client.project, "jobId": None}
        reference_resource = resource.get("jobReference", fallback_reference)

        unknown_job = cls(
            _JobReference._from_api_repr(reference_resource), client
        )

        # Write the reference back so the stored resource always has one,
        # even if it was missing (e.g. redacted) in the response; in that
        # case the project is known to equal that of the request.
        resource["jobReference"] = reference_resource
        unknown_job._properties = resource
        return unknown_job

1177 return job