Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/magics/magics.py: 31%

260 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1# Copyright 2018 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""IPython Magics 

16 

17.. function:: %%bigquery 

18 

19 IPython cell magic to run a query and display the result as a DataFrame 

20 

21 .. code-block:: python 

22 

23 %%bigquery [<destination_var>] [--project <project>] [--use_legacy_sql] 

24 [--verbose] [--params <params>] 

25 <query> 

26 

27 Parameters: 

28 

29 * ``<destination_var>`` (Optional[line argument]): 

30 variable to store the query results. The results are not displayed if 

31 this parameter is used. If an error occurs during the query execution, 

32 the corresponding ``QueryJob`` instance (if available) is stored in 

33 the variable instead. 

34 * ``--destination_table`` (Optional[line argument]): 

35 A dataset and table to store the query results. If table does not exist, 

36 it will be created. If table already exists, its data will be overwritten. 

37 Variable should be in a format <dataset_id>.<table_id>. 

38 * ``--no_query_cache`` (Optional[line argument]): 

39 Do not use cached query results. 

40 * ``--project <project>`` (Optional[line argument]): 

41 Project to use for running the query. Defaults to the context 

42 :attr:`~google.cloud.bigquery.magics.Context.project`. 

43 * ``--use_bqstorage_api`` (Optional[line argument]): 

44 [Deprecated] Not used anymore, as BigQuery Storage API is used by default. 

45 * ``--use_rest_api`` (Optional[line argument]): 

46 Use the BigQuery REST API instead of the Storage API. 

47 * ``--use_legacy_sql`` (Optional[line argument]): 

48 Runs the query using Legacy SQL syntax. Defaults to Standard SQL if 

49 this argument is not used. 

50 * ``--verbose`` (Optional[line argument]): 

51 If this flag is used, information including the query job ID and the 

52 amount of time for the query to complete will not be cleared after the 

53 query is finished. By default, this information will be displayed but 

54 will be cleared after the query is finished. 

55 * ``--params <params>`` (Optional[line argument]): 

56 If present, the argument following the ``--params`` flag must be 

57 either: 

58 

59 * :class:`str` - A JSON string representation of a dictionary in the 

60 format ``{"param_name": "param_value"}`` (ex. ``{"num": 17}``). Use 

61 of the parameter in the query should be indicated with 

62 ``@param_name``. See ``In[5]`` in the Examples section below. 

63 

64 * :class:`dict` reference - A reference to a ``dict`` in the format 

65 ``{"param_name": "param_value"}``, where the value types must be JSON 

66 serializable. The variable reference is indicated by a ``$`` before 

67 the variable name (ex. ``$my_dict_var``). See ``In[6]`` and ``In[7]`` 

68 in the Examples section below. 

69 

70 * ``<query>`` (required, cell argument): 

71 SQL query to run. If the query does not contain any whitespace (aside 

72 from leading and trailing whitespace), it is assumed to represent a 

73 fully-qualified table ID, and the latter's data will be fetched. 

74 

75 Returns: 

76 A :class:`pandas.DataFrame` with the query results. 

77 

78 .. note:: 

79 All queries run using this magic will run using the context 

80 :attr:`~google.cloud.bigquery.magics.Context.credentials`. 

