Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/_tqdm_helpers.py: 24%

55 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1# Copyright 2019 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Shared helper functions for tqdm progress bar.""" 

16 

17import concurrent.futures 

18import sys 

19import time 

20import typing 

21from typing import Optional 

22import warnings 

23 

24try: 

25 import tqdm # type: ignore 

26 import tqdm.notebook as notebook # type: ignore 

27 

28except ImportError: # pragma: NO COVER 

29 tqdm = None 

30 

31if typing.TYPE_CHECKING: # pragma: NO COVER 

32 from google.cloud.bigquery import QueryJob 

33 from google.cloud.bigquery.table import RowIterator 

34 

# Warning text emitted by ``get_progress_bar`` when a progress bar was
# requested but could not be created.
_NO_TQDM_ERROR = (
    "A progress bar was requested, but there was an error loading the tqdm "
    "library. Please install tqdm to use the progress bar functionality."
)

# Passed as ``timeout`` to each ``query_job.result`` poll in ``wait_for_query``
# (presumably seconds -- confirm against the QueryJob API).
_PROGRESS_BAR_UPDATE_INTERVAL = 0.5

41 

42 

def get_progress_bar(progress_bar_type, description, total, unit):
    """Build the tqdm progress bar selected by ``progress_bar_type``.

    Args:
        progress_bar_type:
            One of ``"tqdm"``, ``"tqdm_notebook"`` or ``"tqdm_gui"``. Any
            other value (including ``None``) yields no progress bar.
        description:
            Text forwarded to tqdm as ``desc``.
        total:
            Value forwarded to tqdm as ``total``.
        unit:
            Value forwarded to tqdm as ``unit``.

    Returns:
        The constructed progress bar, or ``None`` when tqdm is unavailable,
        the type is not recognised, or tqdm raised ``KeyError`` /
        ``TypeError`` while constructing the bar.
    """
    if tqdm is None:
        # Stay silent unless the caller actually asked for a progress bar.
        if progress_bar_type is not None:
            warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
        return None

    # Keyword arguments shared by every flavour of progress bar.
    shared = {"desc": description, "total": total, "unit": unit}

    try:
        if progress_bar_type == "tqdm":
            return tqdm.tqdm(
                bar_format="{l_bar}{bar}|",
                colour="green",
                file=sys.stdout,
                **shared,
            )
        if progress_bar_type == "tqdm_notebook":
            return notebook.tqdm(
                bar_format="{l_bar}{bar}|",
                file=sys.stdout,
                **shared,
            )
        if progress_bar_type == "tqdm_gui":
            return tqdm.tqdm_gui(**shared)
    except (KeyError, TypeError):
        # A misbehaving tqdm must not break the caller: warn and carry on
        # without a progress bar.
        warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)

    return None

76 

77 

def wait_for_query(
    query_job: "QueryJob",
    progress_bar_type: Optional[str] = None,
    max_results: Optional[int] = None,
) -> "RowIterator":
    """Return the query result, showing a progress bar while the query runs.

    When no progress bar can be built (tqdm missing, unknown or ``None``
    ``progress_bar_type``), this simply returns
    ``query_job.result(max_results=max_results)``.

    Otherwise the job is polled with a timeout of
    ``_PROGRESS_BAR_UPDATE_INTERVAL``. After each timed-out poll the job is
    reloaded and the bar advances by one step for every query-plan stage
    observed as ``"COMPLETE"``; the final step is held back until the job
    itself finishes.

    Args:
        query_job:
            The job representing the execution of the query on the server.
        progress_bar_type:
            The type of progress bar to use to show query progress.
        max_results:
            The maximum number of rows the row iterator should return.

    Returns:
        A row iterator over the query results.

    Raises:
        Any exception raised by ``query_job.result`` or ``query_job.reload``
        other than the polling timeout is propagated; the progress bar is
        closed first.
    """
    default_total = 1
    current_stage = None
    start_time = time.perf_counter()

    progress_bar = get_progress_bar(
        progress_bar_type, "Query is running", default_total, "query"
    )
    if progress_bar is None:
        return query_job.result(max_results=max_results)

    # ``i`` is both the index of the stage being watched and the number of
    # single-step updates already applied to the bar.
    i = 0
    try:
        while True:
            plan = query_job.query_plan
            if plan:
                default_total = len(plan)
                # Clamp so a plan shorter than before cannot raise IndexError.
                current_stage = plan[min(i, default_total - 1)]
                progress_bar.total = default_total
                progress_bar.set_description(
                    f"Query executing stage {current_stage.name} and status {current_stage.status} : "
                    f"{time.perf_counter() - start_time:.2f}s"
                )

            try:
                query_result = query_job.result(
                    timeout=_PROGRESS_BAR_UPDATE_INTERVAL, max_results=max_results
                )
            except concurrent.futures.TimeoutError:
                query_job.reload()  # Refreshes the state via a GET request.
                # Uses the stage captured at the top of this iteration.
                if (
                    current_stage
                    and current_stage.status == "COMPLETE"
                    and i < default_total - 1
                ):
                    # tqdm's ``update`` is incremental: one finished stage
                    # is exactly one step.
                    progress_bar.update(1)
                    i += 1
                continue

            # Job finished: add only the steps not yet counted so the bar
            # lands on ``total`` instead of overshooting it.
            progress_bar.update(max(default_total - i, 0))
            progress_bar.set_description(
                f"Job ID {query_job.job_id} successfully executed",
            )
            return query_result
    finally:
        # Always release the bar, including when polling raises.
        progress_bar.close()