Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/magics/magics.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

276 statements  

1# Copyright 2018 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""IPython Magics 

16 

17Install ``bigquery-magics`` and call ``%load_ext bigquery_magics`` to use the 

18``%%bigquery`` cell magic. 

19 

20See the `BigQuery Magics reference documentation 

21<https://googleapis.dev/python/bigquery-magics/latest/>`_. 

22""" 

23 

24from __future__ import print_function 

25 

26import re 

27import ast 

28import copy 

29import functools 

30import sys 

31import time 

32import warnings 

33from concurrent import futures 

34 

35try: 

36 import IPython # type: ignore 

37 from IPython import display # type: ignore 

38 from IPython.core import magic_arguments # type: ignore 

39except ImportError: 

40 raise ImportError("This module can only be loaded in IPython.") 

41 

42from google.api_core import client_info 

43from google.api_core import client_options 

44from google.api_core.exceptions import NotFound 

45import google.auth # type: ignore 

46from google.cloud import bigquery 

47import google.cloud.bigquery.dataset 

48from google.cloud.bigquery import _versions_helpers 

49from google.cloud.bigquery import exceptions 

50from google.cloud.bigquery.dbapi import _helpers 

51from google.cloud.bigquery.magics import line_arg_parser as lap 

52 

53try: 

54 import bigquery_magics # type: ignore 

55except ImportError: 

56 bigquery_magics = None 

57 

58IPYTHON_USER_AGENT = "ipython-{}".format(IPython.__version__) # type: ignore 

59 

60 

61class Context(object): 

62 """Storage for objects to be used throughout an IPython notebook session. 

63 

64 A Context object is initialized when the ``magics`` module is imported, 

65 and can be found at ``google.cloud.bigquery.magics.context``. 

66 """ 

67 

68 def __init__(self): 

69 self._credentials = None 

70 self._project = None 

71 self._connection = None 

72 self._default_query_job_config = bigquery.QueryJobConfig() 

73 self._bigquery_client_options = client_options.ClientOptions() 

74 self._bqstorage_client_options = client_options.ClientOptions() 

75 self._progress_bar_type = "tqdm_notebook" 

76 

77 @property 

78 def credentials(self): 

79 """google.auth.credentials.Credentials: Credentials to use for queries 

80 performed through IPython magics. 

81 

82 Note: 

83 These credentials do not need to be explicitly defined if you are 

84 using Application Default Credentials. If you are not using 

85 Application Default Credentials, manually construct a 

86 :class:`google.auth.credentials.Credentials` object and set it as 

87 the context credentials as demonstrated in the example below. See 

88 `auth docs`_ for more information on obtaining credentials. 

89 

90 Example: 

91 Manually setting the context credentials: 

92 

93 >>> from google.cloud.bigquery import magics 

94 >>> from google.oauth2 import service_account 

95 >>> credentials = (service_account 

96 ... .Credentials.from_service_account_file( 

97 ... '/path/to/key.json')) 

98 >>> magics.context.credentials = credentials 

99 

100 

101 .. _auth docs: http://google-auth.readthedocs.io 

102 /en/latest/user-guide.html#obtaining-credentials 

103 """ 

104 if self._credentials is None: 

105 self._credentials, _ = google.auth.default() 

106 return self._credentials 

107 

108 @credentials.setter 

109 def credentials(self, value): 

110 self._credentials = value 

111 

112 @property 

113 def project(self): 

114 """str: Default project to use for queries performed through IPython 

115 magics. 

116 

117 Note: 

118 The project does not need to be explicitly defined if you have an 

119 environment default project set. If you do not have a default 

120 project set in your environment, manually assign the project as 

121 demonstrated in the example below. 

122 

123 Example: 

124 Manually setting the context project: 

125 

126 >>> from google.cloud.bigquery import magics 

127 >>> magics.context.project = 'my-project' 

128 """ 

129 if self._project is None: 

130 _, self._project = google.auth.default() 

131 return self._project 

132 

133 @project.setter 

134 def project(self, value): 

135 self._project = value 

136 

137 @property 

138 def bigquery_client_options(self): 

139 """google.api_core.client_options.ClientOptions: client options to be 

140 used through IPython magics. 

141 

142 Note:: 

143 The client options do not need to be explicitly defined if no 

144 special network connections are required. Normally you would be 

145 using the https://bigquery.googleapis.com/ end point. 

146 

