Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/reader.py: 27%


# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import collections
import io
import json
import time

try:
    import fastavro
except ImportError:  # pragma: NO COVER
    fastavro = None
import google.api_core.exceptions
import google.rpc.error_details_pb2

try:
    import pandas
except ImportError:  # pragma: NO COVER
    pandas = None
try:
    import pyarrow
except ImportError:  # pragma: NO COVER
    pyarrow = None


_STREAM_RESUMPTION_EXCEPTIONS = (
    google.api_core.exceptions.ServiceUnavailable,
    # Caused by transport-level error. No status code was received.
    # https://github.com/googleapis/python-bigquery-storage/issues/262
    google.api_core.exceptions.Unknown,
)

# The Google API endpoint can unexpectedly close long-running HTTP/2 streams.
# Unfortunately, this condition is surfaced to the caller as an internal error
# by gRPC. We don't want to resume on all internal errors, so instead we look
# for error messages that we know are caused by problems that are safe to
# reconnect from.
_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
    # See: https://github.com/googleapis/google-cloud-python/pull/9994
    "RST_STREAM",
)

_FASTAVRO_REQUIRED = (
    "fastavro is required to parse ReadRowResponse messages with Avro bytes."
)
_PANDAS_REQUIRED = "pandas is required to create a DataFrame"
_PYARROW_REQUIRED = (
    "pyarrow is required to parse ReadRowResponse messages with Arrow bytes."
)


class ReadRowsStream(object):
    """A stream of results from a read rows request.

    This stream is an iterable of
    :class:`~google.cloud.bigquery_storage_v1.types.ReadRowsResponse`.
    Iterate over it to fetch all row messages.

    If the fastavro library is installed, use the
    :func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.rows()`
    method to parse all messages into a stream of row dictionaries.

    If the pandas and fastavro libraries are installed, use the
    :func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.to_dataframe()`
    method to parse all messages into a :class:`pandas.DataFrame`.

    This object should not be created directly, but is returned by
    other methods in this library.
    """

    def __init__(
        self, client, name, offset, read_rows_kwargs, retry_delay_callback=None
    ):
        """Construct a ReadRowsStream.

        Args:
            client ( \
                ~google.cloud.bigquery_storage_v1.services. \
                big_query_read.BigQueryReadClient \
            ):
                A GAPIC client used to reconnect to a ReadRows stream. This
                must be the GAPIC client to avoid a circular dependency on
                this class.
            name (str):
                Required. Stream ID from which rows are being read.
            offset (int):
                Required. Position in the stream to start
                reading from. The offset requested must be less than the last
                row read from ReadRows. Requesting a larger offset is
                undefined.
            read_rows_kwargs (dict):
                Keyword arguments to use when reconnecting to a ReadRows
                stream.
            retry_delay_callback (Optional[Callable[[float], None]]):
                If the client receives a retryable error that asks the client to
                delay its next attempt and retry_delay_callback is not None,
                ReadRowsStream will call retry_delay_callback with the delay
                duration (in seconds) before it starts sleeping until the next
                attempt.

        Returns:
            Iterable[ \
                ~google.cloud.bigquery_storage.types.ReadRowsResponse \
            ]:
                A sequence of row messages.
        """

        # Make a copy of the read position so that we can update it without
        # mutating the original input.
        self._client = client
        self._name = name
        self._offset = offset
        self._read_rows_kwargs = read_rows_kwargs
        self._retry_delay_callback = retry_delay_callback
        self._wrapped = None

    def __iter__(self):
        """An iterable of messages.

        Returns:
            Iterable[ \
                ~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
            ]:
                A sequence of row messages.
        """
        # Infinite loop to reconnect on reconnectable errors while processing
        # the row stream.

        if self._wrapped is None:
            self._reconnect()

        while True:
            try:
                for message in self._wrapped:
                    rowcount = message.row_count
                    self._offset += rowcount
                    yield message

                return  # Made it through the whole stream.
            except google.api_core.exceptions.InternalServerError as exc:
                resumable_error = any(
                    resumable_message in exc.message
                    for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
                )
                if not resumable_error:
                    raise
            except _STREAM_RESUMPTION_EXCEPTIONS:
                # Transient error, so reconnect to the stream.
                pass
            except Exception as exc:
                if not self._resource_exhausted_exception_is_retryable(exc):
                    raise

            self._reconnect()

    def _reconnect(self):
        """Reconnect to the ReadRows stream using the most recent offset."""
        while True:
            try:
                self._wrapped = self._client.read_rows(
                    read_stream=self._name,
                    offset=self._offset,
                    **self._read_rows_kwargs
                )
                break
            except Exception as exc:
                if not self._resource_exhausted_exception_is_retryable(exc):
                    raise

    def _resource_exhausted_exception_is_retryable(self, exc):
        if isinstance(exc, google.api_core.exceptions.ResourceExhausted):
            # ResourceExhausted errors are only retried if a valid
            # RetryInfo is provided with the error.
            #
            # TODO: Remove hasattr logic when we require google-api-core >= 2.2.0.
            # ResourceExhausted added details/_details in google-api-core 2.2.0.
            details = None
            if hasattr(exc, "details"):
                details = exc.details
            elif hasattr(exc, "_details"):
                details = exc._details
            if details is not None:
                for detail in details:
                    if isinstance(detail, google.rpc.error_details_pb2.RetryInfo):
                        retry_delay = detail.retry_delay
                        if retry_delay is not None:
                            delay = max(
                                0,
                                float(retry_delay.seconds)
                                + (float(retry_delay.nanos) / 1e9),
                            )
                            if self._retry_delay_callback:
                                self._retry_delay_callback(delay)
                            time.sleep(delay)
                            return True
        return False

    def rows(self, read_session=None):
        """Iterate over all rows in the stream.

        This method requires the fastavro library in order to parse row
        messages in avro format. For arrow format messages, the pyarrow
        library is required.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings in the fastavro library.

        Args:
            read_session ( \
                Optional[~google.cloud.bigquery_storage_v1.types.ReadSession] \
            ):
                This argument was used to specify the schema of the rows in the
                stream, but now the first message in a read stream contains
                this information. When row_restriction is applied, some streams
                may be empty without read_session info. Provide this argument
                to avoid an error. For more information, see
                https://github.com/googleapis/python-bigquery-storage/issues/733

        Returns:
            Iterable[Mapping]:
                A sequence of rows, represented as dictionaries.
        """
        return ReadRowsIterable(self, read_session=read_session)

    def to_arrow(self, read_session=None):
        """Create a :class:`pyarrow.Table` of all rows in the stream.

        This method requires the pyarrow library and a stream using the Arrow
        format.

        Args:
            read_session ( \
                ~google.cloud.bigquery_storage_v1.types.ReadSession \
            ):
                This argument was used to specify the schema of the rows in the
                stream, but now the first message in a read stream contains
                this information. When row_restriction is applied, some streams
                may be empty without read_session info. Provide this argument
                to avoid an error. For more information, see
                https://github.com/googleapis/python-bigquery-storage/issues/733

        Returns:
            pyarrow.Table:
                A table of all rows in the stream.
        """
        return self.rows(read_session=read_session).to_arrow()

    def to_dataframe(self, read_session=None, dtypes=None):
        """Create a :class:`pandas.DataFrame` of all rows in the stream.

        This method requires the pandas library to create a data frame and the
        fastavro library to parse row messages.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings.

        Args:
            read_session ( \
                ~google.cloud.bigquery_storage_v1.types.ReadSession \
            ):
                This argument was used to specify the schema of the rows in the
                stream, but now the first message in a read stream contains
                this information. When row_restriction is applied, some streams
                may be empty without read_session info. Provide this argument
                to avoid an error. For more information, see
                https://github.com/googleapis/python-bigquery-storage/issues/733
            dtypes ( \
                Map[str, Union[str, pandas.Series.dtype]] \
            ):
                Optional. A dictionary of column names and pandas ``dtype``s.
                The provided ``dtype`` is used when constructing the series for
                the column specified. Otherwise, the default pandas behavior
                is used.

        Returns:
            pandas.DataFrame:
                A data frame of all rows in the stream.
        """
        if pandas is None:
            raise ImportError(_PANDAS_REQUIRED)

        return self.rows(read_session=read_session).to_dataframe(dtypes=dtypes)


class ReadRowsIterable(object):
    """An iterable of rows from a read session.

    Args:
        reader (google.cloud.bigquery_storage_v1.reader.ReadRowsStream):
            A read rows stream.
        read_session ( \
            Optional[~google.cloud.bigquery_storage_v1.types.ReadSession] \
        ):
            This argument was used to specify the schema of the rows in the
            stream, but now the first message in a read stream contains
            this information. When row_restriction is applied, some streams
            may be empty without read_session info. Provide this argument
            to avoid an error. For more information, see
            https://github.com/googleapis/python-bigquery-storage/issues/733
    """

    # This class is modelled after the google.cloud.bigquery.table.RowIterator
    # and aims to be API compatible where possible.

    def __init__(self, reader, read_session=None):
        self._reader = reader
        if read_session is not None:
            self._stream_parser = _StreamParser.from_read_session(read_session)
        else:
            self._stream_parser = None

    @property
    def pages(self):
        """A generator of all pages in the stream.

        Returns:
            types.GeneratorType[google.cloud.bigquery_storage_v1.ReadRowsPage]:
                A generator of pages.
        """
        # Each page is an iterator of rows. But also has num_items, remaining,
        # and to_dataframe.
        for message in self._reader:
            # Only the first message contains the schema, which is needed to
            # decode the messages.
            if not self._stream_parser:
                self._stream_parser = _StreamParser.from_read_rows_response(message)
            yield ReadRowsPage(self._stream_parser, message)

    def __iter__(self):
        """Iterator for each row in all pages."""
        for page in self.pages:
            for row in page:
                yield row

    def to_arrow(self):
        """Create a :class:`pyarrow.Table` of all rows in the stream.

        This method requires the pyarrow library and a stream using the Arrow
        format.

        Returns:
            pyarrow.Table:
                A table of all rows in the stream.
        """
        record_batches = []
        for page in self.pages:
            record_batches.append(page.to_arrow())

        if record_batches:
            return pyarrow.Table.from_batches(record_batches)

        # No data, return an empty Table.
        self._stream_parser._parse_arrow_schema()
        return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)

    def to_dataframe(self, dtypes=None):
        """Create a :class:`pandas.DataFrame` of all rows in the stream.

        This method requires the pandas library to create a data frame and the
        fastavro library to parse row messages.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings in the fastavro library.

        Args:
            dtypes ( \
                Map[str, Union[str, pandas.Series.dtype]] \
            ):
                Optional. A dictionary of column names and pandas ``dtype``s.
                The provided ``dtype`` is used when constructing the series for
                the column specified. Otherwise, the default pandas behavior
                is used.

        Returns:
            pandas.DataFrame:
                A data frame of all rows in the stream.
        """
        if pandas is None:
            raise ImportError(_PANDAS_REQUIRED)

        if dtypes is None:
            dtypes = {}

        # If it's an Arrow stream, calling to_arrow, then converting to a
        # pandas dataframe is about 2x faster. This is because pandas.concat is
        # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
        # usually no-copy.
        try:
            record_batch = self.to_arrow()
        except NotImplementedError:
            pass
        else:
            df = record_batch.to_pandas()
            for column in dtypes:
                df[column] = pandas.Series(df[column], dtype=dtypes[column])
            return df

        frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]

        if frames:
            return pandas.concat(frames)

        # No data, construct an empty dataframe with columns matching the schema.
        # The result should be consistent with what an empty ARROW stream would produce.
        self._stream_parser._parse_avro_schema()
        schema = self._stream_parser._avro_schema_json

        column_dtypes = self._dtypes_from_avro(schema["fields"])
        column_dtypes.update(dtypes)

        df = pandas.DataFrame(columns=column_dtypes.keys())
        for column in df:
            df[column] = pandas.Series([], dtype=column_dtypes[column])

        return df

    def _dtypes_from_avro(self, avro_fields):
        """Determine Pandas dtypes for columns in Avro schema.

        Args:
            avro_fields (Iterable[Mapping[str, Any]]):
                Avro fields' metadata.

        Returns:
            collections.OrderedDict[str, str]:
                Column names with their corresponding Pandas dtypes.
        """
        result = collections.OrderedDict()

        type_map = {"long": "int64", "double": "float64", "boolean": "bool"}

        for field_info in avro_fields:
            # If a type is a union of multiple types, pick the first type
            # that is not "null". Otherwise, use the declared type directly.
            if isinstance(field_info["type"], list):
                type_info = next(item for item in field_info["type"] if item != "null")
            else:
                type_info = field_info["type"]

            if isinstance(type_info, str):
                field_dtype = type_map.get(type_info, "object")
            else:
                logical_type = type_info.get("logicalType")
                if logical_type == "timestamp-micros":
                    field_dtype = "datetime64[ns, UTC]"
                else:
                    field_dtype = "object"

            result[field_info["name"]] = field_dtype

        return result


class ReadRowsPage(object):
    """An iterator of rows from a read session message.

    Args:
        stream_parser (google.cloud.bigquery_storage_v1.reader._StreamParser):
            A helper for parsing messages into rows.
        message (google.cloud.bigquery_storage_v1.types.ReadRowsResponse):
            A message of data from a read rows stream.
    """

    # This class is modeled after google.api_core.page_iterator.Page and aims
    # to provide API compatibility where possible.

    def __init__(self, stream_parser, message):
        self._stream_parser = stream_parser
        self._message = message
        self._iter_rows = None
        self._num_items = self._message.row_count
        self._remaining = self._message.row_count

    def _parse_rows(self):
        """Parse rows from the message only once."""
        if self._iter_rows is not None:
            return

        rows = self._stream_parser.to_rows(self._message)
        self._iter_rows = iter(rows)

    @property
    def num_items(self):
        """int: Total items in the page."""
        return self._num_items

    @property
    def remaining(self):
        """int: Remaining items in the page."""
        return self._remaining

    def __iter__(self):
        """A ``ReadRowsPage`` is an iterator."""
        return self

    def next(self):
        """Get the next row in the page."""
        self._parse_rows()
        if self._remaining > 0:
            self._remaining -= 1
        return next(self._iter_rows)

    # Alias needed for Python 2/3 support.
    __next__ = next

    def to_arrow(self):
        """Create an :class:`pyarrow.RecordBatch` of rows in the page.

        Returns:
            pyarrow.RecordBatch:
                Rows from the message, as an Arrow record batch.
        """
        return self._stream_parser.to_arrow(self._message)

    def to_dataframe(self, dtypes=None):
        """Create a :class:`pandas.DataFrame` of rows in the page.

        This method requires the pandas library to create a data frame and the
        fastavro library to parse row messages.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings in the fastavro library.

        Args:
            dtypes ( \
                Map[str, Union[str, pandas.Series.dtype]] \
            ):
                Optional. A dictionary of column names and pandas ``dtype``s.
                The provided ``dtype`` is used when constructing the series for
                the column specified. Otherwise, the default pandas behavior
                is used.

        Returns:
            pandas.DataFrame:
                A data frame of all rows in the stream.
        """
        if pandas is None:
            raise ImportError(_PANDAS_REQUIRED)

        return self._stream_parser.to_dataframe(self._message, dtypes=dtypes)


class _StreamParser(object):
    def to_arrow(self, message):
        raise NotImplementedError("Not implemented.")

    def to_dataframe(self, message, dtypes=None):
        raise NotImplementedError("Not implemented.")

    def to_rows(self, message):
        raise NotImplementedError("Not implemented.")

    def _parse_avro_schema(self):
        raise NotImplementedError("Not implemented.")

    def _parse_arrow_schema(self):
        raise NotImplementedError("Not implemented.")

    @staticmethod
    def from_read_session(read_session):
        schema_type = read_session._pb.WhichOneof("schema")
        if schema_type == "avro_schema":
            return _AvroStreamParser(read_session)
        elif schema_type == "arrow_schema":
            return _ArrowStreamParser(read_session)
        else:
            raise TypeError(
                "Unsupported schema type in read_session: {0}".format(schema_type)
            )

    @staticmethod
    def from_read_rows_response(message):
        schema_type = message._pb.WhichOneof("schema")
        if schema_type == "avro_schema":
            return _AvroStreamParser(message)
        elif schema_type == "arrow_schema":
            return _ArrowStreamParser(message)
        else:
            raise TypeError(
                "Unsupported schema type in message: {0}".format(schema_type)
            )


class _AvroStreamParser(_StreamParser):
    """Helper to parse Avro messages into useful representations."""

    def __init__(self, message):
        """Construct an _AvroStreamParser.

        Args:
            message (Union[
                google.cloud.bigquery_storage_v1.types.ReadSession, \
                google.cloud.bigquery_storage_v1.types.ReadRowsResponse, \
            ]):
                Either the first message of data from a read rows stream or a
                read session. Both types contain a oneof "schema" field, which
                can be used to determine how to deserialize rows.
        """
        if fastavro is None:
            raise ImportError(_FASTAVRO_REQUIRED)

        self._first_message = message
        self._avro_schema_json = None
        self._fastavro_schema = None
        self._column_names = None

    def to_arrow(self, message):
        """Create an :class:`pyarrow.RecordBatch` of rows in the page.

        Args:
            message (google.cloud.bigquery_storage_v1.types.ReadRowsResponse):
                Protocol buffer from the read rows stream, to convert into an
                Arrow record batch.

        Returns:
            pyarrow.RecordBatch:
                Rows from the message, as an Arrow record batch.
        """
        raise NotImplementedError("to_arrow not implemented for Avro streams.")

    def to_dataframe(self, message, dtypes=None):
        """Create a :class:`pandas.DataFrame` of rows in the page.

        This method requires the pandas library to create a data frame and the
        fastavro library to parse row messages.

        .. warning::
            DATETIME columns are not supported. They are currently parsed as
            strings in the fastavro library.

        Args:
            message ( \
                ~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
            ):
                A message containing Avro bytes to parse into a pandas DataFrame.
            dtypes ( \
                Map[str, Union[str, pandas.Series.dtype]] \
            ):
                Optional. A dictionary of column names and pandas ``dtype``s.
                The provided ``dtype`` is used when constructing the series for
                the column specified. Otherwise, the default pandas behavior
                is used.

        Returns:
            pandas.DataFrame:
                A data frame of all rows in the stream.
        """
        self._parse_avro_schema()

        if dtypes is None:
            dtypes = {}

        columns = collections.defaultdict(list)
        for row in self.to_rows(message):
            for column in row:
                columns[column].append(row[column])
        for column in dtypes:
            columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
        return pandas.DataFrame(columns, columns=self._column_names)

    def _parse_avro_schema(self):
        """Extract and parse Avro schema from a read session."""
        if self._avro_schema_json:
            return

        self._avro_schema_json = json.loads(self._first_message.avro_schema.schema)
        self._column_names = tuple(
            (field["name"] for field in self._avro_schema_json["fields"])
        )
        self._first_message = None

    def _parse_fastavro(self):
        """Convert parsed Avro schema to fastavro format."""
        self._parse_avro_schema()
        self._fastavro_schema = fastavro.parse_schema(self._avro_schema_json)

    def to_rows(self, message):
        """Parse all rows in a stream message.

        Args:
            message ( \
                ~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
            ):
                A message containing Avro bytes to parse into rows.

        Returns:
            Iterable[Mapping]:
                A sequence of rows, represented as dictionaries.
        """
        self._parse_fastavro()
        messageio = io.BytesIO(message.avro_rows.serialized_binary_rows)
        while True:
            # Loop in a while loop because schemaless_reader can only read
            # a single record.
            try:
                # TODO: Parse DATETIME into datetime.datetime (no timezone),
                # instead of as a string.
                yield fastavro.schemaless_reader(messageio, self._fastavro_schema)
            except (StopIteration, EOFError):
                break  # Finished with message


class _ArrowStreamParser(_StreamParser):
    def __init__(self, message):
        """Construct an _ArrowStreamParser.

        Args:
            message (Union[
                google.cloud.bigquery_storage_v1.types.ReadSession, \
                google.cloud.bigquery_storage_v1.types.ReadRowsResponse, \
            ]):
                Either the first message of data from a read rows stream or a
                read session. Both types contain a oneof "schema" field, which
                can be used to determine how to deserialize rows.
        """
        if pyarrow is None:
            raise ImportError(_PYARROW_REQUIRED)

        self._first_message = message
        self._schema = None

    def to_arrow(self, message):
        return self._parse_arrow_message(message)

    def to_rows(self, message):
        record_batch = self._parse_arrow_message(message)

        # Iterate through each column simultaneously, and make a dict from the
        # row values.
        for row in zip(*record_batch.columns):
            yield dict(zip(self._column_names, row))

    def to_dataframe(self, message, dtypes=None):
        record_batch = self._parse_arrow_message(message)

        if dtypes is None:
            dtypes = {}

        df = record_batch.to_pandas()

        for column in dtypes:
            df[column] = pandas.Series(df[column], dtype=dtypes[column])

        return df

    def _parse_arrow_message(self, message):
        self._parse_arrow_schema()

        return pyarrow.ipc.read_record_batch(
            pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
            self._schema,
        )

    def _parse_arrow_schema(self):
        if self._schema:
            return

        self._schema = pyarrow.ipc.read_schema(
            pyarrow.py_buffer(self._first_message.arrow_schema.serialized_schema)
        )
        self._column_names = [field.name for field in self._schema]
        self._first_message = None
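

# A minimal end-to-end usage sketch for the classes above, assuming the
# documented BigQueryReadClient surface (create_read_session / read_rows)
# described in the docstrings. The project and table identifiers below are
# placeholders, not values taken from this module.
if __name__ == "__main__":  # pragma: NO COVER
    from google.cloud import bigquery_storage_v1
    from google.cloud.bigquery_storage_v1 import types

    client = bigquery_storage_v1.BigQueryReadClient()

    # Placeholders; substitute real identifiers before running.
    project_id = "my-project"
    table_path = "projects/{}/datasets/{}/tables/{}".format(
        "bigquery-public-data", "usa_names", "usa_1910_current"
    )

    requested_session = types.ReadSession(
        table=table_path,
        # ARROW routes parsing through _ArrowStreamParser; AVRO would use
        # _AvroStreamParser and require fastavro.
        data_format=types.DataFormat.ARROW,
    )
    session = client.create_read_session(
        parent="projects/{}".format(project_id),
        read_session=requested_session,
        max_stream_count=1,
    )

    # client.read_rows returns a ReadRowsStream. Passing the session to
    # rows() supplies the schema even if the stream itself is empty (see the
    # issue 733 note in the docstrings above).
    reader = client.read_rows(session.streams[0].name)
    for row in reader.rows(session):
        print(row)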