/src/duckdb/src/common/arrow/arrow_util.cpp
Line | Count | Source |
1 | | #include <utility> |
2 | | |
3 | | #include "duckdb/common/arrow/arrow_util.hpp" |
4 | | #include "duckdb/common/arrow/arrow_appender.hpp" |
5 | | #include "duckdb/common/types/data_chunk.hpp" |
6 | | #include "duckdb/function/table/arrow/arrow_duck_schema.hpp" |
7 | | namespace duckdb { |
8 | | |
9 | | bool ArrowUtil::TryFetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t batch_size, ArrowArray *out, |
10 | | idx_t &count, ErrorData &error, |
11 | 0 | unordered_map<idx_t, const shared_ptr<ArrowTypeExtensionData>> extension_type_cast) { |
12 | 0 | count = 0; |
13 | 0 | ArrowAppender appender(scan_state.Types(), batch_size, std::move(options), std::move(extension_type_cast)); |
14 | 0 | const auto remaining_tuples_in_chunk = scan_state.RemainingInChunk(); |
15 | 0 | if (remaining_tuples_in_chunk) { |
16 | | // We start by scanning the non-finished current chunk |
17 | 0 | idx_t cur_consumption = MinValue(remaining_tuples_in_chunk, batch_size); |
18 | 0 | count += cur_consumption; |
19 | 0 | auto ¤t_chunk = scan_state.CurrentChunk(); |
20 | 0 | appender.Append(current_chunk, scan_state.CurrentOffset(), scan_state.CurrentOffset() + cur_consumption, |
21 | 0 | current_chunk.size()); |
22 | 0 | scan_state.IncreaseOffset(cur_consumption); |
23 | 0 | } |
24 | 0 | while (count < batch_size) { |
25 | 0 | if (!scan_state.LoadNextChunk(error)) { |
26 | 0 | if (scan_state.HasError()) { |
27 | 0 | error = scan_state.GetError(); |
28 | 0 | } |
29 | 0 | return false; |
30 | 0 | } |
31 | 0 | if (scan_state.ChunkIsEmpty()) { |
32 | | // The scan was successful, but an empty chunk was returned |
33 | 0 | break; |
34 | 0 | } |
35 | 0 | auto ¤t_chunk = scan_state.CurrentChunk(); |
36 | 0 | if (scan_state.Finished() || current_chunk.size() == 0) { |
37 | 0 | break; |
38 | 0 | } |
39 | | // The amount we still need to append into this chunk |
40 | 0 | auto remaining = batch_size - count; |
41 | | |
42 | | // The amount remaining, capped by the amount left in the current chunk |
43 | 0 | auto to_append_to_batch = MinValue(remaining, scan_state.RemainingInChunk()); |
44 | 0 | appender.Append(current_chunk, 0, to_append_to_batch, current_chunk.size()); |
45 | 0 | count += to_append_to_batch; |
46 | 0 | scan_state.IncreaseOffset(to_append_to_batch); |
47 | 0 | } |
48 | 0 | if (count > 0) { |
49 | 0 | *out = appender.Finalize(); |
50 | 0 | } |
51 | 0 | return true; |
52 | 0 | } |
53 | | |
54 | | idx_t ArrowUtil::FetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out, |
55 | 0 | const unordered_map<idx_t, const shared_ptr<ArrowTypeExtensionData>> &extension_type_cast) { |
56 | 0 | ErrorData error; |
57 | 0 | idx_t result_count; |
58 | 0 | if (!TryFetchChunk(scan_state, std::move(options), chunk_size, out, result_count, error, extension_type_cast)) { |
59 | 0 | error.Throw(); |
60 | 0 | } |
61 | 0 | return result_count; |
62 | 0 | } |
63 | | |
64 | | } // namespace duckdb |