Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/cursor.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

181 statements  

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Cursor for the Google BigQuery DB-API.""" 

16 

17from __future__ import annotations 

18 

19import collections 

20from collections import abc as collections_abc 

21import re 

22from typing import Optional 

23 

24try: 

25 from google.cloud.bigquery_storage import ArrowSerializationOptions 

26except ImportError: 

27 _ARROW_COMPRESSION_SUPPORT = False 

28else: 

29 # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too. 

30 _ARROW_COMPRESSION_SUPPORT = True 

31 

32from google.cloud.bigquery import job 

33from google.cloud.bigquery.dbapi import _helpers 

34from google.cloud.bigquery.dbapi import exceptions 

35import google.cloud.exceptions # type: ignore 

36 

37 

38# Per PEP 249: A 7-item sequence containing information describing one result 

39# column. The first two items (name and type_code) are mandatory, the other 

40# five are optional and are set to None if no meaningful values can be 

41# provided. 

# Per PEP 249: a 7-item description entry; only ``name`` and ``type_code``
# are mandatory, the rest are None when no meaningful value is available.
Column = collections.namedtuple(
    "Column",
    "name type_code display_size internal_size precision scale null_ok",
)

54 

55 

@_helpers.raise_on_closed("Operating on a closed cursor.")
class Cursor(object):
    """DB-API Cursor to Google BigQuery.

    Args:
        connection (google.cloud.bigquery.dbapi.Connection):
            A DB-API connection to Google BigQuery.
    """

    def __init__(self, connection):
        self.connection = connection
        # Per PEP 249: a sequence of Column tuples describing the last result
        # set, or None when no schema is available.
        self.description = None
        # Per PEP 249: The attribute is -1 in case no .execute*() has been
        # performed on the cursor or the rowcount of the last operation
        # cannot be determined by the interface.
        self.rowcount = -1
        # Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
        # a single row at a time. However, we deviate from that, and set the
        # default to None, allowing the backend to automatically determine the
        # most appropriate size.
        self.arraysize = None
        # Iterator over result rows once fetching has started, else None.
        self._query_data = None
        # RowIterator produced by the last execute*() call, else None.
        self._query_rows = None
        self._closed = False

    @property
    def query_job(self) -> Optional[job.QueryJob]:
        """google.cloud.bigquery.job.query.QueryJob | None: The query job
        created by the last ``execute*()`` call, if a query job was created.

        .. note::
            If the last ``execute*()`` call was ``executemany()``, this is the
            last job created by ``executemany()``."""
        rows = self._query_rows

        if rows is None:
            return None

        job_id = rows.job_id
        project = rows.project
        location = rows.location
        client = self.connection._client

        # Some queries are executed without creating a job at all, in which
        # case there is no job to look up.
        if job_id is None:
            return None

        return client.get_job(job_id, location=location, project=project)

    def close(self):
        """Mark the cursor as closed, preventing its further use."""
        self._closed = True

    def _set_description(self, schema):
        """Set description from schema.

        Args:
            schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
                A description of fields in the schema.
        """
        if schema is None:
            self.description = None
            return

        self.description = tuple(
            Column(
                name=field.name,
                type_code=field.field_type,
                display_size=None,
                internal_size=None,
                precision=None,
                scale=None,
                null_ok=field.is_nullable,
            )
            for field in schema
        )

    def _set_rowcount(self, rows):
        """Set the rowcount from a RowIterator.

        Normally, this sets rowcount to the number of rows returned by the
        query, but if it was a DML statement, it sets rowcount to the number
        of modified rows.

        Args:
            rows (google.cloud.bigquery.table.RowIterator):
                Results of a query, exposing ``total_rows`` and
                ``num_dml_affected_rows``.
        """
        total_rows = 0
        num_dml_affected_rows = rows.num_dml_affected_rows

        if rows.total_rows is not None and rows.total_rows > 0:
            total_rows = rows.total_rows
        # For DML statements the affected-row count takes precedence.
        if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
            total_rows = num_dml_affected_rows
        self.rowcount = total_rows

    def execute(self, operation, parameters=None, job_id=None, job_config=None):
        """Prepare and execute a database operation.

        .. note::
            When setting query parameters, values which are "text"
            (``unicode`` in Python2, ``str`` in Python3) will use
            the 'STRING' BigQuery type. Values which are "bytes" (``str`` in
            Python2, ``bytes`` in Python3), will use using the 'BYTES' type.

            A `~datetime.datetime` parameter without timezone information uses
            the 'DATETIME' BigQuery type (example: Global Pi Day Celebration
            March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with
            timezone information uses the 'TIMESTAMP' BigQuery type (example:
            a wedding on April 29, 2011 at 11am, British Summer Time).

            For more information about BigQuery data types, see:
            https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

            ``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not
            yet supported. See:
            https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524

        Args:
            operation (str): A Google BigQuery query string.

            parameters (Union[Mapping[str, Any], Sequence[Any]]):
                (Optional) dictionary or sequence of parameter values.

            job_id (str | None):
                (Optional and discouraged) The job ID to use when creating
                the query job. For best performance and reliability, manually
                setting a job ID is discouraged.

            job_config (google.cloud.bigquery.job.QueryJobConfig):
                (Optional) Extra configuration options for the query job.
        """
        formatted_operation, parameter_types = _format_operation(operation, parameters)
        self._execute(
            formatted_operation, parameters, job_id, job_config, parameter_types
        )

    def _execute(
        self, formatted_operation, parameters, job_id, job_config, parameter_types
    ):
        """Run a formatted query and record its results on the cursor.

        Args:
            formatted_operation (str): Query text with BigQuery-style
                placeholders (``?`` or ``@name``).
            parameters (Union[Mapping[str, Any], Sequence[Any], None]):
                Parameter values matching the placeholders.
            job_id (str | None): Explicit job ID, if any.
            job_config (google.cloud.bigquery.job.QueryJobConfig | None):
                Extra job configuration.
            parameter_types: Types extracted from the original placeholders.

        Raises:
            google.cloud.bigquery.dbapi.DatabaseError: if the backend
                reports an error while running the query.
        """
        # Reset state from any previous query so that a failed execution does
        # not leave stale rows behind (previously only the unused
        # ``_query_results`` attribute was reset here).
        self._query_data = None
        self._query_rows = None
        self._query_results = None  # vestigial; kept for backward compatibility
        client = self.connection._client

        # The DB-API uses the pyformat formatting, since the way BigQuery does
        # query parameters was not one of the standard options. Convert both
        # the query and the parameters to the format expected by the client
        # libraries.
        query_parameters = _helpers.to_query_parameters(parameters, parameter_types)

        config = job_config or job.QueryJobConfig()
        config.query_parameters = query_parameters

        # Start the query and wait for the query to finish.
        try:
            if job_id is not None:
                # Pass ``config`` (not the raw ``job_config``) so query
                # parameters are applied even when no job_config was given.
                rows = client.query(
                    formatted_operation,
                    job_config=config,
                    job_id=job_id,
                ).result(
                    page_size=self.arraysize,
                )
            else:
                rows = client.query_and_wait(
                    formatted_operation,
                    job_config=config,
                    page_size=self.arraysize,
                )
        except google.cloud.exceptions.GoogleCloudError as exc:
            raise exceptions.DatabaseError(exc)

        self._query_rows = rows
        self._set_description(rows.schema)

        if config.dry_run:
            self.rowcount = 0
        else:
            self._set_rowcount(rows)

    def executemany(self, operation, seq_of_parameters):
        """Prepare and execute a database operation multiple times.

        Args:
            operation (str): A Google BigQuery query string.

            seq_of_parameters (Union[Sequence[Mapping[str, Any], Sequence[Any]]]):
                Sequence of many sets of parameter values.
        """
        if seq_of_parameters:
            rowcount = 0
            # There's no reason to format the line more than once, as
            # the operation only barely depends on the parameters. So
            # we just use the first set of parameters. If there are
            # different numbers or types of parameters, we'll error
            # anyway.
            formatted_operation, parameter_types = _format_operation(
                operation, seq_of_parameters[0]
            )
            for parameters in seq_of_parameters:
                self._execute(
                    formatted_operation, parameters, None, None, parameter_types
                )
                rowcount += self.rowcount

            self.rowcount = rowcount

    def _try_fetch(self, size=None):
        """Try to start fetching data, if not yet started.

        Mutates self to indicate that iteration has started.

        Args:
            size (int): Unused; accepted for symmetry with the fetch methods.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError:
                if no query has been executed on this cursor yet.
        """
        if self._query_data is not None:
            # Already started fetching the data.
            return

        rows = self._query_rows
        if rows is None:
            raise exceptions.InterfaceError(
                "No query results: execute() must be called before fetch."
            )

        bqstorage_client = self.connection._bqstorage_client
        if rows._should_use_bqstorage(
            bqstorage_client,
            create_bqstorage_client=False,
        ):
            rows_iterable = self._bqstorage_fetch(bqstorage_client)
            self._query_data = _helpers.to_bq_table_rows(rows_iterable)
            return

        self._query_data = iter(rows)

    def _bqstorage_fetch(self, bqstorage_client):
        """Start fetching data with the BigQuery Storage API.

        The method assumes that the data about the relevant query job already
        exists internally.

        Args:
            bqstorage_client(\
                google.cloud.bigquery_storage_v1.BigQueryReadClient \
            ):
                A client that knows how to talk to the BigQuery Storage API.

        Returns:
            Iterable[Mapping]:
                A sequence of rows, represented as dictionaries.
        """
        # Hitting this code path with a BQ Storage client instance implies that
        # bigquery_storage can indeed be imported here without errors.
        from google.cloud import bigquery_storage

        table_reference = self._query_rows._table

        requested_session = bigquery_storage.types.ReadSession(
            table=table_reference.to_bqstorage(),
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )

        read_session = bqstorage_client.create_read_session(
            parent="projects/{}".format(table_reference.project),
            read_session=requested_session,
            # a single stream only, as DB API is not well-suited for multithreading
            max_stream_count=1,
        )

        if not read_session.streams:
            return iter([])  # empty table, nothing to read

        stream_name = read_session.streams[0].name
        read_rows_stream = bqstorage_client.read_rows(stream_name)

        rows_iterable = read_rows_stream.rows(read_session)
        return rows_iterable

    def fetchone(self):
        """Fetch a single row from the results of the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            Tuple:
                A tuple representing a row or ``None`` if no more data is
                available.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        try:
            return next(self._query_data)
        except StopIteration:
            return None

    def fetchmany(self, size=None):
        """Fetch multiple results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        .. note::
            The size parameter is not used for the request/response size.
            Set the ``arraysize`` attribute before calling ``execute()`` to
            set the batch size.

        Args:
            size (int):
                (Optional) Maximum number of rows to return. Defaults to the
                ``arraysize`` property value. If ``arraysize`` is not set, it
                defaults to ``1``.

        Returns:
            List[Tuple]: A list of rows.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        if size is None:
            # Since self.arraysize can be None (a deviation from PEP 249),
            # use an actual PEP 249 default of 1 in such case (*some* number
            # is needed here).
            size = self.arraysize if self.arraysize else 1

        self._try_fetch(size=size)
        rows = []

        for row in self._query_data:
            rows.append(row)
            if len(rows) >= size:
                break

        return rows

    def fetchall(self):
        """Fetch all remaining results from the last ``execute*()`` call.

        .. note::
            If a dry run query was executed, no rows are returned.

        Returns:
            List[Tuple]: A list of all the rows in the results.

        Raises:
            google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
        """
        self._try_fetch()
        return list(self._query_data)

    def setinputsizes(self, sizes):
        """No-op, but for consistency raise an error if cursor is closed."""

    def setoutputsize(self, size, column=None):
        """No-op, but for consistency raise an error if cursor is closed."""

    def __iter__(self):
        self._try_fetch()
        return iter(self._query_data)

420 

421 

422def _format_operation_list(operation, parameters): 

423 """Formats parameters in operation in the way BigQuery expects. 

424 

425 The input operation will be a query like ``SELECT %s`` and the output 

426 will be a query like ``SELECT ?``. 

427 

428 Args: 

429 operation (str): A Google BigQuery query string. 

430 

431 parameters (Sequence[Any]): Sequence of parameter values. 

432 

433 Returns: 

434 str: A formatted query string. 

435 

436 Raises: 

437 google.cloud.bigquery.dbapi.ProgrammingError: 

438 if a parameter used in the operation is not found in the 

439 ``parameters`` argument. 

440 """ 

441 formatted_params = ["?" for _ in parameters] 

442 

443 try: 

444 return operation % tuple(formatted_params) 

445 except (TypeError, ValueError) as exc: 

446 raise exceptions.ProgrammingError(exc) 

447 

448 

449def _format_operation_dict(operation, parameters): 

450 """Formats parameters in operation in the way BigQuery expects. 

451 

452 The input operation will be a query like ``SELECT %(namedparam)s`` and 

453 the output will be a query like ``SELECT @namedparam``. 

454 

455 Args: 

456 operation (str): A Google BigQuery query string. 

457 

458 parameters (Mapping[str, Any]): Dictionary of parameter values. 

459 

460 Returns: 

461 str: A formatted query string. 

462 

463 Raises: 

464 google.cloud.bigquery.dbapi.ProgrammingError: 

465 if a parameter used in the operation is not found in the 

466 ``parameters`` argument. 

467 """ 

468 formatted_params = {} 

469 for name in parameters: 

470 escaped_name = name.replace("`", r"\`") 

471 formatted_params[name] = "@`{}`".format(escaped_name) 

472 

473 try: 

474 return operation % formatted_params 

475 except (KeyError, ValueError, TypeError) as exc: 

476 raise exceptions.ProgrammingError(exc) 

477 

478 

def _format_operation(operation, parameters):
    """Formats parameters in operation in way BigQuery expects.

    Args:
        operation (str): A Google BigQuery query string.

        parameters (Union[Mapping[str, Any], Sequence[Any]]):
            Optional parameter values.

    Returns:
        str: A formatted query string.

    Raises:
        google.cloud.bigquery.dbapi.ProgrammingError:
            if a parameter used in the operation is not found in the
            ``parameters`` argument.
    """
    if not parameters:
        # No parameters: only undo pyformat's percent-escaping.
        return operation.replace("%%", "%"), None

    stripped, parameter_types = _extract_types(operation)
    if parameter_types is None:
        raise exceptions.ProgrammingError(
            f"Parameters were provided, but {repr(stripped)} has no placeholders."
        )

    # Dispatch on the parameter container: mappings use named placeholders,
    # everything else positional ones.
    if isinstance(parameters, collections_abc.Mapping):
        formatter = _format_operation_dict
    else:
        formatter = _format_operation_list
    return formatter(stripped, parameters), parameter_types

509 

510 

511def _extract_types( 

512 operation, 

513 extra_type_sub=re.compile( 

514 r""" 

515 (%*) # Extra %s. We'll deal with these in the replacement code 

516 

517 % # Beginning of replacement, %s, %(...)s 

518 

519 (?:\( # Begin of optional name and/or type 

520 ([^:)]*) # name 

521 (?:: # ':' introduces type 

522 ( # start of type group 

523 [a-zA-Z0-9_<>, ]+ # First part, no parens 

524 

525 (?: # start sets of parens + non-paren text 

526 \([0-9 ,]+\) # comma-separated groups of digits in parens 

527 # (e.g. string(10)) 

528 (?=[, >)]) # Must be followed by ,>) or space 

529 [a-zA-Z0-9<>, ]* # Optional non-paren chars 

530 )* # Can be zero or more of parens and following text 

531 ) # end of type group 

532 )? # close type clause ":type" 

533 \))? # End of optional name and/or type 

