Coverage Report

Created: 2026-06-30 06:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/src/main/stream_query_result.cpp
Line
Count
Source
1
#include "duckdb/main/stream_query_result.hpp"
2
3
#include "duckdb/main/client_context.hpp"
4
#include "duckdb/main/materialized_query_result.hpp"
5
#include "duckdb/common/box_renderer.hpp"
6
#include "duckdb/main/database.hpp"
7
8
namespace duckdb {
9
10
// Builds a streaming result on top of `data`.
// The statement metadata, column types/names and client properties are handed to the QueryResult base;
// the client context is then taken from the buffered data itself.
StreamQueryResult::StreamQueryResult(StatementType statement_type, StatementProperties properties,
                                     vector<LogicalType> types, vector<string> names,
                                     ClientProperties client_properties, shared_ptr<BufferedData> data)
    : QueryResult(QueryResultType::STREAM_RESULT, statement_type, std::move(properties), std::move(types),
                  std::move(names), std::move(client_properties)),
      buffered_data(std::move(data)) {
  // `data` has been moved from at this point, so go through the member.
  this->context = this->buffered_data->GetContext();
}
18
19
0
StreamQueryResult::StreamQueryResult(ErrorData error) : QueryResult(QueryResultType::STREAM_RESULT, std::move(error)) {
20
0
}
21
22
0
// Nothing to release explicitly here; the members clean up after themselves.
StreamQueryResult::~StreamQueryResult() = default;
24
25
0
// Renders a short textual description of this result.
// On failure: the error message followed by a newline.
// On success: the header followed by the "[[STREAM RESULT]]" marker (the rows are not rendered).
string StreamQueryResult::ToString() {
  if (!success) {
    return GetError() + "\n";
  }
  string rendered = HeaderToString();
  rendered += "[[STREAM RESULT]]";
  return rendered;
}
35
36
0
// Returns a lock on the client context this result belongs to.
// Throws InvalidInputException when the result no longer has a context; the stored error
// message (if any) is appended to the exception text.
unique_ptr<ClientContextLock> StreamQueryResult::LockContext() {
  if (context) {
    return context->LockContext();
  }
  // No context: build the diagnostic and bail out.
  string message = "Attempting to execute an unsuccessful or closed pending query result";
  if (HasError()) {
    message += StringUtil::Format("\nError: %s", GetError());
  }
  throw InvalidInputException(message);
}
46
47
0
// Delegates task execution to the buffered data, using the context lock already held by the caller.
StreamExecutionResult StreamQueryResult::ExecuteTaskInternal(ClientContextLock &lock) {
  auto &buffer = *buffered_data;
  return buffer.ExecuteTaskInternal(*this, lock);
}
50
51
0
// Acquires the context lock (throws if the result has no context) and runs ExecuteTaskInternal under it.
StreamExecutionResult StreamQueryResult::ExecuteTask() {
  const auto context_lock = LockContext();
  return ExecuteTaskInternal(*context_lock);
}
55
56
0
void StreamQueryResult::WaitForTask() {
57
0
  auto lock = LockContext();
58
0
  buffered_data->UnblockSinks();
59
0
  context->WaitForTask(*lock, *this);
60
0
}
61
62
0
// True when `result` reports that execution was cancelled or failed; false for every other state.
static bool ExecutionErrorOccurred(StreamExecutionResult result) {
  return result == StreamExecutionResult::EXECUTION_CANCELLED || result == StreamExecutionResult::EXECUTION_ERROR;
}
71
72
0
// Fetches the next chunk while the caller holds the context lock.
// Returns nullptr when execution was cancelled/failed, when the stream is exhausted, or when an
// exception was caught. Caught exceptions are recorded on this result via SetError rather than rethrown.
unique_ptr<DataChunk> StreamQueryResult::FetchNextInternal(ClientContextLock &lock) {
  // Only cleared when the caught error type does not invalidate the transaction.
  bool invalidate_query = true;
  unique_ptr<DataChunk> next_chunk;
  try {
    // top up the buffer, then scan a chunk out of it
    const auto replenish_result = buffered_data->ReplenishBuffer(*this, lock);
    if (ExecutionErrorOccurred(replenish_result)) {
      // nothing has been scanned yet, so this is still the empty pointer
      return next_chunk;
    }
    next_chunk = buffered_data->Scan();
    const bool exhausted = !next_chunk || next_chunk->ColumnCount() == 0 || next_chunk->size() == 0;
    if (exhausted) {
      // end of stream: clean up on the context and hand back nullptr
      context->CleanupInternal(lock, this);
      next_chunk = nullptr;
    }
    return next_chunk;
  } catch (std::exception &ex) {
    ErrorData error(ex);
    const bool invalidates_transaction = Exception::InvalidatesTransaction(error.Type());
    if (!invalidates_transaction) {
      // standard exceptions leave the current transaction intact
      invalidate_query = false;
    } else if (Exception::InvalidatesDatabase(error.Type())) {
      // fatal exceptions take the whole database instance down with them
      auto &db_instance = DatabaseInstance::GetDatabase(*context);
      ValidChecker::Invalidate(db_instance, error.RawMessage());
    }
    context->ProcessError(error, context->GetCurrentQuery());
    SetError(std::move(error));
  } catch (...) { // LCOV_EXCL_START
    SetError(ErrorData("Unhandled exception in FetchInternal"));
  } // LCOV_EXCL_STOP
  // Only reached from one of the catch handlers above.
  context->CleanupInternal(lock, this, invalidate_query);
  return nullptr;
}
105
106
0
// Fetches the next chunk of the stream.
// The context lock is held only for the duration of the fetch; it is released before Close() runs.
// A missing chunk, or one with no columns or no rows, closes the result and yields nullptr.
unique_ptr<DataChunk> StreamQueryResult::FetchInternal() {
  unique_ptr<DataChunk> fetched;
  {
    auto context_lock = LockContext();
    CheckExecutableInternal(*context_lock);
    fetched = FetchNextInternal(*context_lock);
  }
  const bool has_data = fetched && fetched->ColumnCount() != 0 && fetched->size() != 0;
  if (has_data) {
    return fetched;
  }
  Close();
  return nullptr;
}
119
120
#ifdef DUCKDB_ALTERNATIVE_VERIFY
121
// Verification-only fetch path: drive the stream with ExecuteTask until IsChunkReady reports true,
// waiting whenever execution is blocked, then fall through to a regular Fetch.
// Throws InvalidInputException if the execution was cancelled; calls ThrowError() on an execution error.
static unique_ptr<DataChunk> AlternativeFetch(StreamQueryResult &stream_result) {
  auto execution_result = stream_result.ExecuteTask();
  while (!StreamQueryResult::IsChunkReady(execution_result)) {
    if (execution_result == StreamExecutionResult::BLOCKED) {
      stream_result.WaitForTask();
    }
    execution_result = stream_result.ExecuteTask();
  }
  if (execution_result == StreamExecutionResult::EXECUTION_CANCELLED) {
    throw InvalidInputException("The execution of the query was cancelled before it could finish, likely "
                                "caused by executing a different query");
  }
  if (execution_result == StreamExecutionResult::EXECUTION_ERROR) {
    stream_result.ThrowError();
  }
  return stream_result.Fetch();
}
139
#endif
140
141
0
// Drains the stream into a ColumnDataCollection and wraps it in a MaterializedQueryResult.
// If this result already has an error or no context, or an error is recorded while draining,
// a result built from the error object is returned instead.
unique_ptr<MaterializedQueryResult> StreamQueryResult::Materialize() {
  if (HasError() || !context) {
    return make_uniq<MaterializedQueryResult>(GetErrorObject());
  }
  auto collection = make_uniq<ColumnDataCollection>(Allocator::DefaultAllocator(), types);

  ColumnDataAppendState append_state;
  collection->InitializeAppend(append_state);
  for (;;) {
#ifdef DUCKDB_ALTERNATIVE_VERIFY
    auto chunk = AlternativeFetch(*this);
#else
    auto chunk = Fetch();
#endif
    const bool has_rows = chunk && chunk->size() != 0;
    if (!has_rows) {
      // a missing or empty chunk ends the loop
      break;
    }
    collection->Append(append_state, *chunk);
  }
  auto materialized =
      make_uniq<MaterializedQueryResult>(statement_type, properties, names, std::move(collection), client_properties);
  if (!HasError()) {
    return materialized;
  }
  // an error was recorded while fetching: report it instead of the collected rows
  return make_uniq<MaterializedQueryResult>(GetErrorObject());
}
167
168
0
// Reports whether this result can still be used, given an already-held context lock:
// it must be successful, still have a context, and be the active result of that context.
bool StreamQueryResult::IsOpenInternal(ClientContextLock &lock) {
  if (!success || !context) {
    return false;
  }
  return context->IsActiveResult(lock, *this);
}
175
176
0
// Throws InvalidInputException unless IsOpenInternal(lock) holds.
// The stored error message (if any) is appended to the exception text.
void StreamQueryResult::CheckExecutableInternal(ClientContextLock &lock) {
  if (IsOpenInternal(lock)) {
    return;
  }
  string message = "Attempting to execute an unsuccessful or closed pending query result";
  if (HasError()) {
    message += StringUtil::Format("\nError: %s", GetError());
  }
  throw InvalidInputException(message);
}
185
186
0
bool StreamQueryResult::IsOpen() {
187
0
  if (!success || !context) {
188
0
    return false;
189
0
  }
190
0
  auto lock = LockContext();
191
0
  return IsOpenInternal(*lock);
192
0
}
193
194
0
void StreamQueryResult::Close() {
195
0
  buffered_data->Close();
196
0
  if (context) {
197
0
    auto lock = LockContext();
198
0
    if (context->IsActiveResult(*lock, *this)) {
199
      // Abandoned before the stream was fully drained: release the active-query state now
200
      // (matching InitialCleanup) instead of leaking it until the next query or context teardown.
201
0
      context->CleanupInternal(*lock, this, false);
202
0
    }
203
0
  }
204
0
  context.reset();
205
0
}
206
207
0
// Tells a caller driving the stream with ExecuteTask() whether it should stop and call Fetch().
// True for the four states listed below; false for every other state.
bool StreamQueryResult::IsChunkReady(StreamExecutionResult result) {
  switch (result) {
  case StreamExecutionResult::CHUNK_READY:         // a chunk is ready to be fetched with Fetch()
  case StreamExecutionResult::EXECUTION_CANCELLED: // another query execution was started that cancelled this one
  case StreamExecutionResult::EXECUTION_ERROR:     // an error was encountered while executing the final pipeline
  case StreamExecutionResult::EXECUTION_FINISHED:  // the final pipeline completed successfully
    return true;
  default:
    return false;
  }
}
226
227
} // namespace duckdb