# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Arrow file and stream reader/writer classes, and other messaging tools

import os

import pyarrow as pa

from pyarrow.lib import (IpcReadOptions, IpcWriteOptions, ReadStats,  # noqa
                         WriteStats, Message, MessageReader,
                         RecordBatchReader, _ReadPandasMixin,
                         MetadataVersion,
                         read_message, read_record_batch, read_schema,
                         read_tensor, write_tensor,
                         get_record_batch_size, get_tensor_size)
import pyarrow.lib as lib


class RecordBatchStreamReader(lib._RecordBatchStreamReader):
    """
    Reader for the Arrow streaming binary format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
        If you want to use memory mapping, use a MemoryMappedFile as the
        source.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.
    """

    def __init__(self, source, *, options=None, memory_pool=None):
        options = _ensure_default_ipc_read_options(options)
        self._open(source, options=options, memory_pool=memory_pool)


_ipc_writer_class_doc = """\
Parameters
----------
sink : str, pyarrow.NativeFile, or file-like Python object
    Either a file path, or a writable file object.
schema : pyarrow.Schema
    The Arrow schema for data to be written to the file.
use_legacy_format : bool, default None
    Deprecated in favor of setting options. Cannot be provided with
    options.

    If None, False will be used unless this default is overridden by
    setting the environment variable ARROW_PRE_0_15_IPC_FORMAT=1.
options : pyarrow.ipc.IpcWriteOptions
    Options for IPC serialization.

    If None, default values will be used: the legacy format will not
    be used unless overridden by setting the environment variable
    ARROW_PRE_0_15_IPC_FORMAT=1, and the V5 metadata version will be
    used unless overridden by setting the environment variable
    ARROW_PRE_1_0_METADATA_VERSION=1."""


class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
    __doc__ = """Writer for the Arrow streaming binary format

{}""".format(_ipc_writer_class_doc)

    def __init__(self, sink, schema, *, use_legacy_format=None, options=None):
        options = _get_legacy_format_default(use_legacy_format, options)
        self._open(sink, schema, options=options)
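

# A minimal roundtrip sketch for the streaming classes above: write one
# record batch to an in-memory sink, then read it back. Illustrative only;
# the helper name below is not part of the library API.
def _example_stream_roundtrip():
    batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])
    sink = pa.BufferOutputStream()
    with RecordBatchStreamWriter(sink, batch.schema) as writer:
        writer.write_batch(batch)
    # The stream format is read front to back; read_all() collects every
    # batch into a single pyarrow.Table.
    reader = RecordBatchStreamReader(sink.getvalue())
    return reader.read_all()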


class RecordBatchFileReader(lib._RecordBatchFileReader):
    """
    Class for reading Arrow record batch data from the Arrow binary file
    format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
        If you want to use memory mapping, use a MemoryMappedFile as the
        source.
    footer_offset : int, default None
        If the file is embedded in some larger file, this is the byte offset
        to the very end of the file data.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.
    """

    def __init__(self, source, footer_offset=None, *, options=None,
                 memory_pool=None):
        options = _ensure_default_ipc_read_options(options)
        self._open(source, footer_offset=footer_offset,
                   options=options, memory_pool=memory_pool)


class RecordBatchFileWriter(lib._RecordBatchFileWriter):

    __doc__ = """Writer to create the Arrow binary file format

{}""".format(_ipc_writer_class_doc)

    def __init__(self, sink, schema, *, use_legacy_format=None, options=None):
        options = _get_legacy_format_default(use_legacy_format, options)
        self._open(sink, schema, options=options)
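

# A matching sketch for the file-format classes: unlike the stream format,
# the file format ends with a footer and supports random access to
# individual batches. Illustrative only; not part of the library API.
def _example_file_roundtrip():
    batch = pa.record_batch([pa.array(["x", "y"])], names=["col"])
    sink = pa.BufferOutputStream()
    with RecordBatchFileWriter(sink, batch.schema) as writer:
        writer.write_batch(batch)
    reader = RecordBatchFileReader(pa.BufferReader(sink.getvalue()))
    return reader.get_batch(0)  # random access by batch index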


def _get_legacy_format_default(use_legacy_format, options):
    if use_legacy_format is not None and options is not None:
        raise ValueError(
            "Can provide at most one of options and use_legacy_format")
    elif options:
        if not isinstance(options, IpcWriteOptions):
            raise TypeError("expected IpcWriteOptions, got {}"
                            .format(type(options)))
        return options

    metadata_version = MetadataVersion.V5
    if use_legacy_format is None:
        use_legacy_format = \
            bool(int(os.environ.get('ARROW_PRE_0_15_IPC_FORMAT', '0')))
    if bool(int(os.environ.get('ARROW_PRE_1_0_METADATA_VERSION', '0'))):
        metadata_version = MetadataVersion.V4
    return IpcWriteOptions(use_legacy_format=use_legacy_format,
                           metadata_version=metadata_version)
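

# A brief sketch of how the helper above resolves defaults: with neither
# argument given and neither ARROW_PRE_0_15_IPC_FORMAT nor
# ARROW_PRE_1_0_METADATA_VERSION set in the environment, it yields
# modern-format V5 write options. Illustrative only.
def _example_default_write_options():
    opts = _get_legacy_format_default(use_legacy_format=None, options=None)
    assert not opts.use_legacy_format
    assert opts.metadata_version == MetadataVersion.V5
    return opts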


def _ensure_default_ipc_read_options(options):
    if options and not isinstance(options, IpcReadOptions):
        raise TypeError(
            "expected IpcReadOptions, got {}".format(type(options))
        )
    return options or IpcReadOptions()


def new_stream(sink, schema, *, use_legacy_format=None, options=None):
    return RecordBatchStreamWriter(sink, schema,
                                   use_legacy_format=use_legacy_format,
                                   options=options)


new_stream.__doc__ = """\
Create an Arrow columnar IPC stream writer instance

{}

Returns
-------
writer : RecordBatchStreamWriter
    A writer for the given sink
""".format(_ipc_writer_class_doc)


def open_stream(source, *, options=None, memory_pool=None):
    """
    Create reader for Arrow streaming format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.

    Returns
    -------
    reader : RecordBatchStreamReader
        A reader for the given source
    """
    return RecordBatchStreamReader(source, options=options,
                                   memory_pool=memory_pool)
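

# A minimal sketch pairing the new_stream / open_stream helpers; this is
# equivalent to constructing RecordBatchStreamWriter / -Reader directly.
# Illustrative only; the helper name is not part of the library API.
def _example_convenience_stream():
    table = pa.table({"n": [1, 2, 3]})
    sink = pa.BufferOutputStream()
    with new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    with open_stream(sink.getvalue()) as reader:
        return reader.read_all()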


def new_file(sink, schema, *, use_legacy_format=None, options=None):
    return RecordBatchFileWriter(sink, schema,
                                 use_legacy_format=use_legacy_format,
                                 options=options)


new_file.__doc__ = """\
Create an Arrow columnar IPC file writer instance

{}

Returns
-------
writer : RecordBatchFileWriter
    A writer for the given sink
""".format(_ipc_writer_class_doc)


def open_file(source, footer_offset=None, *, options=None, memory_pool=None):
    """
    Create reader for Arrow file format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
    footer_offset : int, default None
        If the file is embedded in some larger file, this is the byte offset
        to the very end of the file data.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.

    Returns
    -------
    reader : RecordBatchFileReader
        A reader for the given source
    """
    return RecordBatchFileReader(
        source, footer_offset=footer_offset,
        options=options, memory_pool=memory_pool)
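

# The analogous sketch for new_file / open_file; the resulting reader
# exposes the batch count and the random access that the file-format
# footer makes possible. Illustrative only; not part of the library API.
def _example_convenience_file():
    table = pa.table({"n": [1, 2, 3]})
    sink = pa.BufferOutputStream()
    with new_file(sink, table.schema) as writer:
        writer.write_table(table)
    reader = open_file(pa.BufferReader(sink.getvalue()))
    return reader.num_record_batches, reader.get_batch(0)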


def serialize_pandas(df, *, nthreads=None, preserve_index=None):
    """
    Serialize a pandas DataFrame into a buffer protocol compatible object.

    Parameters
    ----------
    df : pandas.DataFrame
    nthreads : int, default None
        Number of threads to use for conversion to Arrow, default all CPUs.
    preserve_index : bool, default None
        The default of None will store the index as a column, except for
        RangeIndex which is stored as metadata only. If True, always
        preserve the pandas index data as a column. If False, no index
        information is saved and the result will have a default RangeIndex.

    Returns
    -------
    buf : buffer
        An object compatible with the buffer protocol.
    """
    batch = pa.RecordBatch.from_pandas(df, nthreads=nthreads,
                                       preserve_index=preserve_index)
    sink = pa.BufferOutputStream()
    with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue()


def deserialize_pandas(buf, *, use_threads=True):
    """Deserialize a buffer protocol compatible object into a pandas DataFrame.

    Parameters
    ----------
    buf : buffer
        An object compatible with the buffer protocol.
    use_threads : bool, default True
        Whether to parallelize the conversion using multiple threads.

    Returns
    -------
    df : pandas.DataFrame
        The buffer deserialized as a pandas DataFrame.
    """
    buffer_reader = pa.BufferReader(buf)
    with pa.RecordBatchStreamReader(buffer_reader) as reader:
        table = reader.read_all()
    return table.to_pandas(use_threads=use_threads)
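

# A minimal roundtrip sketch for the two pandas helpers above; assumes
# pandas is installed. Illustrative only; the helper name is not part of
# the library API.
def _example_pandas_roundtrip():
    import pandas as pd

    df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    buf = serialize_pandas(df)
    return deserialize_pandas(buf)  # equal to df for a default RangeIndex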