147 Example: 

148 Manually setting the endpoint: 

149 

150 >>> from google.cloud.bigquery import magics 

151 >>> client_options = {} 

152 >>> client_options['api_endpoint'] = "https://some.special.url" 

153 >>> magics.context.bigquery_client_options = client_options 

154 """ 

155 return self._bigquery_client_options 

156 

157 @bigquery_client_options.setter 

158 def bigquery_client_options(self, value): 

159 self._bigquery_client_options = value 

160 

161 @property 

162 def bqstorage_client_options(self): 

163 """google.api_core.client_options.ClientOptions: client options to be 

164 used through IPython magics for the storage client. 

165 

166 Note:: 

167 The client options do not need to be explicitly defined if no 

168 special network connections are required. Normally you would be 

169 using the https://bigquerystorage.googleapis.com/ end point. 

170 

171 Example: 

172 Manually setting the endpoint: 

173 

174 >>> from google.cloud.bigquery import magics 

175 >>> client_options = {} 

176 >>> client_options['api_endpoint'] = "https://some.special.url" 

177 >>> magics.context.bqstorage_client_options = client_options 

178 """ 

179 return self._bqstorage_client_options 

180 

181 @bqstorage_client_options.setter 

182 def bqstorage_client_options(self, value): 

183 self._bqstorage_client_options = value 

184 

185 @property 

186 def default_query_job_config(self): 

187 """google.cloud.bigquery.job.QueryJobConfig: Default job 

188 configuration for queries. 

189 

190 The context's :class:`~google.cloud.bigquery.job.QueryJobConfig` is 

191 used for queries. Some properties can be overridden with arguments to 

192 the magics. 

193 

194 Example: 

195 Manually setting the default value for ``maximum_bytes_billed`` 

196 to 100 MB: 

197 

198 >>> from google.cloud.bigquery import magics 

199 >>> magics.context.default_query_job_config.maximum_bytes_billed = 100000000 

200 """ 

201 return self._default_query_job_config 

202 

203 @default_query_job_config.setter 

204 def default_query_job_config(self, value): 

205 self._default_query_job_config = value 

206 

207 @property 

208 def progress_bar_type(self): 

209 """str: Default progress bar type to use to display progress bar while 

210 executing queries through IPython magics. 

211 

212 Note:: 

213 Install the ``tqdm`` package to use this feature. 

214 

215 Example: 

216 Manually setting the progress_bar_type: 

217 

218 >>> from google.cloud.bigquery import magics 

219 >>> magics.context.progress_bar_type = "tqdm_notebook" 

220 """ 

221 return self._progress_bar_type 

222 

223 @progress_bar_type.setter 

224 def progress_bar_type(self, value): 

225 self._progress_bar_type = value 

226 

227 

228# If bigquery_magics is available, we load that extension rather than this one. 

229# Ensure google.cloud.bigquery.magics.context setters are on the correct magics 

230# implementation in case the user has installed the package but hasn't updated 

231# their code. 

232if bigquery_magics is not None: 

233 context = bigquery_magics.context 

234else: 

235 context = Context() 

236 

237 

238def _handle_error(error, destination_var=None): 

239 """Process a query execution error. 

240 

241 Args: 

242 error (Exception): 

243 An exception that occurred during the query execution. 

244 destination_var (Optional[str]): 

245 The name of the IPython session variable to store the query job. 

246 """ 

247 if destination_var: 

248 query_job = getattr(error, "query_job", None) 

249 

250 if query_job is not None: 

251 IPython.get_ipython().push({destination_var: query_job}) 

252 else: 

253 # this is the case when previewing table rows by providing just 

254 # table ID to cell magic 

255 print( 

256 "Could not save output to variable '{}'.".format(destination_var), 

257 file=sys.stderr, 

258 ) 

259 

260 print("\nERROR:\n", str(error), file=sys.stderr) 

261 

262 

263def _run_query(client, query, job_config=None): 

264 """Runs a query while printing status updates 

265 

266 Args: 

267 client (google.cloud.bigquery.client.Client): 

268 Client to bundle configuration needed for API requests. 

269 query (str): 

270 SQL query to be executed. Defaults to the standard SQL dialect. 

271 Use the ``job_config`` parameter to change dialects. 

272 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]): 

273 Extra configuration options for the job. 

274 

275 Returns: 

276 google.cloud.bigquery.job.QueryJob: the query job created 