81""" 

82 

83from __future__ import print_function 

84 

85import re 

86import ast 

87import copy 

88import functools 

89import sys 

90import time 

91import warnings 

92from concurrent import futures 

93 

94try: 

95 import IPython # type: ignore 

96 from IPython import display # type: ignore 

97 from IPython.core import magic_arguments # type: ignore 

98except ImportError: # pragma: NO COVER 

99 raise ImportError("This module can only be loaded in IPython.") 

100 

101from google.api_core import client_info 

102from google.api_core import client_options 

103from google.api_core.exceptions import NotFound 

104import google.auth # type: ignore 

105from google.cloud import bigquery 

106import google.cloud.bigquery.dataset 

107from google.cloud.bigquery.dbapi import _helpers 

108from google.cloud.bigquery.magics import line_arg_parser as lap 

109 

110 

# User-agent string passed as ``user_agent`` to the ``ClientInfo`` objects
# built for the clients created by the magics in this module.
IPYTHON_USER_AGENT = f"ipython-{IPython.__version__}"

112 

113 

class Context(object):
    """Session-wide settings shared by the BigQuery IPython magics.

    A Context object is created when the ``magics`` module is imported and
    is available as ``google.cloud.bigquery.magics.context``.
    """

    def __init__(self):
        # Credentials and project start out unset; their properties resolve
        # them through ``google.auth.default()`` on first read.
        self._credentials = None
        self._project = None
        self._connection = None
        self._default_query_job_config = bigquery.QueryJobConfig()
        self._bigquery_client_options = client_options.ClientOptions()
        self._bqstorage_client_options = client_options.ClientOptions()
        self._progress_bar_type = "tqdm_notebook"

    @property
    def credentials(self):
        """google.auth.credentials.Credentials: Credentials used for queries
        run through the IPython magics.

        Note:
            When no credentials have been assigned, the first read obtains
            them from ``google.auth.default()`` (Application Default
            Credentials) and caches them. If you are not using Application
            Default Credentials, construct a
            :class:`google.auth.credentials.Credentials` object yourself and
            assign it as shown below. See `auth docs`_ for more information
            on obtaining credentials.

        Example:
            Manually setting the context credentials:

            >>> from google.cloud.bigquery import magics
            >>> from google.oauth2 import service_account
            >>> credentials = (service_account
            ...     .Credentials.from_service_account_file(
            ...         '/path/to/key.json'))
            >>> magics.context.credentials = credentials


        .. _auth docs: http://google-auth.readthedocs.io
            /en/latest/user-guide.html#obtaining-credentials
        """
        if self._credentials is not None:
            return self._credentials
        self._credentials, _ = google.auth.default()
        return self._credentials

    @credentials.setter
    def credentials(self, value):
        self._credentials = value

    @property
    def project(self):
        """str: Default project used for queries run through the IPython
        magics.

        Note:
            When no project has been assigned, the first read obtains it
            from ``google.auth.default()`` and caches it. If your
            environment has no default project, assign one manually as
            shown below.

        Example:
            Manually setting the context project:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.project = 'my-project'
        """
        if self._project is not None:
            return self._project
        _, self._project = google.auth.default()
        return self._project

    @project.setter
    def project(self, value):
        self._project = value

    @property
    def bigquery_client_options(self):
        """google.api_core.client_options.ClientOptions: Client options used
        by the IPython magics for the BigQuery client.

        Note:
            The client options do not need to be explicitly defined if no
            special network connections are required. Normally you would be
            using the https://bigquery.googleapis.com/ end point.

        Example:
            Manually setting the endpoint:

            >>> from google.cloud.bigquery import magics
            >>> client_options = {}
            >>> client_options['api_endpoint'] = "https://some.special.url"
            >>> magics.context.bigquery_client_options = client_options
        """
        return self._bigquery_client_options

    @bigquery_client_options.setter
    def bigquery_client_options(self, value):
        self._bigquery_client_options = value

    @property
    def bqstorage_client_options(self):
        """google.api_core.client_options.ClientOptions: Client options used
        by the IPython magics for the storage client.

        Note:
            The client options do not need to be explicitly defined if no
            special network connections are required. Normally you would be
            using the https://bigquerystorage.googleapis.com/ end point.

        Example:
            Manually setting the endpoint:

            >>> from google.cloud.bigquery import magics
            >>> client_options = {}
            >>> client_options['api_endpoint'] = "https://some.special.url"
            >>> magics.context.bqstorage_client_options = client_options
        """
        return self._bqstorage_client_options

    @bqstorage_client_options.setter
    def bqstorage_client_options(self, value):
        self._bqstorage_client_options = value

    @property
    def default_query_job_config(self):
        """google.cloud.bigquery.job.QueryJobConfig: Default job
        configuration for queries.

        The context's :class:`~google.cloud.bigquery.job.QueryJobConfig` is
        used for queries. Some properties can be overridden with arguments to
        the magics.

        Example:
            Manually setting the default value for ``maximum_bytes_billed``
            to 100 MB:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.default_query_job_config.maximum_bytes_billed = 100000000
        """
        return self._default_query_job_config

    @default_query_job_config.setter
    def default_query_job_config(self, value):
        self._default_query_job_config = value

    @property
    def progress_bar_type(self):
        """str: Default progress bar type shown while queries run through
        the IPython magics.

        Note:
            Install the ``tqdm`` package to use this feature.

        Example:
            Manually setting the progress_bar_type:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.progress_bar_type = "tqdm_notebook"
        """
        return self._progress_bar_type

    @progress_bar_type.setter
    def progress_bar_type(self, value):
        self._progress_bar_type = value

279 

280 

# Module-level Context instance; ``_cell_magic`` reads its project,
# credentials, client options, default job config and progress bar type.
context = Context()

282 

283 

284def _handle_error(error, destination_var=None): 

285 """Process a query execution error. 

