# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""IPython Magics

Install ``bigquery-magics`` and call ``%load_ext bigquery_magics`` to use the
``%%bigquery`` cell magic.

See the `BigQuery Magics reference documentation
<https://googleapis.dev/python/bigquery-magics/latest/>`_.
"""

from __future__ import print_function

import ast
import copy
import functools
import re
import sys
import time
import warnings
from concurrent import futures

try:
    import IPython  # type: ignore
    from IPython import display  # type: ignore
    from IPython.core import magic_arguments  # type: ignore
except ImportError:
    raise ImportError("This module can only be loaded in IPython.")

from google.api_core import client_info
from google.api_core import client_options
from google.api_core.exceptions import NotFound
import google.auth  # type: ignore
from google.cloud import bigquery
import google.cloud.bigquery.dataset
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import exceptions
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.magics import line_arg_parser as lap

try:
    import bigquery_magics  # type: ignore
except ImportError:
    bigquery_magics = None

IPYTHON_USER_AGENT = "ipython-{}".format(IPython.__version__)  # type: ignore


class Context(object):
    """Storage for objects to be used throughout an IPython notebook session.

    A Context object is initialized when the ``magics`` module is imported,
    and can be found at ``google.cloud.bigquery.magics.context``.
    """

    def __init__(self):
        self._credentials = None
        self._project = None
        self._connection = None
        self._default_query_job_config = bigquery.QueryJobConfig()
        self._bigquery_client_options = client_options.ClientOptions()
        self._bqstorage_client_options = client_options.ClientOptions()
        self._progress_bar_type = "tqdm_notebook"

    @property
    def credentials(self):
        """google.auth.credentials.Credentials: Credentials to use for queries
        performed through IPython magics.

        Note:
            These credentials do not need to be explicitly defined if you are
            using Application Default Credentials. If you are not using
            Application Default Credentials, manually construct a
            :class:`google.auth.credentials.Credentials` object and set it as
            the context credentials as demonstrated in the example below. See
            `auth docs`_ for more information on obtaining credentials.

        Example:
            Manually setting the context credentials:

            >>> from google.cloud.bigquery import magics
            >>> from google.oauth2 import service_account
            >>> credentials = (service_account
            ...     .Credentials.from_service_account_file(
            ...         '/path/to/key.json'))
            >>> magics.context.credentials = credentials


        .. _auth docs: http://google-auth.readthedocs.io
            /en/latest/user-guide.html#obtaining-credentials
        """
        if self._credentials is None:
            self._credentials, _ = google.auth.default()
        return self._credentials

    @credentials.setter
    def credentials(self, value):
        self._credentials = value

    @property
    def project(self):
        """str: Default project to use for queries performed through IPython
        magics.

        Note:
            The project does not need to be explicitly defined if you have an
            environment default project set. If you do not have a default
            project set in your environment, manually assign the project as
            demonstrated in the example below.

        Example:
            Manually setting the context project:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.project = 'my-project'
        """
        if self._project is None:
            _, self._project = google.auth.default()
        return self._project

    @project.setter
    def project(self, value):
        self._project = value

    @property
    def bigquery_client_options(self):
        """google.api_core.client_options.ClientOptions: client options to be
        used through IPython magics.

        Note:
            The client options do not need to be explicitly defined if no
            special network connections are required. Normally you would be
            using the https://bigquery.googleapis.com/ endpoint.

        Example:
            Manually setting the endpoint:

            >>> from google.cloud.bigquery import magics
            >>> client_options = {}
            >>> client_options['api_endpoint'] = "https://some.special.url"
            >>> magics.context.bigquery_client_options = client_options
        """
        return self._bigquery_client_options

    @bigquery_client_options.setter
    def bigquery_client_options(self, value):
        self._bigquery_client_options = value

    @property
    def bqstorage_client_options(self):
        """google.api_core.client_options.ClientOptions: client options to be
        used through IPython magics for the storage client.

        Note:
            The client options do not need to be explicitly defined if no
            special network connections are required. Normally you would be
            using the https://bigquerystorage.googleapis.com/ endpoint.

        Example:
            Manually setting the endpoint:

            >>> from google.cloud.bigquery import magics
            >>> client_options = {}
            >>> client_options['api_endpoint'] = "https://some.special.url"
            >>> magics.context.bqstorage_client_options = client_options
        """
        return self._bqstorage_client_options

    @bqstorage_client_options.setter
    def bqstorage_client_options(self, value):
        self._bqstorage_client_options = value

    @property
    def default_query_job_config(self):
        """google.cloud.bigquery.job.QueryJobConfig: Default job
        configuration for queries.

        The context's :class:`~google.cloud.bigquery.job.QueryJobConfig` is
        used for queries. Some properties can be overridden with arguments to
        the magics.

        Example:
            Manually setting the default value for ``maximum_bytes_billed``
            to 100 MB:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.default_query_job_config.maximum_bytes_billed = 100000000
        """
        return self._default_query_job_config

    @default_query_job_config.setter
    def default_query_job_config(self, value):
        self._default_query_job_config = value

    @property
    def progress_bar_type(self):
        """str: Default progress bar type to use to display progress bar while
        executing queries through IPython magics.

        Note:
            Install the ``tqdm`` package to use this feature.

        Example:
            Manually setting the progress_bar_type:

            >>> from google.cloud.bigquery import magics
            >>> magics.context.progress_bar_type = "tqdm_notebook"
        """
        return self._progress_bar_type

    @progress_bar_type.setter
    def progress_bar_type(self, value):
        self._progress_bar_type = value


# If bigquery_magics is available, we load that extension rather than this one.
# Ensure google.cloud.bigquery.magics.context setters are on the correct magics
# implementation in case the user has installed the package but hasn't updated
# their code.
if bigquery_magics is not None:
    context = bigquery_magics.context
else:
    context = Context()
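
# A minimal sketch of enabling the cell magic in a notebook session (the
# first form assumes the separate ``bigquery-magics`` package is installed):
#
#     %load_ext bigquery_magics        # preferred, per the module docstring
#     %load_ext google.cloud.bigquery  # legacy extension provided by this module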


def _handle_error(error, destination_var=None):
    """Process a query execution error.

    Args:
        error (Exception):
            An exception that occurred during the query execution.
        destination_var (Optional[str]):
            The name of the IPython session variable to store the query job.
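
    Example:
        An illustrative sketch (requires an active IPython session); when the
        error carries a ``query_job`` attribute, the job is pushed into the
        user namespace under ``destination_var``:

        >>> try:
        ...     _run_query(client, "SELECT bad syntax")
        ... except Exception as exc:
        ...     _handle_error(exc, destination_var="failed_job")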
    """
    if destination_var:
        query_job = getattr(error, "query_job", None)

        if query_job is not None:
            IPython.get_ipython().push({destination_var: query_job})
        else:
            # This is the case when previewing table rows by providing just
            # a table ID to the cell magic.
            print(
                "Could not save output to variable '{}'.".format(destination_var),
                file=sys.stderr,
            )

    print("\nERROR:\n", str(error), file=sys.stderr)


def _run_query(client, query, job_config=None):
    """Runs a query while printing status updates

    Args:
        client (google.cloud.bigquery.client.Client):
            Client to bundle configuration needed for API requests.
        query (str):
            SQL query to be executed. Defaults to the standard SQL dialect.
            Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.

    Returns:
        google.cloud.bigquery.job.QueryJob: the query job created

    Example:
        >>> client = bigquery.Client()
        >>> _run_query(client, "SELECT 17")
        Executing query with job ID: bf633912-af2c-4780-b568-5d868058632b
        Query executing: 1.66s
        Job ID bf633912-af2c-4780-b568-5d868058632b successfully executed
    """
    start_time = time.perf_counter()
    query_job = client.query(query, job_config=job_config)

    if job_config and job_config.dry_run:
        return query_job

    print(f"Executing query with job ID: {query_job.job_id}")

    while True:
        print(
            f"\rQuery executing: {time.perf_counter() - start_time:.2f}s",
            end="",
        )
        try:
            query_job.result(timeout=0.5)
            break
        except futures.TimeoutError:
            continue
    print(f"\nJob ID {query_job.job_id} successfully executed")
    return query_job


def _create_dataset_if_necessary(client, dataset_id):
    """Create a dataset in the current project if it doesn't exist.

    Args:
        client (google.cloud.bigquery.client.Client):
            Client to bundle configuration needed for API requests.
        dataset_id (str):
            Dataset id.
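
    Example:
        A minimal sketch (assumes the dataset does not yet exist and the
        client's project allows dataset creation; the name is a placeholder):

        >>> client = bigquery.Client()
        >>> _create_dataset_if_necessary(client, "my_scratch_dataset")
        Creating dataset: my_scratch_dataset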
    """
    dataset_reference = bigquery.dataset.DatasetReference(client.project, dataset_id)
    try:
        client.get_dataset(dataset_reference)
        return
    except NotFound:
        pass
    dataset = bigquery.Dataset(dataset_reference)
    dataset.location = client.location
    print(f"Creating dataset: {dataset_id}")
    client.create_dataset(dataset)


@magic_arguments.magic_arguments()
@magic_arguments.argument(
    "destination_var",
    nargs="?",
    help=("If provided, save the output to this variable instead of displaying it."),
)
@magic_arguments.argument(
    "--destination_table",
    type=str,
    default=None,
    help=(
        "If provided, save the output of the query to a new BigQuery table. "
        "Variable should be in a format <dataset_id>.<table_id>. "
        "If the table does not exist, it will be created. "
        "If the table already exists, its data will be overwritten."
    ),
)
@magic_arguments.argument(
    "--project",
    type=str,
    default=None,
    help=("Project to use for executing this query. Defaults to the context project."),
)
@magic_arguments.argument(
    "--max_results",
    default=None,
    help=(
        "Maximum number of rows in dataframe returned from executing the query. "
        "Defaults to returning all rows."
    ),
)
@magic_arguments.argument(
    "--maximum_bytes_billed",
    default=None,
    help=(
        "maximum_bytes_billed to use for executing this query. Defaults to "
        "the context default_query_job_config.maximum_bytes_billed."
    ),
)
@magic_arguments.argument(
    "--dry_run",
    action="store_true",
    default=False,
    help=(
        "Sets query to be a dry run to estimate costs. "
        "Defaults to executing the query instead of dry run if this argument is not used."
    ),
)
@magic_arguments.argument(
    "--use_legacy_sql",
    action="store_true",
    default=False,
    help=(
        "Sets query to use Legacy SQL instead of Standard SQL. Defaults to "
        "Standard SQL if this argument is not used."
    ),
)
@magic_arguments.argument(
    "--bigquery_api_endpoint",
    type=str,
    default=None,
    help=(
        "The desired API endpoint, e.g., bigquery.googleapis.com. Defaults to this "
        "option's value in the context bigquery_client_options."
    ),
)
@magic_arguments.argument(
    "--bqstorage_api_endpoint",
    type=str,
    default=None,
    help=(
        "The desired API endpoint, e.g., bigquerystorage.googleapis.com. Defaults to "
        "this option's value in the context bqstorage_client_options."
    ),
)
@magic_arguments.argument(
    "--no_query_cache",
    action="store_true",
    default=False,
    help=("Do not use cached query results."),
)
@magic_arguments.argument(
    "--use_bqstorage_api",
    action="store_true",
    default=None,
    help=(
        "[Deprecated] The BigQuery Storage API is already used by default to "
        "download large query results, and this option has no effect. "
        "If you want to switch to the classic REST API instead, use the "
        "--use_rest_api option."
    ),
)
@magic_arguments.argument(
    "--use_rest_api",
    action="store_true",
    default=False,
    help=(
        "Use the classic REST API instead of the BigQuery Storage API to "
        "download query results."
    ),
)
@magic_arguments.argument(
    "--verbose",
    action="store_true",
    default=False,
    help=(
        "If set, print verbose output, including the query job ID and the "
        "amount of time for the query to finish. By default, this "
        "information will be displayed as the query runs, but will be "
        "cleared after the query is finished."
    ),
)
@magic_arguments.argument(
    "--params",
    nargs="+",
    default=None,
    help=(
        "Parameters to format the query string. If present, the --params "
        "flag should be followed by a string representation of a dictionary "
        "in the format {'param_name': 'param_value'} (ex. {\"num\": 17}), "
        "or a reference to a dictionary in the same format. The dictionary "
        "reference can be made by including a '$' before the variable "
        "name (ex. $my_dict_var)."
    ),
)
@magic_arguments.argument(
    "--progress_bar_type",
    type=str,
    default=None,
    help=(
        "Sets progress bar type to display a progress bar while executing the query. "
        "Defaults to tqdm_notebook. Install the ``tqdm`` package to use this feature."
    ),
)
@magic_arguments.argument(
    "--location",
    type=str,
    default=None,
    help=(
        "Set the location to execute the query. "
        "Defaults to the location set in the query settings in the console."
    ),
)
def _cell_magic(line, query):
    """Underlying function for bigquery cell magic

    Note:
        This function contains the underlying logic for the 'bigquery' cell
        magic. This function is not meant to be called directly.

    Args:
        line (str): "%%bigquery" followed by arguments as required
        query (str): SQL query to run

    Returns:
        pandas.DataFrame: the query results.
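
    Example:
        An illustrative notebook cell (the destination variable, project, and
        table are placeholders you would replace)::

            %%bigquery my_df --project my-project
            SELECT name, SUM(number) AS total
            FROM `bigquery-public-data.usa_names.usa_1910_2013`
            GROUP BY name
            ORDER BY total DESC
            LIMIT 10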
    """
    # The built-in parser does not recognize Python structures such as dicts, thus
    # we extract the "--params" option and interpret it separately.
    try:
        params_option_value, rest_of_args = _split_args_line(line)
    except lap.exceptions.QueryParamsParseError as exc:
        rebranded_error = SyntaxError(
            "--params is not a correctly formatted JSON string or a JSON "
            "serializable dictionary"
        )
        raise rebranded_error from exc
    except lap.exceptions.DuplicateQueryParamsError as exc:
        rebranded_error = ValueError("Duplicate --params option.")
        raise rebranded_error from exc
    except lap.exceptions.ParseError as exc:
        rebranded_error = ValueError(
            "Unrecognized input, are option values correct? "
            "Error details: {}".format(exc.args[0])
        )
        raise rebranded_error from exc

    args = magic_arguments.parse_argstring(_cell_magic, rest_of_args)

    if args.use_bqstorage_api is not None:
        warnings.warn(
            "Deprecated option --use_bqstorage_api, the BigQuery "
            "Storage API is already used by default.",
            category=DeprecationWarning,
        )
    use_bqstorage_api = not args.use_rest_api
    location = args.location

    params = []
    if params_option_value:
        # A non-existing params variable is not expanded and ends up in the input
        # in its raw form, e.g. "$query_params".
        if params_option_value.startswith("$"):
            msg = 'Parameter expansion failed, undefined variable "{}".'.format(
                params_option_value[1:]
            )
            raise NameError(msg)

        params = _helpers.to_query_parameters(ast.literal_eval(params_option_value), {})
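        # Illustrative: with ``--params {"num": 17}`` on the magic line, the
        # query body can reference the value as the named parameter ``@num``.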

    project = args.project or context.project

    bigquery_client_options = copy.deepcopy(context.bigquery_client_options)
    if args.bigquery_api_endpoint:
        if isinstance(bigquery_client_options, dict):
            bigquery_client_options["api_endpoint"] = args.bigquery_api_endpoint
        else:
            bigquery_client_options.api_endpoint = args.bigquery_api_endpoint

    client = bigquery.Client(
        project=project,
        credentials=context.credentials,
        default_query_job_config=context.default_query_job_config,
        client_info=client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
        client_options=bigquery_client_options,
        location=location,
    )
    if context._connection:
        client._connection = context._connection

    bqstorage_client_options = copy.deepcopy(context.bqstorage_client_options)
    if args.bqstorage_api_endpoint:
        if isinstance(bqstorage_client_options, dict):
            bqstorage_client_options["api_endpoint"] = args.bqstorage_api_endpoint
        else:
            bqstorage_client_options.api_endpoint = args.bqstorage_api_endpoint

    bqstorage_client = _make_bqstorage_client(
        client,
        use_bqstorage_api,
        bqstorage_client_options,
    )

    close_transports = functools.partial(_close_transports, client, bqstorage_client)

    try:
        if args.max_results:
            max_results = int(args.max_results)
        else:
            max_results = None

        query = query.strip()

        if not query:
            error = ValueError("Query is missing.")
            _handle_error(error, args.destination_var)
            return

        # Check if query is given as a reference to a variable.
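        # (Illustrative: a cell body of "$my_sql" resolves to the notebook
        # variable ``my_sql``; the variable name is a placeholder.)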
        if query.startswith("$"):
            query_var_name = query[1:]

            if not query_var_name:
                missing_msg = 'Missing query variable name, empty "$" is not allowed.'
                raise NameError(missing_msg)

            if query_var_name.isidentifier():
                ip = IPython.get_ipython()
                query = ip.user_ns.get(query_var_name, ip)  # ip serves as a sentinel

                if query is ip:
                    raise NameError(
                        f"Unknown query, variable {query_var_name} does not exist."
                    )
                elif not isinstance(query, (str, bytes)):
                    raise TypeError(
                        f"Query variable {query_var_name} must be a string "
                        "or a bytes-like value."
                    )

        # Any query that does not contain whitespace (aside from leading and trailing whitespace)
        # is assumed to be a table id
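        # (Illustrative: a cell body of just "my-project.my_dataset.my_table"
        # previews rows from that table instead of running a query.)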
        if not re.search(r"\s", query):
            try:
                rows = client.list_rows(query, max_results=max_results)
            except Exception as ex:
                _handle_error(ex, args.destination_var)
                return

            result = rows.to_dataframe(
                bqstorage_client=bqstorage_client,
                create_bqstorage_client=False,
            )
            if args.destination_var:
                IPython.get_ipython().push({args.destination_var: result})
                return
            else:
                return result

        job_config = bigquery.job.QueryJobConfig()
        job_config.query_parameters = params
        job_config.use_legacy_sql = args.use_legacy_sql
        job_config.dry_run = args.dry_run

        # Don't override context job config unless --no_query_cache is explicitly set.
        if args.no_query_cache:
            job_config.use_query_cache = False

        if args.destination_table:
            split = args.destination_table.split(".")
            if len(split) != 2:
                raise ValueError(
                    "--destination_table should be in a <dataset_id>.<table_id> format."
                )
            dataset_id, table_id = split
            job_config.allow_large_results = True
            dataset_ref = bigquery.dataset.DatasetReference(client.project, dataset_id)
            destination_table_ref = dataset_ref.table(table_id)
            job_config.destination = destination_table_ref
            job_config.create_disposition = "CREATE_IF_NEEDED"
            job_config.write_disposition = "WRITE_TRUNCATE"
            _create_dataset_if_necessary(client, dataset_id)

        if args.maximum_bytes_billed == "None":
            job_config.maximum_bytes_billed = 0
        elif args.maximum_bytes_billed is not None:
            value = int(args.maximum_bytes_billed)
            job_config.maximum_bytes_billed = value

        try:
            query_job = _run_query(client, query, job_config=job_config)
        except Exception as ex:
            _handle_error(ex, args.destination_var)
            return

        if not args.verbose:
            display.clear_output()

        if args.dry_run and args.destination_var:
            IPython.get_ipython().push({args.destination_var: query_job})
            return
        elif args.dry_run:
            print(
                "Query validated. This query will process {} bytes.".format(
                    query_job.total_bytes_processed
                )
            )
            return query_job

        progress_bar = args.progress_bar_type or context.progress_bar_type

        if max_results:
            result = query_job.result(max_results=max_results).to_dataframe(
                bqstorage_client=None,
                create_bqstorage_client=False,
                progress_bar_type=progress_bar,
            )
        else:
            result = query_job.to_dataframe(
                bqstorage_client=bqstorage_client,
                create_bqstorage_client=False,
                progress_bar_type=progress_bar,
            )

        if args.destination_var:
            IPython.get_ipython().push({args.destination_var: result})
        else:
            return result
    finally:
        close_transports()


def _split_args_line(line):
    """Split out the --params option value from the input line arguments.

    Args:
        line (str): The line arguments passed to the cell magic.

    Returns:
        Tuple[str, str]: The ``--params`` option value and the remaining
        line arguments.
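
    Example:
        A sketch of the expected split (the exact whitespace of the remainder
        may vary):

        >>> params, rest = _split_args_line(
        ...     '--params {"num": 17} --project my-project'
        ... )
        >>> # params -> the raw option value, e.g. '{"num": 17}'
        >>> # rest   -> the remaining arguments, e.g. '--project my-project'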
    """
    lexer = lap.Lexer(line)
    parser = lap.Parser(lexer)
    tree = parser.input_line()

    extractor = lap.QueryParamsExtractor()
    params_option_value, rest_of_args = extractor.visit(tree)

    return params_option_value, rest_of_args


def _make_bqstorage_client(client, use_bqstorage_api, client_options):
    """Creates a BigQuery Storage client.

    Args:
        client (:class:`~google.cloud.bigquery.client.Client`): BigQuery client.
        use_bqstorage_api (bool): whether the BigQuery Storage API is used or not.
        client_options (:class:`google.api_core.client_options.ClientOptions`):
            Custom options used with a new BigQuery Storage client instance
            if one is created.

    Raises:
        ImportError: if google-cloud-bigquery-storage or the grpcio package
            is not installed.

    Returns:
        Optional[google.cloud.bigquery_storage.BigQueryReadClient]:
            A BigQuery Storage API client, or None if ``use_bqstorage_api``
            is False or google-cloud-bigquery-storage is outdated.
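
    Example:
        A minimal sketch (assumes google-cloud-bigquery-storage and grpcio
        are installed; ``client_options`` here is the module-level import):

        >>> client = bigquery.Client()
        >>> bqstorage_client = _make_bqstorage_client(
        ...     client, True, client_options.ClientOptions()
        ... )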
    """
    if not use_bqstorage_api:
        return None

    try:
        _versions_helpers.BQ_STORAGE_VERSIONS.try_import(raise_if_error=True)
    except exceptions.BigQueryStorageNotFoundError as err:
        customized_error = ImportError(
            "The default BigQuery Storage API client cannot be used, install "
            "the missing google-cloud-bigquery-storage and pyarrow packages "
            "to use it. Alternatively, use the classic REST API by specifying "
            "the --use_rest_api magic option."
        )
        raise customized_error from err
    except exceptions.LegacyBigQueryStorageError:
        pass

    try:
        from google.api_core.gapic_v1 import client_info as gapic_client_info
    except ImportError as err:
        customized_error = ImportError(
            "Install the grpcio package to use the BigQuery Storage API."
        )
        raise customized_error from err

    return client._ensure_bqstorage_client(
        client_options=client_options,
        client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
    )


def _close_transports(client, bqstorage_client):
    """Close the given clients' underlying transport channels.

    Closing the transport is needed to release system resources, namely open
    sockets.

    Args:
        client (:class:`~google.cloud.bigquery.client.Client`):
            A client for the BigQuery API.
        bqstorage_client
            (Optional[:class:`~google.cloud.bigquery_storage.BigQueryReadClient`]):
            A client for the BigQuery Storage API.
    """
    client.close()
    if bqstorage_client is not None:
        bqstorage_client._transport.grpc_channel.close()