277 

278 Example: 

279 >>> client = bigquery.Client() 

280 >>> _run_query(client, "SELECT 17") 

281 Executing query with job ID: bf633912-af2c-4780-b568-5d868058632b 

282 Query executing: 1.66s 

283 Query complete after 2.07s 

284 'bf633912-af2c-4780-b568-5d868058632b' 

285 """ 

286 start_time = time.perf_counter() 

287 query_job = client.query(query, job_config=job_config) 

288 

289 if job_config and job_config.dry_run: 

290 return query_job 

291 

292 print(f"Executing query with job ID: {query_job.job_id}") 

293 

294 while True: 

295 print( 

296 f"\rQuery executing: {time.perf_counter() - start_time:.2f}s".format(), 

297 end="", 

298 ) 

299 try: 

300 query_job.result(timeout=0.5) 

301 break 

302 except futures.TimeoutError: 

303 continue 

304 print(f"\nJob ID {query_job.job_id} successfully executed") 

305 return query_job 

306 

307 

308def _create_dataset_if_necessary(client, dataset_id): 

309 """Create a dataset in the current project if it doesn't exist. 

310 

311 Args: 

312 client (google.cloud.bigquery.client.Client): 

313 Client to bundle configuration needed for API requests. 

314 dataset_id (str): 

315 Dataset id. 

316 """ 

317 dataset_reference = bigquery.dataset.DatasetReference(client.project, dataset_id) 

318 try: 

319 dataset = client.get_dataset(dataset_reference) 

320 return 

321 except NotFound: 

322 pass 

323 dataset = bigquery.Dataset(dataset_reference) 

324 dataset.location = client.location 

325 print(f"Creating dataset: {dataset_id}") 

326 dataset = client.create_dataset(dataset) 

327 

328 

329@magic_arguments.magic_arguments() 

330@magic_arguments.argument( 

331 "destination_var", 

332 nargs="?", 

333 help=("If provided, save the output to this variable instead of displaying it."), 

334) 

335@magic_arguments.argument( 

336 "--destination_table", 

337 type=str, 

338 default=None, 

339 help=( 

340 "If provided, save the output of the query to a new BigQuery table. " 

341 "Variable should be in a format <dataset_id>.<table_id>. " 

342 "If table does not exists, it will be created. " 

343 "If table already exists, its data will be overwritten." 

344 ), 

345) 

346@magic_arguments.argument( 

347 "--project", 

348 type=str, 

349 default=None, 

350 help=("Project to use for executing this query. Defaults to the context project."), 

351) 

352@magic_arguments.argument( 

353 "--max_results", 

354 default=None, 

355 help=( 

356 "Maximum number of rows in dataframe returned from executing the query." 

357 "Defaults to returning all rows." 

358 ), 

359) 

360@magic_arguments.argument( 

361 "--maximum_bytes_billed", 

362 default=None, 

363 help=( 

364 "maximum_bytes_billed to use for executing this query. Defaults to " 

365 "the context default_query_job_config.maximum_bytes_billed." 

366 ), 

367) 

368@magic_arguments.argument( 

369 "--dry_run", 

370 action="store_true", 

371 default=False, 

372 help=( 

373 "Sets query to be a dry run to estimate costs. " 

374 "Defaults to executing the query instead of dry run if this argument is not used." 

375 ), 

376) 

377@magic_arguments.argument( 

378 "--use_legacy_sql", 

379 action="store_true", 

380 default=False, 

381 help=( 

382 "Sets query to use Legacy SQL instead of Standard SQL. Defaults to " 

383 "Standard SQL if this argument is not used." 

384 ), 

385) 

386@magic_arguments.argument( 

387 "--bigquery_api_endpoint", 

388 type=str, 

389 default=None, 

390 help=( 

391 "The desired API endpoint, e.g., bigquery.googlepis.com. Defaults to this " 

392 "option's value in the context bigquery_client_options." 

393 ), 

394) 

395@magic_arguments.argument( 

396 "--bqstorage_api_endpoint", 

397 type=str, 

398 default=None, 

399 help=( 

400 "The desired API endpoint, e.g., bigquerystorage.googlepis.com. Defaults to " 

401 "this option's value in the context bqstorage_client_options." 

402 ), 

403) 

404@magic_arguments.argument( 

405 "--no_query_cache", 

406 action="store_true", 

407 default=False, 

408 help=("Do not use cached query results."), 

409) 

410@magic_arguments.argument( 

411 "--use_bqstorage_api", 

412 action="store_true", 

413 default=None, 

414 help=( 

415 "[Deprecated] The BigQuery Storage API is already used by default to " 

416 "download large query results, and this option has no effect. " 

417 "If you want to switch to the classic REST API instead, use the " 

418 "--use_rest_api option." 

419 ), 

420) 

421@magic_arguments.argument( 

422 "--use_rest_api", 

423 action="store_true", 

424 default=False, 

425 help=( 

426 "Use the classic REST API instead of the BigQuery Storage API to " 

427 "download query results." 

428 ), 

429) 

430@magic_arguments.argument( 

431 "--verbose", 

432 action="store_true", 

433 default=False, 

434 help=( 

435 "If set, print verbose output, including the query job ID and the " 

436 "amount of time for the query to finish. By default, this " 

437 "information will be displayed as the query runs, but will be " 

438 "cleared after the query is finished." 

439 ), 

440) 

441@magic_arguments.argument( 

442 "--params", 

443 nargs="+", 

444 default=None, 

445 help=( 

446 "Parameters to format the query string. If present, the --params " 

447 "flag should be followed by a string representation of a dictionary " 

448 "in the format {'param_name': 'param_value'} (ex. {\"num\": 17}), " 

449 "or a reference to a dictionary in the same format. The dictionary " 

450 "reference can be made by including a '$' before the variable " 

451 "name (ex. $my_dict_var)." 

452 ), 

453) 

454@magic_arguments.argument( 

455 "--progress_bar_type", 

456 type=str, 

457 default=None, 

458 help=( 

459 "Sets progress bar type to display a progress bar while executing the query." 

460 "Defaults to use tqdm_notebook. Install the ``tqdm`` package to use this feature." 

461 ), 

462) 

463@magic_arguments.argument( 

464 "--location", 

465 type=str, 

466 default=None, 

467 help=( 

468 "Set the location to execute query." 

469 "Defaults to location set in query setting in console." 

470 ), 

471) 

472def _cell_magic(line, query): 

473 """Underlying function for bigquery cell magic 