286 

287 Args: 

288 error (Exception): 

289 An exception that ocurred during the query execution. 

290 destination_var (Optional[str]): 

291 The name of the IPython session variable to store the query job. 

292 """ 

293 if destination_var: 

294 query_job = getattr(error, "query_job", None) 

295 

296 if query_job is not None: 

297 IPython.get_ipython().push({destination_var: query_job}) 

298 else: 

299 # this is the case when previewing table rows by providing just 

300 # table ID to cell magic 

301 print( 

302 "Could not save output to variable '{}'.".format(destination_var), 

303 file=sys.stderr, 

304 ) 

305 

306 print("\nERROR:\n", str(error), file=sys.stderr) 

307 

308 

309def _run_query(client, query, job_config=None): 

310 """Runs a query while printing status updates 

311 

312 Args: 

313 client (google.cloud.bigquery.client.Client): 

314 Client to bundle configuration needed for API requests. 

315 query (str): 

316 SQL query to be executed. Defaults to the standard SQL dialect. 

317 Use the ``job_config`` parameter to change dialects. 

318 job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]): 

319 Extra configuration options for the job. 

320 

321 Returns: 

322 google.cloud.bigquery.job.QueryJob: the query job created 

323 

324 Example: 

325 >>> client = bigquery.Client() 

326 >>> _run_query(client, "SELECT 17") 

327 Executing query with job ID: bf633912-af2c-4780-b568-5d868058632b 

328 Query executing: 1.66s 

329 Query complete after 2.07s 

330 'bf633912-af2c-4780-b568-5d868058632b' 

