/src/duckdb/src/main/buffered_data/buffered_data.cpp
Line | Count | Source |
1 | | #include "duckdb/main/buffered_data/buffered_data.hpp" |
2 | | #include "duckdb/main/client_config.hpp" |
3 | | #include "duckdb/main/client_context.hpp" |
4 | | |
5 | | namespace duckdb { |
6 | | |
7 | 0 | BufferedData::BufferedData(Type type, ClientContext &context_p) : type(type), context(context_p.shared_from_this()) { |
8 | 0 | auto &config = ClientConfig::GetConfig(context_p); |
9 | 0 | total_buffer_size = config.streaming_buffer_size; |
10 | 0 | } |
11 | | |
12 | 0 | BufferedData::~BufferedData() { |
13 | 0 | } |
14 | | |
15 | 0 | StreamExecutionResult BufferedData::ReplenishBuffer(StreamQueryResult &result, ClientContextLock &context_lock) { |
16 | 0 | auto cc = context.lock(); |
17 | 0 | if (!cc) { |
18 | 0 | return StreamExecutionResult::EXECUTION_CANCELLED; |
19 | 0 | } |
20 | | |
21 | 0 | StreamExecutionResult execution_result; |
22 | 0 | while (!StreamQueryResult::IsChunkReady(execution_result = ExecuteTaskInternal(result, context_lock))) { |
23 | 0 | if (execution_result == StreamExecutionResult::BLOCKED) { |
24 | 0 | UnblockSinks(); |
25 | 0 | cc->WaitForTask(context_lock, result); |
26 | 0 | } |
27 | 0 | } |
28 | 0 | if (result.HasError()) { |
29 | 0 | Close(); |
30 | 0 | } |
31 | 0 | return execution_result; |
32 | 0 | } |
33 | | |
34 | | } // namespace duckdb |