534 

535 s # End of replacement 

536 """, 

537 re.VERBOSE, 

538 ).sub, 

539): 

540 """Remove type information from parameter placeholders. 

541 

542 For every parameter of the form %(name:type)s, replace with %(name)s and add the 

543 item name->type to dict that's returned. 

544 

545 Returns operation without type information and a dictionary of names and types. 

546 """ 

547 parameter_types = None 

548 

549 def repl(m): 

550 nonlocal parameter_types 

551 prefix, name, type_ = m.groups() 

552 if len(prefix) % 2: 

553 # The prefix has an odd number of %s, the last of which 

554 # escapes the % we're looking for, so we don't want to 

555 # change anything. 

556 return m.group(0) 

557 

558 try: 

559 if name: 

560 if not parameter_types: 

561 parameter_types = {} 

562 if type_: 

563 if name in parameter_types: 

564 if type_ != parameter_types[name]: 

565 raise exceptions.ProgrammingError( 

566 f"Conflicting types for {name}: " 

567 f"{parameter_types[name]} and {type_}." 

568 ) 

569 else: 

570 parameter_types[name] = type_ 

571 else: 

572 if not isinstance(parameter_types, dict): 

573 raise TypeError() 

574 

575 return f"{prefix}%({name})s" 

576 else: 

577 if parameter_types is None: 

578 parameter_types = [] 

579 parameter_types.append(type_) 

580 return f"{prefix}%s" 

581 except (AttributeError, TypeError): 

582 raise exceptions.ProgrammingError( 

583 f"{repr(operation)} mixes named and unamed parameters." 

584 ) 

585 

586 return extra_type_sub(repl, operation), parameter_types