474 

475 Note: 

476 This function contains the underlying logic for the 'bigquery' cell 

477 magic. This function is not meant to be called directly. 

478 

479 Args: 

480 line (str): "%%bigquery" followed by arguments as required 

481 query (str): SQL query to run 

482 

483 Returns: 

484 pandas.DataFrame: the query results. 

485 """ 

486 # The built-in parser does not recognize Python structures such as dicts, thus 

487 # we extract the "--params" option and inteprpret it separately. 

488 try: 

489 params_option_value, rest_of_args = _split_args_line(line) 

490 except lap.exceptions.QueryParamsParseError as exc: 

491 rebranded_error = SyntaxError( 

492 "--params is not a correctly formatted JSON string or a JSON " 

493 "serializable dictionary" 

494 ) 

495 raise rebranded_error from exc 

496 except lap.exceptions.DuplicateQueryParamsError as exc: 

497 rebranded_error = ValueError("Duplicate --params option.") 

498 raise rebranded_error from exc 

499 except lap.exceptions.ParseError as exc: 

500 rebranded_error = ValueError( 

501 "Unrecognized input, are option values correct? " 

502 "Error details: {}".format(exc.args[0]) 

503 ) 

504 raise rebranded_error from exc 

505 

506 args = magic_arguments.parse_argstring(_cell_magic, rest_of_args) 

507 

508 if args.use_bqstorage_api is not None: 

509 warnings.warn( 

510 "Deprecated option --use_bqstorage_api, the BigQuery " 

511 "Storage API is already used by default.", 

512 category=DeprecationWarning, 

513 ) 

514 use_bqstorage_api = not args.use_rest_api 

515 location = args.location 

516 

517 params = [] 

518 if params_option_value: 

519 # A non-existing params variable is not expanded and ends up in the input 

520 # in its raw form, e.g. "$query_params". 

521 if params_option_value.startswith("$"): 

522 msg = 'Parameter expansion failed, undefined variable "{}".'.format( 

523 params_option_value[1:] 

524 ) 

525 raise NameError(msg) 

526 

527 params = _helpers.to_query_parameters(ast.literal_eval(params_option_value), {}) 

528 

529 project = args.project or context.project 

530 

531 bigquery_client_options = copy.deepcopy(context.bigquery_client_options) 

532 if args.bigquery_api_endpoint: 

533 if isinstance(bigquery_client_options, dict): 

534 bigquery_client_options["api_endpoint"] = args.bigquery_api_endpoint 

535 else: 

536 bigquery_client_options.api_endpoint = args.bigquery_api_endpoint 

537 

538 client = bigquery.Client( 

539 project=project, 

540 credentials=context.credentials, 

541 default_query_job_config=context.default_query_job_config, 

542 client_info=client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT), 

543 client_options=bigquery_client_options, 

544 location=location, 

545 ) 

546 if context._connection: 

547 client._connection = context._connection 

548 

549 bqstorage_client_options = copy.deepcopy(context.bqstorage_client_options) 

550 if args.bqstorage_api_endpoint: 

551 if isinstance(bqstorage_client_options, dict): 

552 bqstorage_client_options["api_endpoint"] = args.bqstorage_api_endpoint 

553 else: 

554 bqstorage_client_options.api_endpoint = args.bqstorage_api_endpoint 

555 

556 bqstorage_client = _make_bqstorage_client( 

557 client, 

558 use_bqstorage_api, 

559 bqstorage_client_options, 

560 ) 

561 

562 close_transports = functools.partial(_close_transports, client, bqstorage_client) 

563 

564 try: 

565 if args.max_results: 

566 max_results = int(args.max_results) 

567 else: 

568 max_results = None 

569 

570 query = query.strip() 

571 

572 if not query: 

573 error = ValueError("Query is missing.") 

574 _handle_error(error, args.destination_var) 

575 return 

576 

577 # Check if query is given as a reference to a variable. 

578 if query.startswith("$"): 

579 query_var_name = query[1:] 

580 

581 if not query_var_name: 

582 missing_msg = 'Missing query variable name, empty "$" is not allowed.' 

583 raise NameError(missing_msg) 

584 

585 if query_var_name.isidentifier(): 

586 ip = IPython.get_ipython() 

587 query = ip.user_ns.get(query_var_name, ip) # ip serves as a sentinel 

588 

589 if query is ip: 

590 raise NameError( 

591 f"Unknown query, variable {query_var_name} does not exist." 

592 ) 

593 else: 

594 if not isinstance(query, (str, bytes)): 

595 raise TypeError( 

596 f"Query variable {query_var_name} must be a string " 

597 "or a bytes-like value." 

598 ) 

599 

600 # Any query that does not contain whitespace (aside from leading and trailing whitespace) 

601 # is assumed to be a table id 

602 if not re.search(r"\s", query): 

603 try: 

604 rows = client.list_rows(query, max_results=max_results) 

605 except Exception as ex: 

606 _handle_error(ex, args.destination_var) 

607 return 

608 

609 result = rows.to_dataframe( 

610 bqstorage_client=bqstorage_client, 

611 create_bqstorage_client=False, 

612 ) 

613 if args.destination_var: 

614 IPython.get_ipython().push({args.destination_var: result}) 

615 return 

616 else: 

617 return result 

618 

619 job_config = bigquery.job.QueryJobConfig() 

620 job_config.query_parameters = params 

621 job_config.use_legacy_sql = args.use_legacy_sql 

622 job_config.dry_run = args.dry_run 

623 

624 # Don't override context job config unless --no_query_cache is explicitly set. 

625 if args.no_query_cache: 

626 job_config.use_query_cache = False 

627 

628 if args.destination_table: 

629 split = args.destination_table.split(".") 

630 if len(split) != 2: 

631 raise ValueError( 

632 "--destination_table should be in a <dataset_id>.<table_id> format." 

633 ) 

634 dataset_id, table_id = split 

635 job_config.allow_large_results = True 

636 dataset_ref = bigquery.dataset.DatasetReference(client.project, dataset_id) 

637 destination_table_ref = dataset_ref.table(table_id) 

638 job_config.destination = destination_table_ref 

639 job_config.create_disposition = "CREATE_IF_NEEDED" 

640 job_config.write_disposition = "WRITE_TRUNCATE" 

641 _create_dataset_if_necessary(client, dataset_id) 

642 

643 if args.maximum_bytes_billed == "None": 

644 job_config.maximum_bytes_billed = 0 

645 elif args.maximum_bytes_billed is not None: 

646 value = int(args.maximum_bytes_billed) 

647 job_config.maximum_bytes_billed = value 

648 

649 try: 

650 query_job = _run_query(client, query, job_config=job_config) 

651 except Exception as ex: 

652 _handle_error(ex, args.destination_var) 

653 return 

654 

655 if not args.verbose: 

656 display.clear_output() 

657 

658 if args.dry_run and args.destination_var: 

659 IPython.get_ipython().push({args.destination_var: query_job}) 

660 return 

661 elif args.dry_run: 

662 print( 

663 "Query validated. This query will process {} bytes.".format( 

664 query_job.total_bytes_processed 

665 ) 

666 ) 

667 return query_job 

668 

669 progress_bar = context.progress_bar_type or args.progress_bar_type 

670 

671 if max_results: 

672 result = query_job.result(max_results=max_results).to_dataframe( 

673 bqstorage_client=None, 

674 create_bqstorage_client=False, 

675 progress_bar_type=progress_bar, 

676 ) 

677 else: 

678 result = query_job.to_dataframe( 

679 bqstorage_client=bqstorage_client, 

680 create_bqstorage_client=False, 

681 progress_bar_type=progress_bar, 

682 ) 

683 

684 if args.destination_var: 

685 IPython.get_ipython().push({args.destination_var: result}) 

686 else: 

687 return result 

688 finally: 

689 close_transports() 

690 

691 

692def _split_args_line(line): 

693 """Split out the --params option value from the input line arguments. 

