1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16
17import collections
18import io
19import json
20import time
21
22try:
23 import fastavro
24except ImportError: # pragma: NO COVER
25 fastavro = None
26import google.api_core.exceptions
27import google.rpc.error_details_pb2
28
29try:
30 import pandas
31except ImportError: # pragma: NO COVER
32 pandas = None
33try:
34 import pyarrow
35except ImportError: # pragma: NO COVER
36 pyarrow = None
37
43
44_STREAM_RESUMPTION_EXCEPTIONS = (
45 google.api_core.exceptions.ServiceUnavailable,
46 # Caused by transport-level error. No status code was received.
47 # https://github.com/googleapis/python-bigquery-storage/issues/262
48 google.api_core.exceptions.Unknown,
49)
50
51# The Google API endpoint can unexpectedly close long-running HTTP/2 streams.
52# Unfortunately, this condition is surfaced to the caller as an internal error
53# by gRPC. We don't want to resume on all internal errors, so instead we look
# for specific error messages that we know indicate it is safe to
# reconnect.
56_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
57 # See: https://github.com/googleapis/google-cloud-python/pull/9994
58 "RST_STREAM",
59)
60
61_FASTAVRO_REQUIRED = (
    "fastavro is required to parse ReadRowsResponse messages with Avro bytes."
63)
64_PANDAS_REQUIRED = "pandas is required to create a DataFrame"
65_PYARROW_REQUIRED = (
    "pyarrow is required to parse ReadRowsResponse messages with Arrow bytes."
67)
68
69
70class ReadRowsStream(object):
71 """A stream of results from a read rows request.
72
73 This stream is an iterable of
74 :class:`~google.cloud.bigquery_storage_v1.types.ReadRowsResponse`.
75 Iterate over it to fetch all row messages.
76
    If the fastavro library (for Avro streams) or the pyarrow library (for
    Arrow streams) is installed, use the
    :func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.rows()`
    method to parse all messages into a stream of row dictionaries.
80
    If the pandas library is also installed, use the
    :func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.to_dataframe()`
    method to parse all messages into a :class:`pandas.DataFrame`.
84
85 This object should not be created directly, but is returned by
86 other methods in this library.
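
    Example:
        A minimal sketch of typical usage; the project, dataset, and table
        names below are placeholders::

            from google.cloud import bigquery_storage_v1

            client = bigquery_storage_v1.BigQueryReadClient()
            requested_session = bigquery_storage_v1.types.ReadSession(
                table="projects/PROJECT/datasets/DATASET/tables/TABLE",
                data_format=bigquery_storage_v1.types.DataFormat.AVRO,
            )
            session = client.create_read_session(
                parent="projects/PROJECT",
                read_session=requested_session,
                max_stream_count=1,
            )
            reader = client.read_rows(session.streams[0].name)
            for row in reader.rows(session):
                print(row)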
87 """
88
89 def __init__(
90 self, client, name, offset, read_rows_kwargs, retry_delay_callback=None
91 ):
92 """Construct a ReadRowsStream.
93
94 Args:
95 client ( \
96 ~google.cloud.bigquery_storage_v1.services. \
97 big_query_read.BigQueryReadClient \
98 ):
99 A GAPIC client used to reconnect to a ReadRows stream. This
100 must be the GAPIC client to avoid a circular dependency on
101 this class.
102 name (str):
103 Required. Stream ID from which rows are being read.
104 offset (int):
105 Required. Position in the stream to start
106 reading from. The offset requested must be less than the last
107 row read from ReadRows. Requesting a larger offset is
108 undefined.
109 read_rows_kwargs (dict):
110 Keyword arguments to use when reconnecting to a ReadRows
111 stream.
112 retry_delay_callback (Optional[Callable[[float], None]]):
113 If the client receives a retryable error that asks the client to
114 delay its next attempt and retry_delay_callback is not None,
115 ReadRowsStream will call retry_delay_callback with the delay
116 duration (in seconds) before it starts sleeping until the next
117 attempt.
124 """
125
        # Keep the offset locally so that it can be updated as rows are read,
        # allowing a reconnect to resume from the correct position.
128 self._client = client
129 self._name = name
130 self._offset = offset
131 self._read_rows_kwargs = read_rows_kwargs
132 self._retry_delay_callback = retry_delay_callback
133 self._wrapped = None
134
135 def __iter__(self):
136 """An iterable of messages.
137
138 Returns:
139 Iterable[ \
140 ~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
141 ]:
142 A sequence of row messages.
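
        Example:
            A sketch of consuming the raw response messages, assuming
            ``reader`` was created as in the :class:`ReadRowsStream` class
            docstring example::

                total_rows = 0
                for response in reader:
                    total_rows += response.row_count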
143 """
144 # Infinite loop to reconnect on reconnectable errors while processing
145 # the row stream.
146
147 if self._wrapped is None:
148 self._reconnect()
149
150 while True:
151 try:
152 for message in self._wrapped:
153 rowcount = message.row_count
154 self._offset += rowcount
155 yield message
156
157 return # Made it through the whole stream.
158 except google.api_core.exceptions.InternalServerError as exc:
159 resumable_error = any(
160 resumable_message in exc.message
161 for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
162 )
163 if not resumable_error:
164 raise
165 except _STREAM_RESUMPTION_EXCEPTIONS:
166 # Transient error, so reconnect to the stream.
167 pass
168 except Exception as exc:
169 if not self._resource_exhausted_exception_is_retryable(exc):
170 raise
171
172 self._reconnect()
173
174 def _reconnect(self):
175 """Reconnect to the ReadRows stream using the most recent offset."""
176 while True:
177 try:
178 self._wrapped = self._client.read_rows(
179 read_stream=self._name,
180 offset=self._offset,
181 **self._read_rows_kwargs
182 )
183 break
184 except Exception as exc:
185 if not self._resource_exhausted_exception_is_retryable(exc):
186 raise
187
188 def _resource_exhausted_exception_is_retryable(self, exc):
189 if isinstance(exc, google.api_core.exceptions.ResourceExhausted):
190 # ResourceExhausted errors are only retried if a valid
191 # RetryInfo is provided with the error.
192 #
193 # TODO: Remove hasattr logic when we require google-api-core >= 2.2.0.
194 # ResourceExhausted added details/_details in google-api-core 2.2.0.
195 details = None
196 if hasattr(exc, "details"):
197 details = exc.details
198 elif hasattr(exc, "_details"):
199 details = exc._details
200 if details is not None:
201 for detail in details:
202 if isinstance(detail, google.rpc.error_details_pb2.RetryInfo):
203 retry_delay = detail.retry_delay
204 if retry_delay is not None:
205 delay = max(
206 0,
207 float(retry_delay.seconds)
208 + (float(retry_delay.nanos) / 1e9),
209 )
210 if self._retry_delay_callback:
211 self._retry_delay_callback(delay)
212 time.sleep(delay)
213 return True
214 return False
215
216 def rows(self, read_session=None):
217 """Iterate over all rows in the stream.
218
        This method requires the fastavro library in order to parse row
        messages in Avro format. For Arrow format messages, the pyarrow
        library is required.
222
223 .. warning::
224 DATETIME columns are not supported. They are currently parsed as
225 strings in the fastavro library.
226
227 Args:
228 read_session ( \
229 Optional[~google.cloud.bigquery_storage_v1.types.ReadSession] \
230 ):
231 This argument was used to specify the schema of the rows in the
232 stream, but now the first message in a read stream contains
233 this information. When row_restriction is applied, some streams
234 may be empty without read_session info. Provide this argument
235 to avoid an error. For more information, see https://github.com/googleapis/python-bigquery-storage/issues/733
236
237 Returns:
238 Iterable[Mapping]:
239 A sequence of rows, represented as dictionaries.
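
        Example:
            A sketch of row iteration, assuming ``reader`` and ``session``
            were created as in the :class:`ReadRowsStream` class docstring
            example (the column name is a placeholder)::

                for row in reader.rows(session):
                    # Each row is a mapping of column name to value.
                    print(row["some_column"])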
240 """
241 return ReadRowsIterable(self, read_session=read_session)
242
243 def to_arrow(self, read_session=None):
244 """Create a :class:`pyarrow.Table` of all rows in the stream.
245
246 This method requires the pyarrow library and a stream using the Arrow
247 format.
248
249 Args:
250 read_session ( \
                Optional[~google.cloud.bigquery_storage_v1.types.ReadSession] \
252 ):
253 This argument was used to specify the schema of the rows in the
254 stream, but now the first message in a read stream contains
255 this information. When row_restriction is applied, some streams
256 may be empty without read_session info. Provide this argument
257 to avoid an error. For more information, see https://github.com/googleapis/python-bigquery-storage/issues/733
258
259 Returns:
260 pyarrow.Table:
261 A table of all rows in the stream.
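
        Example:
            A sketch of materializing the stream as an Arrow table, assuming
            the read session was created with the ARROW data format and that
            ``reader`` and ``session`` exist as in the :class:`ReadRowsStream`
            class docstring example::

                table = reader.to_arrow(session)
                print(table.num_rows, table.schema)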
262 """
263 return self.rows(read_session=read_session).to_arrow()
264
265 def to_dataframe(self, read_session=None, dtypes=None):
266 """Create a :class:`pandas.DataFrame` of all rows in the stream.
267
        This method requires the pandas library to create a data frame and
        the fastavro or pyarrow library to parse row messages, depending on
        the data format of the stream.
270
271 .. warning::
272 DATETIME columns are not supported. They are currently parsed as
273 strings.
274
275 Args:
276 read_session ( \
                Optional[~google.cloud.bigquery_storage_v1.types.ReadSession] \
278 ):
279 This argument was used to specify the schema of the rows in the
280 stream, but now the first message in a read stream contains
281 this information. When row_restriction is applied, some streams
282 may be empty without read_session info. Provide this argument
283 to avoid an error. For more information, see https://github.com/googleapis/python-bigquery-storage/issues/733
284 dtypes ( \
285 Map[str, Union[str, pandas.Series.dtype]] \
286 ):
                Optional. A dictionary of column names to pandas ``dtype``s. The
288 provided ``dtype`` is used when constructing the series for
289 the column specified. Otherwise, the default pandas behavior
290 is used.
291
292 Returns:
293 pandas.DataFrame:
294 A data frame of all rows in the stream.
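
        Example:
            A sketch of loading the stream into pandas, overriding the dtype
            of a hypothetical ``total`` column, with ``reader`` and
            ``session`` as in the :class:`ReadRowsStream` class docstring
            example::

                df = reader.to_dataframe(session, dtypes={"total": "float32"})
                print(df.head())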
295 """
296 if pandas is None:
297 raise ImportError(_PANDAS_REQUIRED)
298
299 return self.rows(read_session=read_session).to_dataframe(dtypes=dtypes)
300
301
302class ReadRowsIterable(object):
303 """An iterable of rows from a read session.
304
305 Args:
306 reader (google.cloud.bigquery_storage_v1.reader.ReadRowsStream):
307 A read rows stream.
308 read_session ( \
309 Optional[~google.cloud.bigquery_storage_v1.types.ReadSession] \
310 ):
311 This argument was used to specify the schema of the rows in the
312 stream, but now the first message in a read stream contains
313 this information. When row_restriction is applied, some streams
314 may be empty without read_session info. Provide this argument
            to avoid an error. For more information, see https://github.com/googleapis/python-bigquery-storage/issues/733
316 """
317
318 # This class is modelled after the google.cloud.bigquery.table.RowIterator
319 # and aims to be API compatible where possible.
320
321 def __init__(self, reader, read_session=None):
322 self._reader = reader
323 if read_session is not None:
324 self._stream_parser = _StreamParser.from_read_session(read_session)
325 else:
326 self._stream_parser = None
327
328 @property
329 def pages(self):
330 """A generator of all pages in the stream.
331
332 Returns:
333 types.GeneratorType[google.cloud.bigquery_storage_v1.ReadRowsPage]:
334 A generator of pages.
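
        Example:
            A sketch of page-wise processing, assuming ``reader`` and
            ``session`` were created as in the :class:`ReadRowsStream` class
            docstring example::

                for page in reader.rows(session).pages:
                    print(page.num_items, page.remaining)
                    frame = page.to_dataframe()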
335 """
        # Each page is an iterator of rows, but it also exposes num_items,
        # remaining, and to_dataframe.
338 for message in self._reader:
339 # Only the first message contains the schema, which is needed to
340 # decode the messages.
341 if not self._stream_parser:
342 self._stream_parser = _StreamParser.from_read_rows_response(message)
343 yield ReadRowsPage(self._stream_parser, message)
344
345 def __iter__(self):
346 """Iterator for each row in all pages."""
347 for page in self.pages:
348 for row in page:
349 yield row
350
351 def to_arrow(self):
352 """Create a :class:`pyarrow.Table` of all rows in the stream.
353
354 This method requires the pyarrow library and a stream using the Arrow
355 format.
356
357 Returns:
358 pyarrow.Table:
359 A table of all rows in the stream.
360 """
361 record_batches = []
362 for page in self.pages:
363 record_batches.append(page.to_arrow())
364
365 if record_batches:
366 return pyarrow.Table.from_batches(record_batches)
367
368 # No data, return an empty Table.
369 self._stream_parser._parse_arrow_schema()
370 return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)
371
372 def to_dataframe(self, dtypes=None):
373 """Create a :class:`pandas.DataFrame` of all rows in the stream.
374
        This method requires the pandas library to create a data frame and
        the fastavro or pyarrow library to parse row messages, depending on
        the data format of the stream.
377
378 .. warning::
379 DATETIME columns are not supported. They are currently parsed as
380 strings in the fastavro library.
381
382 Args:
383 dtypes ( \
384 Map[str, Union[str, pandas.Series.dtype]] \
385 ):
                Optional. A dictionary of column names to pandas ``dtype``s. The
387 provided ``dtype`` is used when constructing the series for
388 the column specified. Otherwise, the default pandas behavior
389 is used.
390
391 Returns:
392 pandas.DataFrame:
393 A data frame of all rows in the stream.
394 """
395 if pandas is None:
396 raise ImportError(_PANDAS_REQUIRED)
397
398 if dtypes is None:
399 dtypes = {}
400
        # If it's an Arrow stream, calling to_arrow and then converting to a
        # pandas dataframe is about 2x faster. This is because pandas.concat
        # is rarely zero-copy, whereas pyarrow.Table.from_batches followed by
        # to_pandas is usually zero-copy.
405 try:
406 record_batch = self.to_arrow()
407 except NotImplementedError:
408 pass
409 else:
410 df = record_batch.to_pandas()
411 for column in dtypes:
412 df[column] = pandas.Series(df[column], dtype=dtypes[column])
413 return df
414
415 frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]
416
417 if frames:
418 return pandas.concat(frames)
419
420 # No data, construct an empty dataframe with columns matching the schema.
421 # The result should be consistent with what an empty ARROW stream would produce.
422 self._stream_parser._parse_avro_schema()
423 schema = self._stream_parser._avro_schema_json
424
425 column_dtypes = self._dtypes_from_avro(schema["fields"])
426 column_dtypes.update(dtypes)
427
428 df = pandas.DataFrame(columns=column_dtypes.keys())
429 for column in df:
430 df[column] = pandas.Series([], dtype=column_dtypes[column])
431
432 return df
433
434 def _dtypes_from_avro(self, avro_fields):
435 """Determine Pandas dtypes for columns in Avro schema.
436
437 Args:
438 avro_fields (Iterable[Mapping[str, Any]]):
439 Avro fields' metadata.
440
441 Returns:
            collections.OrderedDict[str, str]:
443 Column names with their corresponding Pandas dtypes.
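
        Example:
            An illustrative mapping for NULLABLE INT64 and TIMESTAMP columns
            (Avro field metadata abbreviated)::

                _dtypes_from_avro([
                    {"name": "id", "type": ["null", "long"]},
                    {"name": "created", "type": [
                        "null",
                        {"type": "long", "logicalType": "timestamp-micros"},
                    ]},
                ])
                # -> OrderedDict([("id", "int64"),
                #                 ("created", "datetime64[ns, UTC]")])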
444 """
445 result = collections.OrderedDict()
446
447 type_map = {"long": "int64", "double": "float64", "boolean": "bool"}
448
449 for field_info in avro_fields:
            # If a type is a union of multiple types, pick the first type
            # that is not "null".
            if isinstance(field_info["type"], list):
                type_info = next(
                    item for item in field_info["type"] if item != "null"
                )
            else:
                type_info = field_info["type"]
454
455 if isinstance(type_info, str):
456 field_dtype = type_map.get(type_info, "object")
457 else:
458 logical_type = type_info.get("logicalType")
459 if logical_type == "timestamp-micros":
460 field_dtype = "datetime64[ns, UTC]"
461 else:
462 field_dtype = "object"
463
464 result[field_info["name"]] = field_dtype
465
466 return result
467
468
469class ReadRowsPage(object):
470 """An iterator of rows from a read session message.
471
472 Args:
473 stream_parser (google.cloud.bigquery_storage_v1.reader._StreamParser):
474 A helper for parsing messages into rows.
475 message (google.cloud.bigquery_storage_v1.types.ReadRowsResponse):
476 A message of data from a read rows stream.
477 """
478
479 # This class is modeled after google.api_core.page_iterator.Page and aims
480 # to provide API compatibility where possible.
481
482 def __init__(self, stream_parser, message):
483 self._stream_parser = stream_parser
484 self._message = message
485 self._iter_rows = None
486 self._num_items = self._message.row_count
487 self._remaining = self._message.row_count
488
489 def _parse_rows(self):
490 """Parse rows from the message only once."""
491 if self._iter_rows is not None:
492 return
493
494 rows = self._stream_parser.to_rows(self._message)
495 self._iter_rows = iter(rows)
496
497 @property
498 def num_items(self):
499 """int: Total items in the page."""
500 return self._num_items
501
502 @property
503 def remaining(self):
504 """int: Remaining items in the page."""
505 return self._remaining
506
507 def __iter__(self):
508 """A ``ReadRowsPage`` is an iterator."""
509 return self
510
511 def next(self):
512 """Get the next row in the page."""
513 self._parse_rows()
514 if self._remaining > 0:
515 self._remaining -= 1
516 return next(self._iter_rows)
517
518 # Alias needed for Python 2/3 support.
519 __next__ = next
520
521 def to_arrow(self):
        """Create a :class:`pyarrow.RecordBatch` of rows in the page.
523
524 Returns:
525 pyarrow.RecordBatch:
526 Rows from the message, as an Arrow record batch.
527 """
528 return self._stream_parser.to_arrow(self._message)
529
530 def to_dataframe(self, dtypes=None):
531 """Create a :class:`pandas.DataFrame` of rows in the page.
532
        This method requires the pandas library to create a data frame and
        the fastavro or pyarrow library to parse row messages, depending on
        the data format of the stream.
535
536 .. warning::
537 DATETIME columns are not supported. They are currently parsed as
538 strings in the fastavro library.
539
540 Args:
541 dtypes ( \
542 Map[str, Union[str, pandas.Series.dtype]] \
543 ):
                Optional. A dictionary of column names to pandas ``dtype``s. The
545 provided ``dtype`` is used when constructing the series for
546 the column specified. Otherwise, the default pandas behavior
547 is used.
548
549 Returns:
550 pandas.DataFrame:
551 A data frame of all rows in the stream.
552 """
553 if pandas is None:
554 raise ImportError(_PANDAS_REQUIRED)
555
556 return self._stream_parser.to_dataframe(self._message, dtypes=dtypes)
557
558
559class _StreamParser(object):
560 def to_arrow(self, message):
561 raise NotImplementedError("Not implemented.")
562
563 def to_dataframe(self, message, dtypes=None):
564 raise NotImplementedError("Not implemented.")
565
566 def to_rows(self, message):
567 raise NotImplementedError("Not implemented.")
568
569 def _parse_avro_schema(self):
570 raise NotImplementedError("Not implemented.")
571
572 def _parse_arrow_schema(self):
573 raise NotImplementedError("Not implemented.")
574
575 @staticmethod
576 def from_read_session(read_session):
577 schema_type = read_session._pb.WhichOneof("schema")
578 if schema_type == "avro_schema":
579 return _AvroStreamParser(read_session)
580 elif schema_type == "arrow_schema":
581 return _ArrowStreamParser(read_session)
582 else:
583 raise TypeError(
584 "Unsupported schema type in read_session: {0}".format(schema_type)
585 )
586
587 @staticmethod
588 def from_read_rows_response(message):
589 schema_type = message._pb.WhichOneof("schema")
590 if schema_type == "avro_schema":
591 return _AvroStreamParser(message)
592 elif schema_type == "arrow_schema":
593 return _ArrowStreamParser(message)
594 else:
595 raise TypeError(
596 "Unsupported schema type in message: {0}".format(schema_type)
597 )
598
599
600class _AvroStreamParser(_StreamParser):
601 """Helper to parse Avro messages into useful representations."""
602
603 def __init__(self, message):
604 """Construct an _AvroStreamParser.
605
606 Args:
607 message (Union[
608 google.cloud.bigquery_storage_v1.types.ReadSession, \
609 google.cloud.bigquery_storage_v1.types.ReadRowsResponse, \
610 ]):
611 Either the first message of data from a read rows stream or a
612 read session. Both types contain a oneof "schema" field, which
613 can be used to determine how to deserialize rows.
614 """
615 if fastavro is None:
616 raise ImportError(_FASTAVRO_REQUIRED)
617
618 self._first_message = message
619 self._avro_schema_json = None
620 self._fastavro_schema = None
621 self._column_names = None
622
623 def to_arrow(self, message):
        """Create a :class:`pyarrow.RecordBatch` of rows in the page.
625
626 Args:
627 message (google.cloud.bigquery_storage_v1.types.ReadRowsResponse):
628 Protocol buffer from the read rows stream, to convert into an
629 Arrow record batch.
630
631 Returns:
632 pyarrow.RecordBatch:
633 Rows from the message, as an Arrow record batch.
634 """
635 raise NotImplementedError("to_arrow not implemented for Avro streams.")
636
637 def to_dataframe(self, message, dtypes=None):
638 """Create a :class:`pandas.DataFrame` of rows in the page.
639
        This method requires the pandas library to create a data frame and
        the fastavro library to parse row messages.
642
643 .. warning::
644 DATETIME columns are not supported. They are currently parsed as
645 strings in the fastavro library.
646
647 Args:
648 message ( \
649 ~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
650 ):
651 A message containing Avro bytes to parse into a pandas DataFrame.
652 dtypes ( \
653 Map[str, Union[str, pandas.Series.dtype]] \
654 ):
                Optional. A dictionary of column names to pandas ``dtype``s. The
656 provided ``dtype`` is used when constructing the series for
657 the column specified. Otherwise, the default pandas behavior
658 is used.
659
660 Returns:
661 pandas.DataFrame:
662 A data frame of all rows in the stream.
663 """
664 self._parse_avro_schema()
665
666 if dtypes is None:
667 dtypes = {}
668
669 columns = collections.defaultdict(list)
670 for row in self.to_rows(message):
671 for column in row:
672 columns[column].append(row[column])
673 for column in dtypes:
674 columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
675 return pandas.DataFrame(columns, columns=self._column_names)
676
677 def _parse_avro_schema(self):
678 """Extract and parse Avro schema from a read session."""
679 if self._avro_schema_json:
680 return
681
682 self._avro_schema_json = json.loads(self._first_message.avro_schema.schema)
683 self._column_names = tuple(
684 (field["name"] for field in self._avro_schema_json["fields"])
685 )
686 self._first_message = None
687
688 def _parse_fastavro(self):
689 """Convert parsed Avro schema to fastavro format."""
690 self._parse_avro_schema()
691 self._fastavro_schema = fastavro.parse_schema(self._avro_schema_json)
692
693 def to_rows(self, message):
694 """Parse all rows in a stream message.
695
696 Args:
697 message ( \
698 ~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
699 ):
700 A message containing Avro bytes to parse into rows.
701
702 Returns:
703 Iterable[Mapping]:
704 A sequence of rows, represented as dictionaries.
705 """
706 self._parse_fastavro()
707 messageio = io.BytesIO(message.avro_rows.serialized_binary_rows)
708 while True:
709 # Loop in a while loop because schemaless_reader can only read
710 # a single record.
711 try:
712 # TODO: Parse DATETIME into datetime.datetime (no timezone),
713 # instead of as a string.
714 yield fastavro.schemaless_reader(messageio, self._fastavro_schema)
715 except (StopIteration, EOFError):
716 break # Finished with message
717
718
719class _ArrowStreamParser(_StreamParser):
720 def __init__(self, message):
721 """Construct an _ArrowStreamParser.
722
723 Args:
724 message (Union[
725 google.cloud.bigquery_storage_v1.types.ReadSession, \
726 google.cloud.bigquery_storage_v1.types.ReadRowsResponse, \
727 ]):
728 Either the first message of data from a read rows stream or a
729 read session. Both types contain a oneof "schema" field, which
730 can be used to determine how to deserialize rows.
731 """
732 if pyarrow is None:
733 raise ImportError(_PYARROW_REQUIRED)
734
735 self._first_message = message
736 self._schema = None
737
738 def to_arrow(self, message):
739 return self._parse_arrow_message(message)
740
741 def to_rows(self, message):
742 record_batch = self._parse_arrow_message(message)
743
744 # Iterate through each column simultaneously, and make a dict from the
745 # row values
746 for row in zip(*record_batch.columns):
747 yield dict(zip(self._column_names, row))
748
749 def to_dataframe(self, message, dtypes=None):
750 record_batch = self._parse_arrow_message(message)
751
752 if dtypes is None:
753 dtypes = {}
754
755 df = record_batch.to_pandas()
756
757 for column in dtypes:
758 df[column] = pandas.Series(df[column], dtype=dtypes[column])
759
760 return df
761
762 def _parse_arrow_message(self, message):
763 self._parse_arrow_schema()
764
765 return pyarrow.ipc.read_record_batch(
766 pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
767 self._schema,
768 )
769
770 def _parse_arrow_schema(self):
771 if self._schema:
772 return
773
774 self._schema = pyarrow.ipc.read_schema(
775 pyarrow.py_buffer(self._first_message.arrow_schema.serialized_schema)
776 )
777 self._column_names = [field.name for field in self._schema]
778 self._first_message = None