Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery/dbapi/cursor.py: 21%

176 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Cursor for the Google BigQuery DB-API.""" 

16 

17import collections 

18from collections import abc as collections_abc 

19import copy 

20import logging 

21import re 

22 

23try: 

24 from google.cloud.bigquery_storage import ArrowSerializationOptions 

25except ImportError: 

26 _ARROW_COMPRESSION_SUPPORT = False 

27else: 

28 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too. 

29 _ARROW_COMPRESSION_SUPPORT = True 

30 

31from google.cloud.bigquery import job 

32from google.cloud.bigquery.dbapi import _helpers 

33from google.cloud.bigquery.dbapi import exceptions 

34import google.cloud.exceptions # type: ignore 

35 

36 

_LOGGER = logging.getLogger(__name__)

# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
# provided.
Column = collections.namedtuple(
    "Column",
    "name type_code display_size internal_size precision scale null_ok",
)

55 

56 

@_helpers.raise_on_closed("Operating on a closed cursor.")
class Cursor(object):
    """DB-API Cursor to Google BigQuery.

    Args:
        connection (google.cloud.bigquery.dbapi.Connection):
            A DB-API connection to Google BigQuery.
    """

    def __init__(self, connection):
        self.connection = connection
        self.description = None
        # Per PEP 249: The attribute is -1 in case no .execute*() has been
        # performed on the cursor or the rowcount of the last operation
        # cannot be determined by the interface.
        self.rowcount = -1
        # Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
        # a single row at a time. However, we deviate from that, and set the
        # default to None, allowing the backend to automatically determine the
        # most appropriate size.
        self.arraysize = None
        self._query_data = None
        self._query_job = None
        self._closed = False

    def close(self):
        """Mark the cursor as closed, preventing its further use."""
        self._closed = True

    def _set_description(self, schema):
        """Set description from schema.

        Args:
            schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
                A description of fields in the schema.
        """
        if schema is None:
            self.description = None
            return

        self.description = tuple(
            Column(
                name=field.name,
                type_code=field.field_type,
                display_size=None,
                internal_size=None,
                precision=None,
                scale=None,
                null_ok=field.is_nullable,
            )
            for field in schema
        )

    def _set_rowcount(self, query_results):
        """Set the rowcount from query results.

        Normally, this sets rowcount to the number of rows returned by the
        query, but if it was a DML statement, it sets rowcount to the number
        of modified rows.

        Args:
            query_results (google.cloud.bigquery.query._QueryResults):
                Results of a query.
        """
        total_rows = 0
        num_dml_affected_rows = query_results.num_dml_affected_rows

        if query_results.total_rows is not None and query_results.total_rows > 0:
            total_rows = query_results.total_rows
        # DML statements report the affected-row count instead of a result set;
        # when present, it takes precedence.
        if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
            total_rows = num_dml_affected_rows
        self.rowcount = total_rows

    def execute(self, operation, parameters=None, job_id=None, job_config=None):
        """Prepare and execute a database operation.

        .. note::
            When setting query parameters, values which are "text"
            (``unicode`` in Python2, ``str`` in Python3) will use
            the 'STRING' BigQuery type. Values which are "bytes" (``str`` in
            Python2, ``bytes`` in Python3), will use the 'BYTES' type.

            A `~datetime.datetime` parameter without timezone information uses
            the 'DATETIME' BigQuery type (example: Global Pi Day Celebration
            March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with
            timezone information uses the 'TIMESTAMP' BigQuery type (example:
            a wedding on April 29, 2011 at 11am, British Summer Time).

            For more information about BigQuery data types, see:
            https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

            ``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not
            yet supported. See:
            https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524

        Args:
            operation (str): A Google BigQuery query string.

            parameters (Union[Mapping[str, Any], Sequence[Any]]):
                (Optional) dictionary or sequence of parameter values.

            job_id (str):
                (Optional) The job_id to use. If not set, a job ID
                is generated at random.

            job_config (google.cloud.bigquery.job.QueryJobConfig):
                (Optional) Extra configuration options for the query job.
        """
        formatted_operation, parameter_types = _format_operation(operation, parameters)
        self._execute(
            formatted_operation, parameters, job_id, job_config, parameter_types
        )

    def _execute(
        self, formatted_operation, parameters, job_id, job_config, parameter_types
    ):
        """Run the already-formatted query and record its results on the cursor."""
        self._query_data = None
        self._query_job = None
        client = self.connection._client

        # The DB-API uses the pyformat formatting, since the way BigQuery does
        # query parameters was not one of the standard options. Convert both
        # the query and the parameters to the format expected by the client
        # libraries.
        query_parameters = _helpers.to_query_parameters(parameters, parameter_types)

        # Layer the per-call job config on top of the client's default, if any.
        if client._default_query_job_config:
            if job_config:
                config = job_config._fill_from_default(client._default_query_job_config)
            else:
                config = copy.deepcopy(client._default_query_job_config)
        else:
            config = job_config or job.QueryJobConfig(use_legacy_sql=False)

        config.query_parameters = query_parameters
        self._query_job = client.query(
            formatted_operation, job_config=config, job_id=job_id
        )

        # Dry-run jobs never produce rows; report an empty result immediately.
        if self._query_job.dry_run:
            self._set_description(schema=None)
            self.rowcount = 0
            return

        # Wait for the query to finish.
        try:
            self._query_job.result()
        except google.cloud.exceptions.GoogleCloudError as exc:
            # Chain the original cloud error for a complete traceback.
            raise exceptions.DatabaseError(exc) from exc

        query_results = self._query_job._query_results
        self._set_rowcount(query_results)
        self._set_description(query_results.schema)

    def executemany(self, operation, seq_of_parameters):
        """Prepare and execute a database operation multiple times.

        Args:
            operation (str): A Google BigQuery query string.

            seq_of_parameters (Union[Sequence[Mapping[str, Any], Sequence[Any]]]):
                Sequence of many sets of parameter values.
        """
        if seq_of_parameters:
            rowcount = 0
            # There's no reason to format the line more than once, as
            # the operation only barely depends on the parameters. So
            # we just use the first set of parameters. If there are
            # different numbers or types of parameters, we'll error
            # anyway.
            formatted_operation, parameter_types = _format_operation(
                operation, seq_of_parameters[0]
            )
            for parameters in seq_of_parameters:
                self._execute(
                    formatted_operation, parameters, None, None, parameter_types
                )
                rowcount += self.rowcount

            self.rowcount = rowcount

    def _try_fetch(self, size=None):
        """Try to start fetching data, if not yet started.

        Mutates self to indicate that iteration has started.

        Args:
            size (int): Currently unused; kept for interface stability.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError:
                if called before ``execute()``.
        """
        if self._query_job is None:
            raise exceptions.InterfaceError(
                "No query results: execute() must be called before fetch."
            )

        if self._query_job.dry_run:
            self._query_data = iter([])
            return

        if self._query_data is None:
            bqstorage_client = self.connection._bqstorage_client

            # Prefer the faster BigQuery Storage API when it is available.
            if bqstorage_client is not None:
                rows_iterable = self._bqstorage_fetch(bqstorage_client)
                self._query_data = _helpers.to_bq_table_rows(rows_iterable)
                return

            rows_iter = self._query_job.result(page_size=self.arraysize)
            self._query_data = iter(rows_iter)

    def _bqstorage_fetch(self, bqstorage_client):
        """Start fetching data with the BigQuery Storage API.

        The method assumes that the data about the relevant query job already
        exists internally.

        Args:
            bqstorage_client(\
                google.cloud.bigquery_storage_v1.BigQueryReadClient \
            ):
                A client that knows how to talk to the BigQuery Storage API.

        Returns:
            Iterable[Mapping]:
                A sequence of rows, represented as dictionaries.
        """
        # Hitting this code path with a BQ Storage client instance implies that
        # bigquery_storage can indeed be imported here without errors.
        from google.cloud import bigquery_storage

        table_reference = self._query_job.destination

        requested_session = bigquery_storage.types.ReadSession(
            table=table_reference.to_bqstorage(),
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )

        read_session = bqstorage_client.create_read_session(
            parent="projects/{}".format(table_reference.project),
            read_session=requested_session,
            # a single stream only, as DB API is not well-suited for multithreading
            max_stream_count=1,
        )

        if not read_session.streams:
            return iter([])  # empty table, nothing to read

        stream_name = read_session.streams[0].name
        read_rows_stream = bqstorage_client.read_rows(stream_name)

        rows_iterable = read_rows_stream.rows(read_session)
        return rows_iterable

    def fetchone(self):
        """Fetch a single row from the results of the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            Tuple:
                A tuple representing a row or ``None`` if no more data is
                available.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        try:
            return next(self._query_data)
        except StopIteration:
            return None

    def fetchmany(self, size=None):
        """Fetch multiple results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        .. note::
            The size parameter is not used for the request/response size.
            Set the ``arraysize`` attribute before calling ``execute()`` to
            set the batch size.

        Args:
            size (int):
                (Optional) Maximum number of rows to return. Defaults to the
                ``arraysize`` property value. If ``arraysize`` is not set, it
                defaults to ``1``.

        Returns:
            List[Tuple]: A list of rows.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        if size is None:
            # Since self.arraysize can be None (a deviation from PEP 249),
            # use an actual PEP 249 default of 1 in such case (*some* number
            # is needed here).
            size = self.arraysize if self.arraysize else 1

        self._try_fetch(size=size)
        rows = []

        for row in self._query_data:
            rows.append(row)
            if len(rows) >= size:
                break

        return rows

    def fetchall(self):
        """Fetch all remaining results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            List[Tuple]: A list of all the rows in the results.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        return list(self._query_data)

    def setinputsizes(self, sizes):
        """No-op, but for consistency raise an error if cursor is closed."""

    def setoutputsize(self, size, column=None):
        """No-op, but for consistency raise an error if cursor is closed."""

    def __iter__(self):
        self._try_fetch()
        return iter(self._query_data)

394 

395 

396def _format_operation_list(operation, parameters): 

397 """Formats parameters in operation in the way BigQuery expects. 

398 

399 The input operation will be a query like ``SELECT %s`` and the output 

400 will be a query like ``SELECT ?``. 

401 

402 Args: 

403 operation (str): A Google BigQuery query string. 

404 

405 parameters (Sequence[Any]): Sequence of parameter values. 

406 

407 Returns: 

408 str: A formatted query string. 

409 

410 Raises: 

411 google.cloud.bigquery.dbapi.ProgrammingError: 

412 if a parameter used in the operation is not found in the 

413 ``parameters`` argument. 

414 """ 

415 formatted_params = ["?" for _ in parameters] 

416 

417 try: 

418 return operation % tuple(formatted_params) 

419 except (TypeError, ValueError) as exc: 

420 raise exceptions.ProgrammingError(exc) 

421 

422 

423def _format_operation_dict(operation, parameters): 

424 """Formats parameters in operation in the way BigQuery expects. 

425 

426 The input operation will be a query like ``SELECT %(namedparam)s`` and 

427 the output will be a query like ``SELECT @namedparam``. 

428 

429 Args: 

430 operation (str): A Google BigQuery query string. 

431 

432 parameters (Mapping[str, Any]): Dictionary of parameter values. 

433 

434 Returns: 

435 str: A formatted query string. 

436 

437 Raises: 

438 google.cloud.bigquery.dbapi.ProgrammingError: 

439 if a parameter used in the operation is not found in the 

440 ``parameters`` argument. 

441 """ 

442 formatted_params = {} 

443 for name in parameters: 

444 escaped_name = name.replace("`", r"\`") 

445 formatted_params[name] = "@`{}`".format(escaped_name) 

446 

447 try: 

448 return operation % formatted_params 

449 except (KeyError, ValueError, TypeError) as exc: 

450 raise exceptions.ProgrammingError(exc) 

451 

452 

453def _format_operation(operation, parameters): 

454 """Formats parameters in operation in way BigQuery expects. 

455 

456 Args: 

457 operation (str): A Google BigQuery query string. 

458 

459 parameters (Union[Mapping[str, Any], Sequence[Any]]): 

460 Optional parameter values. 

461 

462 Returns: 

463 str: A formatted query string. 

464 

465 Raises: 

466 google.cloud.bigquery.dbapi.ProgrammingError: 

467 if a parameter used in the operation is not found in the 

468 ``parameters`` argument. 

469 """ 

470 if parameters is None or len(parameters) == 0: 

471 return operation.replace("%%", "%"), None # Still do percent de-escaping. 

472 

473 operation, parameter_types = _extract_types(operation) 

474 if parameter_types is None: 

475 raise exceptions.ProgrammingError( 

476 f"Parameters were provided, but {repr(operation)} has no placeholders." 

477 ) 

478 

479 if isinstance(parameters, collections_abc.Mapping): 

480 return _format_operation_dict(operation, parameters), parameter_types 

481 

482 return _format_operation_list(operation, parameters), parameter_types 

483 

484 

485def _extract_types( 

486 operation, 

487 extra_type_sub=re.compile( 

488 r""" 

489 (%*) # Extra %s. We'll deal with these in the replacement code 

490 

491 % # Beginning of replacement, %s, %(...)s 

492 

493 (?:\( # Begin of optional name and/or type 

494 ([^:)]*) # name 

495 (?:: # ':' introduces type 

496 ( # start of type group 

497 [a-zA-Z0-9_<>, ]+ # First part, no parens 

498 

499 (?: # start sets of parens + non-paren text 

500 \([0-9 ,]+\) # comma-separated groups of digits in parens 

501 # (e.g. string(10)) 

502 (?=[, >)]) # Must be followed by ,>) or space 

503 [a-zA-Z0-9<>, ]* # Optional non-paren chars 

504 )* # Can be zero or more of parens and following text 

505 ) # end of type group 

506 )? # close type clause ":type" 

507 \))? # End of optional name and/or type 

508 

509 s # End of replacement 

510 """, 

511 re.VERBOSE, 

512 ).sub, 

513): 

514 """Remove type information from parameter placeholders. 

515 

516 For every parameter of the form %(name:type)s, replace with %(name)s and add the 

517 item name->type to dict that's returned. 

518 

519 Returns operation without type information and a dictionary of names and types. 

520 """ 

521 parameter_types = None 

522 

523 def repl(m): 

524 nonlocal parameter_types 

525 prefix, name, type_ = m.groups() 

526 if len(prefix) % 2: 

527 # The prefix has an odd number of %s, the last of which 

528 # escapes the % we're looking for, so we don't want to 

529 # change anything. 

530 return m.group(0) 

531 

532 try: 

533 if name: 

534 if not parameter_types: 

535 parameter_types = {} 

536 if type_: 

537 if name in parameter_types: 

538 if type_ != parameter_types[name]: 

539 raise exceptions.ProgrammingError( 

540 f"Conflicting types for {name}: " 

541 f"{parameter_types[name]} and {type_}." 

542 ) 

543 else: 

544 parameter_types[name] = type_ 

545 else: 

546 if not isinstance(parameter_types, dict): 

547 raise TypeError() 

548 

549 return f"{prefix}%({name})s" 

550 else: 

551 if parameter_types is None: 

552 parameter_types = [] 

553 parameter_types.append(type_) 

554 return f"{prefix}%s" 

555 except (AttributeError, TypeError): 

556 raise exceptions.ProgrammingError( 

557 f"{repr(operation)} mixes named and unamed parameters." 

558 ) 

559 

560 return extra_type_sub(repl, operation), parameter_types