331 """ 

332 start_time = time.perf_counter() 

333 query_job = client.query(query, job_config=job_config) 

334 

335 if job_config and job_config.dry_run: 

336 return query_job 

337 

338 print(f"Executing query with job ID: {query_job.job_id}") 

339 

340 while True: 

341 print( 

342 f"\rQuery executing: {time.perf_counter() - start_time:.2f}s".format(), 

343 end="", 

344 ) 

345 try: 

346 query_job.result(timeout=0.5) 

347 break 

348 except futures.TimeoutError: 

349 continue 

350 print(f"\nJob ID {query_job.job_id} successfully executed") 

351 return query_job 

352 

353 

def _create_dataset_if_necessary(client, dataset_id):
    """Create a dataset in the client's project unless it already exists.

    The dataset is looked up first; it is created, using the client's
    location, only when the lookup raises ``NotFound``.

    Args:
        client (google.cloud.bigquery.client.Client):
            Client to bundle configuration needed for API requests.
        dataset_id (str):
            Dataset id.
    """
    reference = bigquery.dataset.DatasetReference(client.project, dataset_id)
    try:
        client.get_dataset(reference)
    except NotFound:
        pass
    else:
        # The dataset already exists, nothing to create.
        return

    new_dataset = bigquery.Dataset(reference)
    new_dataset.location = client.location
    print(f"Creating dataset: {dataset_id}")
    client.create_dataset(new_dataset)

373 

374 

@magic_arguments.magic_arguments()
@magic_arguments.argument(
    "destination_var",
    nargs="?",
    help=("If provided, save the output to this variable instead of displaying it."),
)
@magic_arguments.argument(
    "--destination_table",
    type=str,
    default=None,
    help=(
        "If provided, save the output of the query to a new BigQuery table. "
        "Variable should be in a format <dataset_id>.<table_id>. "
        "If table does not exists, it will be created. "
        "If table already exists, its data will be overwritten."
    ),
)
@magic_arguments.argument(
    "--project",
    type=str,
    default=None,
    help=("Project to use for executing this query. Defaults to the context project."),
)
@magic_arguments.argument(
    "--max_results",
    default=None,
    help=(
        "Maximum number of rows in dataframe returned from executing the query. "
        "Defaults to returning all rows."
    ),
)
@magic_arguments.argument(
    "--maximum_bytes_billed",
    default=None,
    help=(
        "maximum_bytes_billed to use for executing this query. Defaults to "
        "the context default_query_job_config.maximum_bytes_billed."
    ),
)
@magic_arguments.argument(
    "--dry_run",
    action="store_true",
    default=False,
    help=(
        "Sets query to be a dry run to estimate costs. "
        "Defaults to executing the query instead of dry run if this argument is not used."
    ),
)
@magic_arguments.argument(
    "--use_legacy_sql",
    action="store_true",
    default=False,
    help=(
        "Sets query to use Legacy SQL instead of Standard SQL. Defaults to "
        "Standard SQL if this argument is not used."
    ),
)
@magic_arguments.argument(
    "--bigquery_api_endpoint",
    type=str,
    default=None,
    help=(
        "The desired API endpoint, e.g., bigquery.googleapis.com. Defaults to this "
        "option's value in the context bigquery_client_options."
    ),
)
@magic_arguments.argument(
    "--bqstorage_api_endpoint",
    type=str,
    default=None,
    help=(
        "The desired API endpoint, e.g., bigquerystorage.googleapis.com. Defaults to "
        "this option's value in the context bqstorage_client_options."
    ),
)
@magic_arguments.argument(
    "--no_query_cache",
    action="store_true",
    default=False,
    help=("Do not use cached query results."),
)
@magic_arguments.argument(
    "--use_bqstorage_api",
    action="store_true",
    default=None,
    help=(
        "[Deprecated] The BigQuery Storage API is already used by default to "
        "download large query results, and this option has no effect. "
        "If you want to switch to the classic REST API instead, use the "
        "--use_rest_api option."
    ),
)
@magic_arguments.argument(
    "--use_rest_api",
    action="store_true",
    default=False,
    help=(
        "Use the classic REST API instead of the BigQuery Storage API to "
        "download query results."
    ),
)
@magic_arguments.argument(
    "--verbose",
    action="store_true",
    default=False,
    help=(
        "If set, print verbose output, including the query job ID and the "
        "amount of time for the query to finish. By default, this "
        "information will be displayed as the query runs, but will be "
        "cleared after the query is finished."
    ),
)
@magic_arguments.argument(
    "--params",
    nargs="+",
    default=None,
    help=(
        "Parameters to format the query string. If present, the --params "
        "flag should be followed by a string representation of a dictionary "
        "in the format {'param_name': 'param_value'} (ex. {\"num\": 17}), "
        "or a reference to a dictionary in the same format. The dictionary "
        "reference can be made by including a '$' before the variable "
        "name (ex. $my_dict_var)."
    ),
)
@magic_arguments.argument(
    "--progress_bar_type",
    type=str,
    default=None,
    help=(
        "Sets progress bar type to display a progress bar while executing the query. "
        "Defaults to use tqdm_notebook. Install the ``tqdm`` package to use this feature."
    ),
)
def _cell_magic(line, query):
    """Underlying function for bigquery cell magic

    Note:
        This function contains the underlying logic for the 'bigquery' cell
        magic. This function is not meant to be called directly.

    Args:
        line (str): "%%bigquery" followed by arguments as required
        query (str):
            SQL query to run. A value without inner whitespace is treated as
            a table ID whose rows are listed, and a value of the form
            ``$name`` is replaced by the content of the variable ``name``
            from the IPython user namespace.

    Returns:
        Optional[Union[pandas.DataFrame, google.cloud.bigquery.job.QueryJob]]:
            The query results as a DataFrame, or the query job for a dry run
            without a destination variable. ``None`` is returned when the
            output was stored in the destination variable, or when an error
            was reported through ``_handle_error``.

    Raises:
        SyntaxError: If the ``--params`` value cannot be parsed.
        ValueError: If ``--params`` is given more than once, the line cannot
            be parsed, or ``--destination_table`` is not in the
            ``<dataset_id>.<table_id>`` format.
        NameError: If ``--params`` or the query references an undefined
            variable, or the query is a bare ``$``.
        TypeError: If the referenced query variable is neither ``str`` nor
            ``bytes``.
    """
    # The built-in parser does not recognize Python structures such as dicts, thus
    # we extract the "--params" option and interpret it separately.
    try:
        params_option_value, rest_of_args = _split_args_line(line)
    except lap.exceptions.QueryParamsParseError as exc:
        rebranded_error = SyntaxError(
            "--params is not a correctly formatted JSON string or a JSON "
            "serializable dictionary"
        )
        raise rebranded_error from exc
    except lap.exceptions.DuplicateQueryParamsError as exc:
        rebranded_error = ValueError("Duplicate --params option.")
        raise rebranded_error from exc
    except lap.exceptions.ParseError as exc:
        rebranded_error = ValueError(
            "Unrecognized input, are option values correct? "
            "Error details: {}".format(exc.args[0])
        )
        raise rebranded_error from exc

    args = magic_arguments.parse_argstring(_cell_magic, rest_of_args)

    # The flag defaults to None, so any other value means it was passed.
    if args.use_bqstorage_api is not None:
        warnings.warn(
            "Deprecated option --use_bqstorage_api, the BigQuery "
            "Storage API is already used by default.",
            category=DeprecationWarning,
        )
    use_bqstorage_api = not args.use_rest_api

    params = []
    if params_option_value:
        # A non-existing params variable is not expanded and ends up in the input
        # in its raw form, e.g. "$query_params".
        if params_option_value.startswith("$"):
            msg = 'Parameter expansion failed, undefined variable "{}".'.format(
                params_option_value[1:]
            )
            raise NameError(msg)

        params = _helpers.to_query_parameters(ast.literal_eval(params_option_value), {})

    project = args.project or context.project

    # Work on a copy so a per-cell endpoint override does not modify the
    # options stored on the shared context.
    bigquery_client_options = copy.deepcopy(context.bigquery_client_options)
    if args.bigquery_api_endpoint:
        if isinstance(bigquery_client_options, dict):
            bigquery_client_options["api_endpoint"] = args.bigquery_api_endpoint
        else:
            bigquery_client_options.api_endpoint = args.bigquery_api_endpoint

    client = bigquery.Client(
        project=project,
        credentials=context.credentials,
        default_query_job_config=context.default_query_job_config,
        client_info=client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
        client_options=bigquery_client_options,
    )
    if context._connection:
        client._connection = context._connection

    bqstorage_client_options = copy.deepcopy(context.bqstorage_client_options)
    if args.bqstorage_api_endpoint:
        if isinstance(bqstorage_client_options, dict):
            bqstorage_client_options["api_endpoint"] = args.bqstorage_api_endpoint
        else:
            bqstorage_client_options.api_endpoint = args.bqstorage_api_endpoint

    bqstorage_client = _make_bqstorage_client(
        client,
        use_bqstorage_api,
        bqstorage_client_options,
    )

    close_transports = functools.partial(_close_transports, client, bqstorage_client)

    # Everything below runs inside try/finally so the clients' transports
    # are closed on every exit path.
    try:
        if args.max_results:
            max_results = int(args.max_results)
        else:
            max_results = None

        query = query.strip()

        if not query:
            error = ValueError("Query is missing.")
            _handle_error(error, args.destination_var)
            return

        # Check if query is given as a reference to a variable.
        if query.startswith("$"):
            query_var_name = query[1:]

            if not query_var_name:
                missing_msg = 'Missing query variable name, empty "$" is not allowed.'
                raise NameError(missing_msg)

            if query_var_name.isidentifier():
                ip = IPython.get_ipython()
                query = ip.user_ns.get(query_var_name, ip)  # ip serves as a sentinel

                if query is ip:
                    raise NameError(
                        f"Unknown query, variable {query_var_name} does not exist."
                    )
                else:
                    if not isinstance(query, (str, bytes)):
                        raise TypeError(
                            f"Query variable {query_var_name} must be a string "
                            "or a bytes-like value."
                        )

        # Any query that does not contain whitespace (aside from leading and trailing whitespace)
        # is assumed to be a table id
        if not re.search(r"\s", query):
            try:
                rows = client.list_rows(query, max_results=max_results)
            except Exception as ex:
                _handle_error(ex, args.destination_var)
                return

            result = rows.to_dataframe(
                bqstorage_client=bqstorage_client,
                create_bqstorage_client=False,
            )
            if args.destination_var:
                IPython.get_ipython().push({args.destination_var: result})
                return
            else:
                return result

        job_config = bigquery.job.QueryJobConfig()
        job_config.query_parameters = params
        job_config.use_legacy_sql = args.use_legacy_sql
        job_config.dry_run = args.dry_run

        # Don't override context job config unless --no_query_cache is explicitly set.
        if args.no_query_cache:
            job_config.use_query_cache = False

        if args.destination_table:
            split = args.destination_table.split(".")
            if len(split) != 2:
                raise ValueError(
                    "--destination_table should be in a <dataset_id>.<table_id> format."
                )
            dataset_id, table_id = split
            job_config.allow_large_results = True
            dataset_ref = bigquery.dataset.DatasetReference(client.project, dataset_id)
            destination_table_ref = dataset_ref.table(table_id)
            job_config.destination = destination_table_ref
            job_config.create_disposition = "CREATE_IF_NEEDED"
            job_config.write_disposition = "WRITE_TRUNCATE"
            _create_dataset_if_necessary(client, dataset_id)

        # The literal string "None" is mapped to 0.
        # NOTE(review): presumably this lifts a limit set on the context's
        # default job config - confirm against the BigQuery client.
        if args.maximum_bytes_billed == "None":
            job_config.maximum_bytes_billed = 0
        elif args.maximum_bytes_billed is not None:
            value = int(args.maximum_bytes_billed)
            job_config.maximum_bytes_billed = value

        try:
            query_job = _run_query(client, query, job_config=job_config)
        except Exception as ex:
            _handle_error(ex, args.destination_var)
            return

        if not args.verbose:
            display.clear_output()

        if args.dry_run and args.destination_var:
            IPython.get_ipython().push({args.destination_var: query_job})
            return
        elif args.dry_run:
            print(
                "Query validated. This query will process {} bytes.".format(
                    query_job.total_bytes_processed
                )
            )
            return query_job

        # An explicit --progress_bar_type wins over the context default, the
        # same precedence as used for --project above. With the operands the
        # other way round the option was ignored whenever the context value
        # was set, which it is by default ("tqdm_notebook").
        progress_bar = args.progress_bar_type or context.progress_bar_type

        if max_results:
            # With a row limit the storage client is not passed along.
            result = query_job.result(max_results=max_results).to_dataframe(
                bqstorage_client=None,
                create_bqstorage_client=False,
                progress_bar_type=progress_bar,
            )
        else:
            result = query_job.to_dataframe(
                bqstorage_client=bqstorage_client,
                create_bqstorage_client=False,
                progress_bar_type=progress_bar,
            )

        if args.destination_var:
            IPython.get_ipython().push({args.destination_var: result})
        else:
            return result
    finally:
        close_transports()

725 

726 

def _split_args_line(line):
    """Separate the ``--params`` option value from the other line arguments.

    The line is tokenized and parsed with the ``line_arg_parser`` helpers,
    and the resulting tree is walked by a ``QueryParamsExtractor``.

    Args:
        line (str): The line arguments passed to the cell magic.

    Returns:
        Tuple[str, str]: The ``--params`` option value and the remaining
        arguments, as produced by the extractor.
    """
    parse_tree = lap.Parser(lap.Lexer(line)).input_line()
    params_value, remaining_args = lap.QueryParamsExtractor().visit(parse_tree)
    return params_value, remaining_args

744 

745 

746def _make_bqstorage_client(client, use_bqstorage_api, client_options): 

747 if not use_bqstorage_api: 

748 return None 

749 

750 try: 

751 from google.cloud import bigquery_storage # type: ignore # noqa: F401 

752 except ImportError as err: 

753 customized_error = ImportError( 

754 "The default BigQuery Storage API client cannot be used, install " 

755 "the missing google-cloud-bigquery-storage and pyarrow packages " 

756 "to use it. Alternatively, use the classic REST API by specifying " 

757 "the --use_rest_api magic option." 

758 ) 

759 raise customized_error from err 

760 

761 try: 

762 from google.api_core.gapic_v1 import client_info as gapic_client_info 

763 except ImportError as err: 

764 customized_error = ImportError( 

765 "Install the grpcio package to use the BigQuery Storage API." 

766 ) 

767 raise customized_error from err 

768 

769 return client._ensure_bqstorage_client( 

770 client_options=client_options, 

771 client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT), 

772 ) 

773 

774 

775def _close_transports(client, bqstorage_client): 

776 """Close the given clients' underlying transport channels. 

777 

778 Closing the transport is needed to release system resources, namely open 

779 sockets. 

780 

781 Args: 

782 client (:class:`~google.cloud.bigquery.client.Client`): 

783 bqstorage_client 

784 (Optional[:class:`~google.cloud.bigquery_storage.BigQueryReadClient`]): 

785 A client for the BigQuery Storage API. 

786 

787 """ 

788 client.close() 

789 if bqstorage_client is not None: 

790 bqstorage_client._transport.grpc_channel.close()