694 

695 Args: 

696 line (str): The line arguments passed to the cell magic. 

697 

698 Returns: 

699 Tuple[str, str] 

700 """ 

701 lexer = lap.Lexer(line) 

702 scanner = lap.Parser(lexer) 

703 tree = scanner.input_line() 

704 

705 extractor = lap.QueryParamsExtractor() 

706 params_option_value, rest_of_args = extractor.visit(tree) 

707 

708 return params_option_value, rest_of_args 

709 

710 

711def _make_bqstorage_client(client, use_bqstorage_api, client_options): 

712 """Creates a BigQuery Storage client. 

713 

714 Args: 

715 client (:class:`~google.cloud.bigquery.client.Client`): BigQuery client. 

716 use_bqstorage_api (bool): whether BigQuery Storage API is used or not. 

717 client_options (:class:`google.api_core.client_options.ClientOptions`): 

718 Custom options used with a new BigQuery Storage client instance 

719 if one is created. 

720 

721 Raises: 

722 ImportError: if google-cloud-bigquery-storage is not installed, or 

723 grpcio package is not installed. 

724 

725 

726 Returns: 

727 None: if ``use_bqstorage_api == False``, or google-cloud-bigquery-storage 

728 is outdated. 

729 BigQuery Storage Client: 

730 """ 

731 if not use_bqstorage_api: 

732 return None 

733 

734 try: 

735 _versions_helpers.BQ_STORAGE_VERSIONS.try_import(raise_if_error=True) 

736 except exceptions.BigQueryStorageNotFoundError as err: 

737 customized_error = ImportError( 

738 "The default BigQuery Storage API client cannot be used, install " 

739 "the missing google-cloud-bigquery-storage and pyarrow packages " 

740 "to use it. Alternatively, use the classic REST API by specifying " 

741 "the --use_rest_api magic option." 

742 ) 

743 raise customized_error from err 

744 except exceptions.LegacyBigQueryStorageError: 

745 pass 

746 

747 try: 

748 from google.api_core.gapic_v1 import client_info as gapic_client_info 

749 except ImportError as err: 

750 customized_error = ImportError( 

751 "Install the grpcio package to use the BigQuery Storage API." 

752 ) 

753 raise customized_error from err 

754 

755 return client._ensure_bqstorage_client( 

756 client_options=client_options, 

757 client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT), 

758 ) 

759 

760 

761def _close_transports(client, bqstorage_client): 

762 """Close the given clients' underlying transport channels. 

763 

764 Closing the transport is needed to release system resources, namely open 

765 sockets. 

766 

767 Args: 

768 client (:class:`~google.cloud.bigquery.client.Client`): 

769 bqstorage_client 

770 (Optional[:class:`~google.cloud.bigquery_storage.BigQueryReadClient`]): 

771 A client for the BigQuery Storage API. 

772 

773 """ 

774 client.close() 

775 if bqstorage_client is not None: 

776 bqstorage_client._transport.grpc_channel.close()