# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Cursor for the Google BigQuery DB-API."""

from __future__ import annotations

import collections
from collections import abc as collections_abc
import re
from typing import Optional

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
    _ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.cloud.exceptions  # type: ignore


# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
# provided.
Column = collections.namedtuple(
    "Column",
    [
        "name",
        "type_code",
        "display_size",
        "internal_size",
        "precision",
        "scale",
        "null_ok",
    ],
)
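
# For example, a nullable STRING field in a result schema surfaces in
# ``cursor.description`` as (illustrative values, not tied to any real query):
#
#     Column("word", "STRING", None, None, None, None, True)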


@_helpers.raise_on_closed("Operating on a closed cursor.")
class Cursor(object):
    """DB-API Cursor to Google BigQuery.

    Args:
        connection (google.cloud.bigquery.dbapi.Connection):
            A DB-API connection to Google BigQuery.
    """

    def __init__(self, connection):
        self.connection = connection
        self.description = None
        # Per PEP 249: The attribute is -1 in case no .execute*() has been
        # performed on the cursor or the rowcount of the last operation
        # cannot be determined by the interface.
        self.rowcount = -1
        # Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
        # a single row at a time. However, we deviate from that, and set the
        # default to None, allowing the backend to automatically determine the
        # most appropriate size.
        self.arraysize = None
        self._query_data = None
        self._query_rows = None
        self._closed = False

    @property
    def query_job(self) -> Optional[job.QueryJob]:
        """google.cloud.bigquery.job.query.QueryJob | None: The query job
        created by the last ``execute*()`` call, if a query job was created.

        .. note::
            If the last ``execute*()`` call was ``executemany()``, this is the
            last job created by ``executemany()``."""
        rows = self._query_rows

        if rows is None:
            return None

        job_id = rows.job_id
        project = rows.project
        location = rows.location
        client = self.connection._client

        if job_id is None:
            return None

        return client.get_job(job_id, location=location, project=project)

    def close(self):
        """Mark the cursor as closed, preventing its further use."""
        self._closed = True

    def _set_description(self, schema):
        """Set description from schema.

        Args:
            schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
                A description of fields in the schema.
        """
        if schema is None:
            self.description = None
            return

        self.description = tuple(
            Column(
                name=field.name,
                type_code=field.field_type,
                display_size=None,
                internal_size=None,
                precision=None,
                scale=None,
                null_ok=field.is_nullable,
            )
            for field in schema
        )

    def _set_rowcount(self, rows):
        """Set the rowcount from a RowIterator.

        Normally, this sets rowcount to the number of rows returned by the
        query, but if it was a DML statement, it sets rowcount to the number
        of modified rows.

        Args:
            rows (google.cloud.bigquery.table.RowIterator):
                Results of a query.
        """
        total_rows = 0
        num_dml_affected_rows = rows.num_dml_affected_rows

        if rows.total_rows is not None and rows.total_rows > 0:
            total_rows = rows.total_rows
        if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
            total_rows = num_dml_affected_rows
        self.rowcount = total_rows

    def execute(self, operation, parameters=None, job_id=None, job_config=None):
        """Prepare and execute a database operation.

        .. note::
            When setting query parameters, values which are "text" (``str``)
            will use the 'STRING' BigQuery type. Values which are "bytes"
            (``bytes``) will use the 'BYTES' type.

            A `~datetime.datetime` parameter without timezone information uses
            the 'DATETIME' BigQuery type (example: Global Pi Day Celebration
            March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with
            timezone information uses the 'TIMESTAMP' BigQuery type (example:
            a wedding on April 29, 2011 at 11am, British Summer Time).

            For more information about BigQuery data types, see:
            https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

            ``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not
            yet supported. See:
            https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524

        Args:
            operation (str): A Google BigQuery query string.

            parameters (Union[Mapping[str, Any], Sequence[Any]]):
                (Optional) dictionary or sequence of parameter values.

            job_id (str | None):
                (Optional and discouraged) The job ID to use when creating
                the query job. For best performance and reliability, manually
                setting a job ID is discouraged.

            job_config (google.cloud.bigquery.job.QueryJobConfig):
                (Optional) Extra configuration options for the query job.
        """
        formatted_operation, parameter_types = _format_operation(operation, parameters)
        self._execute(
            formatted_operation, parameters, job_id, job_config, parameter_types
        )
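
    # Illustrative usage sketch (assumes a cursor from a live connection; the
    # public dataset and the parameter values are examples, not part of this
    # API):
    #
    #     cursor.execute(
    #         "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013`"
    #         " WHERE state = %(state:STRING)s LIMIT 10",
    #         {"state": "TX"},
    #     )
    #     rows = cursor.fetchall()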

    def _execute(
        self, formatted_operation, parameters, job_id, job_config, parameter_types
    ):
        self._query_data = None
        self._query_rows = None
        client = self.connection._client

        # The DB-API uses the pyformat formatting, since the way BigQuery does
        # query parameters was not one of the standard options. Convert both
        # the query and the parameters to the format expected by the client
        # libraries.
        query_parameters = _helpers.to_query_parameters(parameters, parameter_types)

        config = job_config or job.QueryJobConfig()
        config.query_parameters = query_parameters

        # Start the query and wait for the query to finish.
        try:
            if job_id is not None:
                rows = client.query(
                    formatted_operation,
                    job_config=config,
                    job_id=job_id,
                ).result(
                    page_size=self.arraysize,
                )
            else:
                rows = client.query_and_wait(
                    formatted_operation,
                    job_config=config,
                    page_size=self.arraysize,
                )
        except google.cloud.exceptions.GoogleCloudError as exc:
            raise exceptions.DatabaseError(exc)

        self._query_rows = rows
        self._set_description(rows.schema)

        if config.dry_run:
            self.rowcount = 0
        else:
            self._set_rowcount(rows)

    def executemany(self, operation, seq_of_parameters):
        """Prepare and execute a database operation multiple times.

        Args:
            operation (str): A Google BigQuery query string.

            seq_of_parameters (Sequence[Union[Mapping[str, Any], Sequence[Any]]]):
                Sequence of many sets of parameter values.
        """
        if seq_of_parameters:
            rowcount = 0
            # There's no need to format the operation more than once: the
            # formatted operation depends only on the number and types of the
            # parameters, not their values, so we just use the first set of
            # parameters. If later sets have different numbers or types of
            # parameters, we'll error anyway.
            formatted_operation, parameter_types = _format_operation(
                operation, seq_of_parameters[0]
            )
            for parameters in seq_of_parameters:
                self._execute(
                    formatted_operation, parameters, None, None, parameter_types
                )
                rowcount += self.rowcount

            self.rowcount = rowcount
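
    # Illustrative usage sketch (hypothetical dataset and table names):
    #
    #     cursor.executemany(
    #         "INSERT INTO my_dataset.my_table (name, age)"
    #         " VALUES (%(name:STRING)s, %(age:INT64)s)",
    #         [{"name": "Ada", "age": 36}, {"name": "Grace", "age": 45}],
    #     )
    #     # ``cursor.rowcount`` is then the sum across all executions.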

    def _try_fetch(self, size=None):
        """Try to start fetching data, if not yet started.

        Mutates self to indicate that iteration has started.
        """
        if self._query_data is not None:
            # Already started fetching the data.
            return

        rows = self._query_rows
        if rows is None:
            raise exceptions.InterfaceError(
                "No query results: execute() must be called before fetch."
            )

        bqstorage_client = self.connection._bqstorage_client
        if rows._should_use_bqstorage(
            bqstorage_client,
            create_bqstorage_client=False,
        ):
            rows_iterable = self._bqstorage_fetch(bqstorage_client)
            self._query_data = _helpers.to_bq_table_rows(rows_iterable)
            return

        self._query_data = iter(rows)

    def _bqstorage_fetch(self, bqstorage_client):
        """Start fetching data with the BigQuery Storage API.

        The method assumes that the data about the relevant query job already
        exists internally.

        Args:
            bqstorage_client(\
                google.cloud.bigquery_storage_v1.BigQueryReadClient \
            ):
                A client that knows how to talk to the BigQuery Storage API.

        Returns:
            Iterable[Mapping]:
                A sequence of rows, represented as dictionaries.
        """
        # Hitting this code path with a BQ Storage client instance implies that
        # bigquery_storage can indeed be imported here without errors.
        from google.cloud import bigquery_storage

        table_reference = self._query_rows._table

        requested_session = bigquery_storage.types.ReadSession(
            table=table_reference.to_bqstorage(),
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )

        read_session = bqstorage_client.create_read_session(
            parent="projects/{}".format(table_reference.project),
            read_session=requested_session,
            # a single stream only, as DB API is not well-suited for multithreading
            max_stream_count=1,
            retry=None,
            timeout=None,
        )

        if not read_session.streams:
            return iter([])  # empty table, nothing to read

        stream_name = read_session.streams[0].name
        read_rows_stream = bqstorage_client.read_rows(stream_name)

        rows_iterable = read_rows_stream.rows(read_session)
        return rows_iterable

    def fetchone(self):
        """Fetch a single row from the results of the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            Tuple:
                A tuple representing a row or ``None`` if no more data is
                available.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        try:
            return next(self._query_data)
        except StopIteration:
            return None

    def fetchmany(self, size=None):
        """Fetch multiple results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        .. note::
            The size parameter is not used for the request/response size.
            Set the ``arraysize`` attribute before calling ``execute()`` to
            set the batch size.

        Args:
            size (int):
                (Optional) Maximum number of rows to return. Defaults to the
                ``arraysize`` property value. If ``arraysize`` is not set, it
                defaults to ``1``.

        Returns:
            List[Tuple]: A list of rows.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        if size is None:
            # Since self.arraysize can be None (a deviation from PEP 249),
            # use an actual PEP 249 default of 1 in such case (*some* number
            # is needed here).
            size = self.arraysize if self.arraysize else 1

        self._try_fetch(size=size)
        rows = []

        for row in self._query_data:
            rows.append(row)
            if len(rows) >= size:
                break

        return rows

    def fetchall(self):
        """Fetch all remaining results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            List[Tuple]: A list of all the rows in the results.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        return list(self._query_data)

    def setinputsizes(self, sizes):
        """No-op, but for consistency raise an error if cursor is closed."""

    def setoutputsize(self, size, column=None):
        """No-op, but for consistency raise an error if cursor is closed."""

    def __iter__(self):
        self._try_fetch()
        return iter(self._query_data)
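
# Illustrative end-to-end sketch (assumes application default credentials with
# a default project; not executed as part of this module):
#
#     from google.cloud.bigquery import dbapi
#
#     connection = dbapi.connect()
#     cursor = connection.cursor()
#     cursor.execute("SELECT 1 AS x, 'a' AS y")
#     print(cursor.description)  # PEP 249 Column tuples for fields x and y
#     rows = cursor.fetchall()
#     cursor.close()
#     connection.close()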


def _format_operation_list(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    The input operation will be a query like ``SELECT %s`` and the output
    will be a query like ``SELECT ?``.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Sequence[Any]): Sequence of parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
    """
    formatted_params = ["?" for _ in parameters]

    try:
        return operation % tuple(formatted_params)
    except (TypeError, ValueError) as exc:
        raise exceptions.ProgrammingError(exc)


def _format_operation_dict(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    The input operation will be a query like ``SELECT %(namedparam)s`` and
    the output will be a query like ``SELECT @namedparam``.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Mapping[str, Any]): Dictionary of parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
    """
    formatted_params = {}
    for name in parameters:
        escaped_name = name.replace("`", r"\`")
        formatted_params[name] = "@`{}`".format(escaped_name)

    try:
        return operation % formatted_params
    except (KeyError, ValueError, TypeError) as exc:
        raise exceptions.ProgrammingError(exc)


def _format_operation(operation, parameters):
    """Formats parameters in operation in the way BigQuery expects.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Union[Mapping[str, Any], Sequence[Any]]):
            Optional parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
    """
    if parameters is None or len(parameters) == 0:
        return operation.replace("%%", "%"), None  # Still do percent de-escaping.

    operation, parameter_types = _extract_types(operation)
    if parameter_types is None:
        raise exceptions.ProgrammingError(
            f"Parameters were provided, but {repr(operation)} has no placeholders."
        )

    if isinstance(parameters, collections_abc.Mapping):
        return _format_operation_dict(operation, parameters), parameter_types

    return _format_operation_list(operation, parameters), parameter_types
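
# Illustrative sketch of the two placeholder styles (return values shown are
# what these helpers produce for these inputs):
#
#     _format_operation("SELECT %s, %s", ["a", "b"])
#     # -> ("SELECT ?, ?", [None, None])
#
#     _format_operation("SELECT %(foo:INT64)s", {"foo": 17})
#     # -> ("SELECT @`foo`", {"foo": "INT64"})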


def _extract_types(
    operation,
    extra_type_sub=re.compile(
        r"""
        (%*)          # Extra %s.  We'll deal with these in the replacement code

        %             # Beginning of replacement, %s, %(...)s

        (?:\(         # Begin of optional name and/or type
        ([^:)]*)      # name
        (?::          # ':' introduces type
        (             # start of type group
            [a-zA-Z0-9_<>, ]+ # First part, no parens

            (?:               # start sets of parens + non-paren text
            \([0-9 ,]+\)      # comma-separated groups of digits in parens
                              # (e.g. string(10))
            (?=[, >)])        # Must be followed by ,>) or space
            [a-zA-Z0-9<>, ]*  # Optional non-paren chars
            )*                # Can be zero or more of parens and following text
        )             # end of type group
        )?            # close type clause ":type"
        \))?          # End of optional name and/or type

        s             # End of replacement
        """,
        re.VERBOSE,
    ).sub,
):
    """Remove type information from parameter placeholders.

    For every parameter of the form %(name:type)s, replace with %(name)s and
    add the item name->type to the dict that's returned.

    Returns operation without type information and a dictionary of names and
    types.
    """
    parameter_types = None

    def repl(m):
        nonlocal parameter_types
        prefix, name, type_ = m.groups()
        if len(prefix) % 2:
            # The prefix has an odd number of %s, the last of which
            # escapes the % we're looking for, so we don't want to
            # change anything.
            return m.group(0)

        try:
            if name:
                if not parameter_types:
                    parameter_types = {}
                if type_:
                    if name in parameter_types:
                        if type_ != parameter_types[name]:
                            raise exceptions.ProgrammingError(
                                f"Conflicting types for {name}: "
                                f"{parameter_types[name]} and {type_}."
                            )
                    else:
                        parameter_types[name] = type_
                else:
                    if not isinstance(parameter_types, dict):
                        raise TypeError()

                return f"{prefix}%({name})s"
            else:
                if parameter_types is None:
                    parameter_types = []
                parameter_types.append(type_)
                return f"{prefix}%s"
        except (AttributeError, TypeError):
            raise exceptions.ProgrammingError(
                f"{repr(operation)} mixes named and unnamed parameters."
            )

    return extra_type_sub(repl, operation), parameter_types
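
# Illustrative behavior sketch (return values shown are what this helper
# produces for these inputs):
#
#     _extract_types("SELECT %s, %s")
#     # -> ("SELECT %s, %s", [None, None])
#
#     _extract_types("SELECT %(foo:INT64)s, %(bar)s")
#     # -> ("SELECT %(foo)s, %(bar)s", {"foo": "INT64"})
#
#     _extract_types("SELECT 17")
#     # -> ("SELECT 17", None)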