Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pyarrow/ipc.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18# Arrow file and stream reader/writer classes, and other messaging tools 

19 

20import os 

21 

22import pyarrow as pa 

23 

24from pyarrow.lib import (IpcReadOptions, IpcWriteOptions, ReadStats, WriteStats, # noqa 

25 Message, MessageReader, 

26 RecordBatchReader, _ReadPandasMixin, 

27 MetadataVersion, 

28 read_message, read_record_batch, read_schema, 

29 read_tensor, write_tensor, 

30 get_record_batch_size, get_tensor_size) 

31import pyarrow.lib as lib 

32 

33 

class RecordBatchStreamReader(lib._RecordBatchStreamReader):
    """
    Reader for the Arrow streaming binary format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
        If you want to use memory map use MemoryMappedFile as source.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC deserialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.
    """

    def __init__(self, source, *, options=None, memory_pool=None):
        # Normalize/validate the read options, then delegate everything
        # else to the Cython base-class opener.
        self._open(source,
                   options=_ensure_default_ipc_read_options(options),
                   memory_pool=memory_pool)

53 

54 

# Shared "Parameters" docstring section, spliced via str.format() into the
# __doc__ of the IPC writer classes and the new_stream()/new_file() factories
# so the writer-option documentation is maintained in one place.
_ipc_writer_class_doc = """\
Parameters
----------
sink : str, pyarrow.NativeFile, or file-like Python object
    Either a file path, or a writable file object.
schema : pyarrow.Schema
    The Arrow schema for data to be written to the file.
use_legacy_format : bool, default None
    Deprecated in favor of setting options. Cannot be provided with
    options.

    If None, False will be used unless this default is overridden by
    setting the environment variable ARROW_PRE_0_15_IPC_FORMAT=1
options : pyarrow.ipc.IpcWriteOptions
    Options for IPC serialization.

    If None, default values will be used: the legacy format will not
    be used unless overridden by setting the environment variable
    ARROW_PRE_0_15_IPC_FORMAT=1, and the V5 metadata version will be
    used unless overridden by setting the environment variable
    ARROW_PRE_1_0_METADATA_VERSION=1."""

76 

77 

class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
    __doc__ = """Writer for the Arrow streaming binary format

{}""".format(_ipc_writer_class_doc)

    def __init__(self, sink, schema, *, use_legacy_format=None, options=None):
        # Collapse the mutually-exclusive use_legacy_format/options pair
        # into one IpcWriteOptions before opening the sink.
        write_options = _get_legacy_format_default(use_legacy_format, options)
        self._open(sink, schema, options=write_options)

86 

87 

class RecordBatchFileReader(lib._RecordBatchFileReader):
    """
    Class for reading Arrow record batch data from the Arrow binary file format

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
        If you want to use memory map use MemoryMappedFile as source.
    footer_offset : int, default None
        If the file is embedded in some larger file, this is the byte offset to
        the very end of the file data
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC serialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.
    """

    def __init__(self, source, footer_offset=None, *, options=None,
                 memory_pool=None):
        # Validate/default the read options inline and hand everything to
        # the Cython base-class opener.
        self._open(
            source,
            footer_offset=footer_offset,
            options=_ensure_default_ipc_read_options(options),
            memory_pool=memory_pool,
        )

112 

113 

class RecordBatchFileWriter(lib._RecordBatchFileWriter):

    __doc__ = """Writer to create the Arrow binary file format

{}""".format(_ipc_writer_class_doc)

    def __init__(self, sink, schema, *, use_legacy_format=None, options=None):
        # Resolve the deprecated flag / explicit options into a single
        # IpcWriteOptions instance, then open the destination.
        resolved = _get_legacy_format_default(use_legacy_format, options)
        self._open(sink, schema, options=resolved)

123 

124 

125def _get_legacy_format_default(use_legacy_format, options): 

126 if use_legacy_format is not None and options is not None: 

127 raise ValueError( 

128 "Can provide at most one of options and use_legacy_format") 

129 elif options: 

130 if not isinstance(options, IpcWriteOptions): 

131 raise TypeError("expected IpcWriteOptions, got {}" 

132 .format(type(options))) 

133 return options 

134 

135 metadata_version = MetadataVersion.V5 

136 if use_legacy_format is None: 

137 use_legacy_format = \ 

138 bool(int(os.environ.get('ARROW_PRE_0_15_IPC_FORMAT', '0'))) 

139 if bool(int(os.environ.get('ARROW_PRE_1_0_METADATA_VERSION', '0'))): 

140 metadata_version = MetadataVersion.V4 

141 return IpcWriteOptions(use_legacy_format=use_legacy_format, 

142 metadata_version=metadata_version) 

143 

144 

