# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for interacting with the job REST APIs from the client."""

import copy
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job

# Avoid circular imports
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.bigquery.client import Client


# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
# happen before the client-side timeout. This is not strictly necessary, as the
# client retries client-side timeouts, but the hope is that making the
# server-side timeout slightly shorter can save the server some unnecessary
# processing time.
#
# 250 milliseconds is chosen arbitrarily, though it should be about the right
# order of magnitude for network latency and switching delays. It is about the
# amount of time for light to circumnavigate the world twice.
_TIMEOUT_BUFFER_MILLIS = 250


def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
    """Construct an ID for a new job.

    Args:
        job_id: the user-provided job ID.
        prefix: the user-provided prefix for a job ID.

    Returns:
        str: A job ID
    """
    if job_id is not None:
        return job_id
    elif prefix is not None:
        return str(prefix) + str(uuid.uuid4())
    else:
        return str(uuid.uuid4())
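
# A minimal usage sketch for make_job_id (illustrative only; the prefix and
# explicit job ID below are hypothetical values):
#
#   make_job_id("my-job-id")           # -> "my-job-id" (passed through as-is)
#   make_job_id(prefix="daily-load-")  # -> "daily-load-" + a random UUID4
#   make_job_id()                      # -> a random UUID4 string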


def query_jobs_insert(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    job_id: Optional[str],
    job_id_prefix: Optional[str],
    location: str,
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: retries.Retry,
) -> job.QueryJob:
    """Initiate a query using jobs.insert.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
    """
    job_id_given = job_id is not None
    job_id_save = job_id
    job_config_save = job_config

    def do_query():
        # Make a copy now, so that the original doesn't get changed by the
        # process below and to facilitate retry.
        job_config = copy.deepcopy(job_config_save)

        job_id = make_job_id(job_id_save, job_id_prefix)
        job_ref = job._JobReference(job_id, project=project, location=location)
        query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)

        try:
            query_job._begin(retry=retry, timeout=timeout)
        except core_exceptions.Conflict as create_exc:
            # If someone provides their own job ID and gets the ID generation
            # wrong, this could end up returning results for the wrong query.
            # We thus only try to recover if the job ID was not given.
            if job_id_given:
                raise create_exc

            try:
                query_job = client.get_job(
                    job_id,
                    project=project,
                    location=location,
                    retry=retry,
                    timeout=timeout,
                )
            except core_exceptions.GoogleAPIError:  # (includes RetryError)
                raise create_exc
            else:
                return query_job
        else:
            return query_job

    future = do_query()
    # The future might be in a failed state now, but if it's unrecoverable,
    # we'll find out when we ask for its result, at which point we may retry.
    if not job_id_given:
        future._retry_do_query = do_query  # in case we have to retry later
        future._job_retry = job_retry

    return future
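
# A minimal calling sketch for query_jobs_insert (illustrative only; it
# assumes an authenticated Client and the library's default retry objects):
#
#   from google.cloud import bigquery
#   from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY, DEFAULT_RETRY
#
#   client = bigquery.Client()
#   query_job = query_jobs_insert(
#       client,
#       "SELECT 1",
#       job_config=None,
#       job_id=None,
#       job_id_prefix="example-",  # hypothetical prefix
#       location="US",
#       project=client.project,
#       retry=DEFAULT_RETRY,
#       timeout=None,
#       job_retry=DEFAULT_JOB_RETRY,
#   )
#   rows = query_job.result()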


def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
    """Transform from Job resource to QueryRequest resource.

    Most of the keys in job.configuration.query are in common with
    QueryRequest. If any configuration property is set that is not available in
    jobs.query, it will result in a server-side error.
    """
    request_body = {}
    job_config_resource = job_config.to_api_repr() if job_config else {}
    query_config_resource = job_config_resource.get("query", {})

    request_body.update(query_config_resource)

    # These keys are top level in job resource and query resource.
    if "labels" in job_config_resource:
        request_body["labels"] = job_config_resource["labels"]
    if "dryRun" in job_config_resource:
        request_body["dryRun"] = job_config_resource["dryRun"]

    # Default to standard SQL.
    request_body.setdefault("useLegacySql", False)

    # Since jobs.query can return results, ensure we use the lossless timestamp
    # format. See: https://github.com/googleapis/python-bigquery/issues/395
    request_body.setdefault("formatOptions", {})
    request_body["formatOptions"]["useInt64Timestamp"] = True  # type: ignore

    return request_body
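
# For illustration, a QueryJobConfig carrying only a label and the default SQL
# dialect maps to a QueryRequest body roughly like the following (the label
# value is hypothetical):
#
#   _to_query_request(job_config)
#   # {"labels": {"team": "data"}, "useLegacySql": False,
#   #  "formatOptions": {"useInt64Timestamp": True}}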


def _to_query_job(
    client: "Client",
    query: str,
    request_config: Optional[job.QueryJobConfig],
    query_response: Dict[str, Any],
) -> job.QueryJob:
    job_ref_resource = query_response["jobReference"]
    job_ref = job._JobReference._from_api_repr(job_ref_resource)
    query_job = job.QueryJob(job_ref, query, client=client)
    query_job._properties.setdefault("configuration", {})

    # Not all relevant properties are in the jobs.query response. Populate some
    # expected properties based on the job configuration.
    if request_config is not None:
        query_job._properties["configuration"].update(request_config.to_api_repr())

    query_job._properties["configuration"].setdefault("query", {})
    query_job._properties["configuration"]["query"]["query"] = query
    query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)

    query_job._properties.setdefault("statistics", {})
    query_job._properties["statistics"].setdefault("query", {})
    query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
        "cacheHit"
    )
    query_job._properties["statistics"]["query"]["schema"] = query_response.get(
        "schema"
    )
    query_job._properties["statistics"]["query"][
        "totalBytesProcessed"
    ] = query_response.get("totalBytesProcessed")

    # Set errors if any were encountered.
    query_job._properties.setdefault("status", {})
    if "errors" in query_response:
        # Set errors but not errorResult. If there was an error that failed
        # the job, jobs.query behaves like jobs.getQueryResults and returns a
        # non-success HTTP status code.
        errors = query_response["errors"]
        query_job._properties["status"]["errors"] = errors

    # Transform job state so that QueryJob doesn't try to restart the query.
    job_complete = query_response.get("jobComplete")
    if job_complete:
        query_job._properties["status"]["state"] = "DONE"
        # TODO: https://github.com/googleapis/python-bigquery/issues/589
        # Set the first page of results if job is "complete" and there is
        # only 1 page of results. Otherwise, use the existing logic that
        # refreshes the job stats.
        #
        # This also requires updates to `to_dataframe` and the DB API connector
        # so that they don't try to read from a destination table if all the
        # results are present.
    else:
        query_job._properties["status"]["state"] = "PENDING"

    return query_job
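
# For illustration, a minimal jobs.query response that _to_query_job can
# consume looks roughly like this (field values are hypothetical):
#
#   {
#       "jobReference": {"projectId": "my-project", "jobId": "abc", "location": "US"},
#       "jobComplete": True,
#       "cacheHit": False,
#       "schema": {"fields": [{"name": "f0_", "type": "INTEGER"}]},
#       "totalBytesProcessed": "0",
#   }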


def query_jobs_query(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    location: str,
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: retries.Retry,
) -> job.QueryJob:
    """Initiate a query using jobs.query.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
    """
    path = f"/projects/{project}/queries"
    request_body = _to_query_request(job_config)

    if timeout is not None:
        # Subtract a buffer for context switching, network latency, etc.
        request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
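        # For example (an illustrative value, not from the original source):
        # timeout=2.0 seconds gives timeoutMs = max(0, 2000 - 250) = 1750.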

    request_body["location"] = location
    request_body["query"] = query

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=timeout,
        )
        return _to_query_job(client, query, job_config, api_response)

    future = do_query()

    # The future might be in a failed state now, but if it's unrecoverable,
    # we'll find out when we ask for its result, at which point we may retry.
    future._retry_do_query = do_query  # in case we have to retry later
    future._job_retry = job_retry

    return future
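
# A minimal calling sketch for query_jobs_query, mirroring the jobs.insert
# example above (illustrative only; assumes an authenticated Client and the
# library's default retry objects):
#
#   future = query_jobs_query(
#       client,
#       "SELECT 1",
#       job_config=None,
#       location="US",
#       project=client.project,
#       retry=DEFAULT_RETRY,
#       timeout=10.0,
#       job_retry=DEFAULT_JOB_RETRY,
#   )
#   rows = future.result()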