Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/magics/magics.py: 31% (260 statements)

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15"""IPython Magics
17.. function:: %%bigquery
19 IPython cell magic to run a query and display the result as a DataFrame
21 .. code-block:: python
23 %%bigquery [<destination_var>] [--project <project>] [--use_legacy_sql]
24 [--verbose] [--params <params>]
25 <query>
27 Parameters:
29 * ``<destination_var>`` (Optional[line argument]):
30 variable to store the query results. The results are not displayed if
31 this parameter is used. If an error occurs during the query execution,
32 the corresponding ``QueryJob`` instance (if available) is stored in
33 the variable instead.
34 * ``--destination_table`` (Optional[line argument]):
35 A dataset and table to store the query results. If table does not exists,
36 it will be created. If table already exists, its data will be overwritten.
37 Variable should be in a format <dataset_id>.<table_id>.
38 * ``--no_query_cache`` (Optional[line argument]):
39 Do not use cached query results.
40 * ``--project <project>`` (Optional[line argument]):
41 Project to use for running the query. Defaults to the context
42 :attr:`~google.cloud.bigquery.magics.Context.project`.
43 * ``--use_bqstorage_api`` (Optional[line argument]):
44 [Deprecated] Not used anymore, as BigQuery Storage API is used by default.
45 * ``--use_rest_api`` (Optional[line argument]):
46 Use the BigQuery REST API instead of the Storage API.
47 * ``--use_legacy_sql`` (Optional[line argument]):
48 Runs the query using Legacy SQL syntax. Defaults to Standard SQL if
49 this argument not used.
50 * ``--verbose`` (Optional[line argument]):
51 If this flag is used, information including the query job ID and the
52 amount of time for the query to complete will not be cleared after the
53 query is finished. By default, this information will be displayed but
54 will be cleared after the query is finished.
55 * ``--params <params>`` (Optional[line argument]):
56 If present, the argument following the ``--params`` flag must be
57 either:
59 * :class:`str` - A JSON string representation of a dictionary in the
60 format ``{"param_name": "param_value"}`` (ex. ``{"num": 17}``). Use
61 of the parameter in the query should be indicated with
62 ``@param_name``. See ``In[5]`` in the Examples section below.
64 * :class:`dict` reference - A reference to a ``dict`` in the format
65 ``{"param_name": "param_value"}``, where the value types must be JSON
66 serializable. The variable reference is indicated by a ``$`` before
67 the variable name (ex. ``$my_dict_var``). See ``In[6]`` and ``In[7]``
68 in the Examples section below.
70 * ``<query>`` (required, cell argument):
71 SQL query to run. If the query does not contain any whitespace (aside
72 from leading and trailing whitespace), it is assumed to represent a
73 fully-qualified table ID, and the latter's data will be fetched.
75 Returns:
76 A :class:`pandas.DataFrame` with the query results.
78 .. note::
79 All queries run using this magic will run using the context
80 :attr:`~google.cloud.bigquery.magics.Context.credentials`.
81"""

from __future__ import print_function

import re
import ast
import copy
import functools
import sys
import time
import warnings
from concurrent import futures

try:
    import IPython  # type: ignore
    from IPython import display  # type: ignore
    from IPython.core import magic_arguments  # type: ignore
except ImportError:  # pragma: NO COVER
    raise ImportError("This module can only be loaded in IPython.")

from google.api_core import client_info
from google.api_core import client_options
from google.api_core.exceptions import NotFound
import google.auth  # type: ignore
from google.cloud import bigquery
import google.cloud.bigquery.dataset
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.magics import line_arg_parser as lap


IPYTHON_USER_AGENT = "ipython-{}".format(IPython.__version__)


class Context(object):
    """Storage for objects to be used throughout an IPython notebook session.

    A Context object is initialized when the ``magics`` module is imported,
    and can be found at ``google.cloud.bigquery.magics.context``.
    """

    def __init__(self):
        self._credentials = None
        self._project = None
        self._connection = None
        self._default_query_job_config = bigquery.QueryJobConfig()
        self._bigquery_client_options = client_options.ClientOptions()
        self._bqstorage_client_options = client_options.ClientOptions()
        self._progress_bar_type = "tqdm_notebook"

    @property
    def credentials(self):
        """google.auth.credentials.Credentials: Credentials to use for queries
        performed through IPython magics.

        Note:
            These credentials do not need to be explicitly defined if you are
            using Application Default Credentials. If you are not using
            Application Default Credentials, manually construct a
            :class:`google.auth.credentials.Credentials` object and set it as
            the context credentials as demonstrated in the example below. See
            `auth docs`_ for more information on obtaining credentials.

        Example:
            Manually setting the context credentials:

            >>> from google.cloud.bigquery import magics
            >>> from google.oauth2 import service_account
            >>> credentials = (service_account
            ...     .Credentials.from_service_account_file(
            ...         '/path/to/key.json'))
            >>> magics.context.credentials = credentials

        .. _auth docs: http://google-auth.readthedocs.io
            /en/latest/user-guide.html#obtaining-credentials
        """
        if self._credentials is None:
            self._credentials, _ = google.auth.default()
        return self._credentials

    @credentials.setter
    def credentials(self, value):
        self._credentials = value

    @property
    def project(self):
        """str: Default project to use for queries performed through IPython
        magics.

        Note:
            The project does not need to be explicitly defined if you have an
            environment default project set. If you do not have a default
            project set in your environment, manually assign the project as
            demonstrated in the example below.

        Example:
            Manually setting the context project:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.project = 'my-project'
        """
        if self._project is None:
            _, self._project = google.auth.default()
        return self._project

    @project.setter
    def project(self, value):
        self._project = value

    @property
    def bigquery_client_options(self):
        """google.api_core.client_options.ClientOptions: client options to be
        used through IPython magics.

        Note:
            The client options do not need to be explicitly defined if no
            special network connections are required. Normally you would be
            using the https://bigquery.googleapis.com/ endpoint.

        Example:
            Manually setting the endpoint:

            >>> from google.cloud.bigquery import magics
            >>> client_options = {}
            >>> client_options['api_endpoint'] = "https://some.special.url"
            >>> magics.context.bigquery_client_options = client_options
        """
        return self._bigquery_client_options

    @bigquery_client_options.setter
    def bigquery_client_options(self, value):
        self._bigquery_client_options = value

    @property
    def bqstorage_client_options(self):
        """google.api_core.client_options.ClientOptions: client options to be
        used through IPython magics for the storage client.

        Note:
            The client options do not need to be explicitly defined if no
            special network connections are required. Normally you would be
            using the https://bigquerystorage.googleapis.com/ endpoint.

        Example:
            Manually setting the endpoint:

            >>> from google.cloud.bigquery import magics
            >>> client_options = {}
            >>> client_options['api_endpoint'] = "https://some.special.url"
            >>> magics.context.bqstorage_client_options = client_options
        """
        return self._bqstorage_client_options

    @bqstorage_client_options.setter
    def bqstorage_client_options(self, value):
        self._bqstorage_client_options = value

    @property
    def default_query_job_config(self):
        """google.cloud.bigquery.job.QueryJobConfig: Default job
        configuration for queries.

        The context's :class:`~google.cloud.bigquery.job.QueryJobConfig` is
        used for queries. Some properties can be overridden with arguments to
        the magics.

        Example:
            Manually setting the default value for ``maximum_bytes_billed``
            to 100 MB:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.default_query_job_config.maximum_bytes_billed = 100000000
        """
        return self._default_query_job_config

    @default_query_job_config.setter
    def default_query_job_config(self, value):
        self._default_query_job_config = value

    @property
    def progress_bar_type(self):
        """str: Default progress bar type to use to display progress bar while
        executing queries through IPython magics.

        Note:
            Install the ``tqdm`` package to use this feature.

        Example:
            Manually setting the progress_bar_type:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.progress_bar_type = "tqdm_notebook"
        """
        return self._progress_bar_type

    @progress_bar_type.setter
    def progress_bar_type(self, value):
        self._progress_bar_type = value


context = Context()
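
# A short configuration sketch, mirroring the property docstrings above
# ("my-project" and the byte limit are placeholders): the singleton
# ``context`` can be adjusted before running any ``%%bigquery`` cell, e.g.
#
#     from google.cloud.bigquery import magics
#     magics.context.project = "my-project"
#     magics.context.default_query_job_config.maximum_bytes_billed = 100000000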


def _handle_error(error, destination_var=None):
    """Process a query execution error.

    Args:
        error (Exception):
            An exception that occurred during the query execution.
        destination_var (Optional[str]):
            The name of the IPython session variable to store the query job.
    """
    if destination_var:
        query_job = getattr(error, "query_job", None)

        if query_job is not None:
            IPython.get_ipython().push({destination_var: query_job})
        else:
            # this is the case when previewing table rows by providing just
            # table ID to cell magic
            print(
                "Could not save output to variable '{}'.".format(destination_var),
                file=sys.stderr,
            )

    print("\nERROR:\n", str(error), file=sys.stderr)


def _run_query(client, query, job_config=None):
    """Runs a query while printing status updates

    Args:
        client (google.cloud.bigquery.client.Client):
            Client to bundle configuration needed for API requests.
        query (str):
            SQL query to be executed. Defaults to the standard SQL dialect.
            Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.

    Returns:
        google.cloud.bigquery.job.QueryJob: the query job created

    Example:
        >>> client = bigquery.Client()
        >>> job = _run_query(client, "SELECT 17")
        Executing query with job ID: bf633912-af2c-4780-b568-5d868058632b
        Query executing: 1.66s
        Job ID bf633912-af2c-4780-b568-5d868058632b successfully executed
    """
    start_time = time.perf_counter()
    query_job = client.query(query, job_config=job_config)

    if job_config and job_config.dry_run:
        return query_job

    print(f"Executing query with job ID: {query_job.job_id}")

    while True:
        print(
            f"\rQuery executing: {time.perf_counter() - start_time:.2f}s",
            end="",
        )
        try:
            query_job.result(timeout=0.5)
            break
        except futures.TimeoutError:
            continue
    print(f"\nJob ID {query_job.job_id} successfully executed")
    return query_job


def _create_dataset_if_necessary(client, dataset_id):
    """Create a dataset in the current project if it doesn't exist.

    Args:
        client (google.cloud.bigquery.client.Client):
            Client to bundle configuration needed for API requests.
        dataset_id (str):
            Dataset id.
    """
    dataset_reference = bigquery.dataset.DatasetReference(client.project, dataset_id)
    try:
        # If the dataset can be fetched, it already exists; nothing to do.
        client.get_dataset(dataset_reference)
        return
    except NotFound:
        pass
    dataset = bigquery.Dataset(dataset_reference)
    dataset.location = client.location
    print(f"Creating dataset: {dataset_id}")
    client.create_dataset(dataset)
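
# Illustrative behavior (the dataset id is a placeholder): calling
# ``_create_dataset_if_necessary(client, "my_dataset")`` twice creates the
# dataset on the first call and is a harmless no-op on the second.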


@magic_arguments.magic_arguments()
@magic_arguments.argument(
    "destination_var",
    nargs="?",
    help=("If provided, save the output to this variable instead of displaying it."),
)
@magic_arguments.argument(
    "--destination_table",
    type=str,
    default=None,
    help=(
        "If provided, save the output of the query to a new BigQuery table. "
        "The value should be in the format <dataset_id>.<table_id>. "
        "If the table does not exist, it will be created. "
        "If the table already exists, its data will be overwritten."
    ),
)
@magic_arguments.argument(
    "--project",
    type=str,
    default=None,
    help=("Project to use for executing this query. Defaults to the context project."),
)
@magic_arguments.argument(
    "--max_results",
    default=None,
    help=(
        "Maximum number of rows in dataframe returned from executing the query. "
        "Defaults to returning all rows."
    ),
)
@magic_arguments.argument(
    "--maximum_bytes_billed",
    default=None,
    help=(
        "maximum_bytes_billed to use for executing this query. Defaults to "
        "the context default_query_job_config.maximum_bytes_billed."
    ),
)
@magic_arguments.argument(
    "--dry_run",
    action="store_true",
    default=False,
    help=(
        "Sets query to be a dry run to estimate costs. "
        "Defaults to executing the query instead of dry run if this argument is not used."
    ),
)
@magic_arguments.argument(
    "--use_legacy_sql",
    action="store_true",
    default=False,
    help=(
        "Sets query to use Legacy SQL instead of Standard SQL. Defaults to "
        "Standard SQL if this argument is not used."
    ),
)
@magic_arguments.argument(
    "--bigquery_api_endpoint",
    type=str,
    default=None,
    help=(
        "The desired API endpoint, e.g., bigquery.googleapis.com. Defaults to this "
        "option's value in the context bigquery_client_options."
    ),
)
@magic_arguments.argument(
    "--bqstorage_api_endpoint",
    type=str,
    default=None,
    help=(
        "The desired API endpoint, e.g., bigquerystorage.googleapis.com. Defaults to "
        "this option's value in the context bqstorage_client_options."
    ),
)
@magic_arguments.argument(
    "--no_query_cache",
    action="store_true",
    default=False,
    help=("Do not use cached query results."),
)
@magic_arguments.argument(
    "--use_bqstorage_api",
    action="store_true",
    default=None,
    help=(
        "[Deprecated] The BigQuery Storage API is already used by default to "
        "download large query results, and this option has no effect. "
        "If you want to switch to the classic REST API instead, use the "
        "--use_rest_api option."
    ),
)
@magic_arguments.argument(
    "--use_rest_api",
    action="store_true",
    default=False,
    help=(
        "Use the classic REST API instead of the BigQuery Storage API to "
        "download query results."
    ),
)
@magic_arguments.argument(
    "--verbose",
    action="store_true",
    default=False,
    help=(
        "If set, print verbose output, including the query job ID and the "
        "amount of time for the query to finish. By default, this "
        "information will be displayed as the query runs, but will be "
        "cleared after the query is finished."
    ),
)
@magic_arguments.argument(
    "--params",
    nargs="+",
    default=None,
    help=(
        "Parameters to format the query string. If present, the --params "
        "flag should be followed by a string representation of a dictionary "
        "in the format {'param_name': 'param_value'} (ex. {\"num\": 17}), "
        "or a reference to a dictionary in the same format. The dictionary "
        "reference can be made by including a '$' before the variable "
        "name (ex. $my_dict_var)."
    ),
)
@magic_arguments.argument(
    "--progress_bar_type",
    type=str,
    default=None,
    help=(
        "Sets progress bar type to display a progress bar while executing the query. "
        "Defaults to tqdm_notebook. Install the ``tqdm`` package to use this feature."
    ),
)
def _cell_magic(line, query):
    """Underlying function for bigquery cell magic

    Note:
        This function contains the underlying logic for the 'bigquery' cell
        magic. This function is not meant to be called directly.

    Args:
        line (str): "%%bigquery" followed by arguments as required
        query (str): SQL query to run

    Returns:
        pandas.DataFrame: the query results.
    """
    # The built-in parser does not recognize Python structures such as dicts,
    # thus we extract the "--params" option and interpret it separately.
    try:
        params_option_value, rest_of_args = _split_args_line(line)
    except lap.exceptions.QueryParamsParseError as exc:
        rebranded_error = SyntaxError(
            "--params is not a correctly formatted JSON string or a JSON "
            "serializable dictionary"
        )
        raise rebranded_error from exc
    except lap.exceptions.DuplicateQueryParamsError as exc:
        rebranded_error = ValueError("Duplicate --params option.")
        raise rebranded_error from exc
    except lap.exceptions.ParseError as exc:
        rebranded_error = ValueError(
            "Unrecognized input, are option values correct? "
            "Error details: {}".format(exc.args[0])
        )
        raise rebranded_error from exc

    args = magic_arguments.parse_argstring(_cell_magic, rest_of_args)

    if args.use_bqstorage_api is not None:
        warnings.warn(
            "Deprecated option --use_bqstorage_api, the BigQuery "
            "Storage API is already used by default.",
            category=DeprecationWarning,
        )
    use_bqstorage_api = not args.use_rest_api

    params = []
    if params_option_value:
        # A non-existing params variable is not expanded and ends up in the input
        # in its raw form, e.g. "$query_params".
        if params_option_value.startswith("$"):
            msg = 'Parameter expansion failed, undefined variable "{}".'.format(
                params_option_value[1:]
            )
            raise NameError(msg)

        params = _helpers.to_query_parameters(ast.literal_eval(params_option_value), {})
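
        # Illustrative expansion (an assumption about the helper's output,
        # not captured from a run): ``--params {"num": 17}`` yields query
        # parameters equivalent to
        # ``[bigquery.ScalarQueryParameter("num", "INT64", 17)]``.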

    project = args.project or context.project

    bigquery_client_options = copy.deepcopy(context.bigquery_client_options)
    if args.bigquery_api_endpoint:
        if isinstance(bigquery_client_options, dict):
            bigquery_client_options["api_endpoint"] = args.bigquery_api_endpoint
        else:
            bigquery_client_options.api_endpoint = args.bigquery_api_endpoint

    client = bigquery.Client(
        project=project,
        credentials=context.credentials,
        default_query_job_config=context.default_query_job_config,
        client_info=client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
        client_options=bigquery_client_options,
    )
    if context._connection:
        client._connection = context._connection

    bqstorage_client_options = copy.deepcopy(context.bqstorage_client_options)
    if args.bqstorage_api_endpoint:
        if isinstance(bqstorage_client_options, dict):
            bqstorage_client_options["api_endpoint"] = args.bqstorage_api_endpoint
        else:
            bqstorage_client_options.api_endpoint = args.bqstorage_api_endpoint

    bqstorage_client = _make_bqstorage_client(
        client,
        use_bqstorage_api,
        bqstorage_client_options,
    )

    close_transports = functools.partial(_close_transports, client, bqstorage_client)

    try:
        if args.max_results:
            max_results = int(args.max_results)
        else:
            max_results = None

        query = query.strip()

        if not query:
            error = ValueError("Query is missing.")
            _handle_error(error, args.destination_var)
            return

        # Check if query is given as a reference to a variable.
        if query.startswith("$"):
            query_var_name = query[1:]

            if not query_var_name:
                missing_msg = 'Missing query variable name, empty "$" is not allowed.'
                raise NameError(missing_msg)

            if query_var_name.isidentifier():
                ip = IPython.get_ipython()
                query = ip.user_ns.get(query_var_name, ip)  # ip serves as a sentinel

                if query is ip:
                    raise NameError(
                        f"Unknown query, variable {query_var_name} does not exist."
                    )
                else:
                    if not isinstance(query, (str, bytes)):
                        raise TypeError(
                            f"Query variable {query_var_name} must be a string "
                            "or a bytes-like value."
                        )

        # Any query that does not contain whitespace (aside from leading and
        # trailing whitespace) is assumed to be a table id.
        if not re.search(r"\s", query):
            try:
                rows = client.list_rows(query, max_results=max_results)
            except Exception as ex:
                _handle_error(ex, args.destination_var)
                return

            result = rows.to_dataframe(
                bqstorage_client=bqstorage_client,
                create_bqstorage_client=False,
            )
            if args.destination_var:
                IPython.get_ipython().push({args.destination_var: result})
                return
            else:
                return result

        job_config = bigquery.job.QueryJobConfig()
        job_config.query_parameters = params
        job_config.use_legacy_sql = args.use_legacy_sql
        job_config.dry_run = args.dry_run

        # Don't override context job config unless --no_query_cache is explicitly set.
        if args.no_query_cache:
            job_config.use_query_cache = False

        if args.destination_table:
            split = args.destination_table.split(".")
            if len(split) != 2:
                raise ValueError(
                    "--destination_table should be in a <dataset_id>.<table_id> format."
                )
            dataset_id, table_id = split
            job_config.allow_large_results = True
            dataset_ref = bigquery.dataset.DatasetReference(client.project, dataset_id)
            destination_table_ref = dataset_ref.table(table_id)
            job_config.destination = destination_table_ref
            job_config.create_disposition = "CREATE_IF_NEEDED"
            job_config.write_disposition = "WRITE_TRUNCATE"
            _create_dataset_if_necessary(client, dataset_id)

        if args.maximum_bytes_billed == "None":
            job_config.maximum_bytes_billed = 0
        elif args.maximum_bytes_billed is not None:
            value = int(args.maximum_bytes_billed)
            job_config.maximum_bytes_billed = value

        try:
            query_job = _run_query(client, query, job_config=job_config)
        except Exception as ex:
            _handle_error(ex, args.destination_var)
            return

        if not args.verbose:
            display.clear_output()

        if args.dry_run and args.destination_var:
            IPython.get_ipython().push({args.destination_var: query_job})
            return
        elif args.dry_run:
            print(
                "Query validated. This query will process {} bytes.".format(
                    query_job.total_bytes_processed
                )
            )
            return query_job

        progress_bar = context.progress_bar_type or args.progress_bar_type

        if max_results:
            result = query_job.result(max_results=max_results).to_dataframe(
                bqstorage_client=None,
                create_bqstorage_client=False,
                progress_bar_type=progress_bar,
            )
        else:
            result = query_job.to_dataframe(
                bqstorage_client=bqstorage_client,
                create_bqstorage_client=False,
                progress_bar_type=progress_bar,
            )

        if args.destination_var:
            IPython.get_ipython().push({args.destination_var: result})
        else:
            return result
    finally:
        close_transports()


def _split_args_line(line):
    """Split out the --params option value from the input line arguments.

    Args:
        line (str): The line arguments passed to the cell magic.

    Returns:
        Tuple[str, str]
    """
    lexer = lap.Lexer(line)
    scanner = lap.Parser(lexer)
    tree = scanner.input_line()

    extractor = lap.QueryParamsExtractor()
    params_option_value, rest_of_args = extractor.visit(tree)

    return params_option_value, rest_of_args
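
# An illustrative call (the exact handling of the ``--params`` token and of
# surrounding whitespace in the returned remainder is an assumption about the
# parser, not verified output):
#
#     _split_args_line('--params {"num": 17} --project my-project')
#     # -> ('{"num": 17}', <remaining args, including '--project my-project'>)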


def _make_bqstorage_client(client, use_bqstorage_api, client_options):
    if not use_bqstorage_api:
        return None

    try:
        from google.cloud import bigquery_storage  # type: ignore # noqa: F401
    except ImportError as err:
        customized_error = ImportError(
            "The default BigQuery Storage API client cannot be used, install "
            "the missing google-cloud-bigquery-storage and pyarrow packages "
            "to use it. Alternatively, use the classic REST API by specifying "
            "the --use_rest_api magic option."
        )
        raise customized_error from err

    try:
        from google.api_core.gapic_v1 import client_info as gapic_client_info
    except ImportError as err:
        customized_error = ImportError(
            "Install the grpcio package to use the BigQuery Storage API."
        )
        raise customized_error from err

    return client._ensure_bqstorage_client(
        client_options=client_options,
        client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
    )


def _close_transports(client, bqstorage_client):
    """Close the given clients' underlying transport channels.

    Closing the transport is needed to release system resources, namely open
    sockets.

    Args:
        client (:class:`~google.cloud.bigquery.client.Client`):
            A client for the main BigQuery API.
        bqstorage_client
            (Optional[:class:`~google.cloud.bigquery_storage.BigQueryReadClient`]):
            A client for the BigQuery Storage API.
    """
    client.close()
    if bqstorage_client is not None:
        bqstorage_client._transport.grpc_channel.close()