def _ensure_default_ipc_read_options(options):
    """
    Validate *options*, substituting defaults when none are given.

    Parameters
    ----------
    options : IpcReadOptions, default None

    Returns
    -------
    IpcReadOptions
        *options* unchanged, or a default-constructed IpcReadOptions when
        *options* is None.

    Raises
    ------
    TypeError
        If *options* is neither None nor an IpcReadOptions instance.
    """
    if options is None:
        return IpcReadOptions()
    # Bug fix: this previously read `if options and not isinstance(...)`
    # followed by `return options or IpcReadOptions()`, so a falsy invalid
    # value (0, "", False) skipped the type check and was silently replaced
    # with defaults instead of raising.
    if not isinstance(options, IpcReadOptions):
        raise TypeError(
            "expected IpcReadOptions, got {}".format(type(options))
        )
    return options

151 

152 

def new_stream(sink, schema, *, use_legacy_format=None, options=None):
    # Thin factory wrapper; all option handling happens inside the writer.
    return RecordBatchStreamWriter(
        sink, schema, use_legacy_format=use_legacy_format, options=options)


new_stream.__doc__ = """\
Create an Arrow columnar IPC stream writer instance

{}

Returns
-------
writer : RecordBatchStreamWriter
    A writer for the given sink
""".format(_ipc_writer_class_doc)

169 

170 

def open_stream(source, *, options=None, memory_pool=None):
    """
    Create reader for Arrow streaming format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC serialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.

    Returns
    -------
    reader : RecordBatchStreamReader
        A reader for the given source
    """
    # Thin factory wrapper around the reader class.
    reader = RecordBatchStreamReader(
        source, options=options, memory_pool=memory_pool)
    return reader

192 

193 

def new_file(sink, schema, *, use_legacy_format=None, options=None):
    # Thin factory wrapper; all option handling happens inside the writer.
    return RecordBatchFileWriter(
        sink, schema, use_legacy_format=use_legacy_format, options=options)


new_file.__doc__ = """\
Create an Arrow columnar IPC file writer instance

{}

Returns
-------
writer : RecordBatchFileWriter
    A writer for the given sink
""".format(_ipc_writer_class_doc)

210 

211 

def open_file(source, footer_offset=None, *, options=None, memory_pool=None):
    """
    Create reader for Arrow file format.

    Parameters
    ----------
    source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object
        Either an in-memory buffer, or a readable file object.
    footer_offset : int, default None
        If the file is embedded in some larger file, this is the byte offset to
        the very end of the file data.
    options : pyarrow.ipc.IpcReadOptions
        Options for IPC serialization.
        If None, default values will be used.
    memory_pool : MemoryPool, default None
        If None, default memory pool is used.

    Returns
    -------
    reader : RecordBatchFileReader
        A reader for the given source
    """
    # Thin factory wrapper around the reader class.
    reader = RecordBatchFileReader(
        source,
        footer_offset=footer_offset,
        options=options,
        memory_pool=memory_pool,
    )
    return reader

237 

238 

def serialize_pandas(df, *, nthreads=None, preserve_index=None):
    """
    Serialize a pandas DataFrame into a buffer protocol compatible object.

    Parameters
    ----------
    df : pandas.DataFrame
    nthreads : int, default None
        Number of threads to use for conversion to Arrow, default all CPUs.
    preserve_index : bool, default None
        The default of None will store the index as a column, except for
        RangeIndex which is stored as metadata only. If True, always
        preserve the pandas index data as a column. If False, no index
        information is saved and the result will have a default RangeIndex.

    Returns
    -------
    buf : buffer
        An object compatible with the buffer protocol.
    """
    # Convert the DataFrame to a single record batch, then stream it into
    # an in-memory sink using the Arrow IPC stream format.
    record_batch = pa.RecordBatch.from_pandas(
        df, nthreads=nthreads, preserve_index=preserve_index)
    out = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(out, record_batch.schema)
    with writer:
        writer.write_batch(record_batch)
    return out.getvalue()

265 

266 

def deserialize_pandas(buf, *, use_threads=True):
    """Deserialize a buffer protocol compatible object into a pandas DataFrame.

    Parameters
    ----------
    buf : buffer
        An object compatible with the buffer protocol.
    use_threads : bool, default True
        Whether to parallelize the conversion using multiple threads.

    Returns
    -------
    df : pandas.DataFrame
        The buffer deserialized as pandas DataFrame
    """
    # Read the whole IPC stream into a Table, then convert to pandas
    # outside the reader's context.
    source = pa.BufferReader(buf)
    with pa.RecordBatchStreamReader(source) as stream_reader:
        deserialized = stream_reader.read_all()
    return deserialized.to_pandas(use_threads=use_threads)