Coverage Report

Created: 2025-10-26 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/utilities/trace/replayer_impl.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "utilities/trace/replayer_impl.h"
7
8
#include <cmath>
9
#include <thread>
10
11
#include "rocksdb/options.h"
12
#include "rocksdb/slice.h"
13
#include "rocksdb/system_clock.h"
14
#include "util/threadpool_imp.h"
15
16
namespace ROCKSDB_NAMESPACE {
17
18
ReplayerImpl::ReplayerImpl(DB* db,
19
                           const std::vector<ColumnFamilyHandle*>& handles,
20
                           std::unique_ptr<TraceReader>&& reader)
21
0
    : Replayer(),
22
0
      trace_reader_(std::move(reader)),
23
0
      prepared_(false),
24
0
      trace_end_(false),
25
0
      header_ts_(0),
26
0
      exec_handler_(TraceRecord::NewExecutionHandler(db, handles)),
27
0
      env_(db->GetEnv()),
28
0
      trace_file_version_(-1) {}
29
30
0
ReplayerImpl::~ReplayerImpl() {
31
0
  exec_handler_.reset();
32
0
  trace_reader_.reset();
33
0
}
34
35
0
Status ReplayerImpl::Prepare() {
36
0
  Trace header;
37
0
  int db_version;
38
0
  Status s = ReadHeader(&header);
39
0
  if (!s.ok()) {
40
0
    return s;
41
0
  }
42
0
  s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
43
0
  if (!s.ok()) {
44
0
    return s;
45
0
  }
46
0
  header_ts_ = header.ts;
47
0
  prepared_ = true;
48
0
  trace_end_ = false;
49
0
  return Status::OK();
50
0
}
51
52
0
Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
53
0
  if (!prepared_) {
54
0
    return Status::Incomplete("Not prepared!");
55
0
  }
56
0
  if (trace_end_) {
57
0
    return Status::Incomplete("Trace end.");
58
0
  }
59
60
0
  Trace trace;
61
0
  Status s = ReadTrace(&trace);  // ReadTrace is atomic
62
  // Reached the trace end.
63
0
  if (s.ok() && trace.type == kTraceEnd) {
64
0
    trace_end_ = true;
65
0
    return Status::Incomplete("Trace end.");
66
0
  }
67
0
  if (!s.ok() || record == nullptr) {
68
0
    return s;
69
0
  }
70
71
0
  return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record);
72
0
}
73
74
Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
75
0
                             std::unique_ptr<TraceRecordResult>* result) {
76
0
  return record->Accept(exec_handler_.get(), result);
77
0
}
78
79
Status ReplayerImpl::Replay(
80
    const ReplayOptions& options,
81
    const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
82
0
        result_callback) {
83
0
  if (options.fast_forward <= 0.0) {
84
0
    return Status::InvalidArgument("Wrong fast forward speed!");
85
0
  }
86
87
0
  if (!prepared_) {
88
0
    return Status::Incomplete("Not prepared!");
89
0
  }
90
0
  if (trace_end_) {
91
0
    return Status::Incomplete("Trace end.");
92
0
  }
93
94
0
  Status s = Status::OK();
95
96
0
  if (options.num_threads <= 1) {
97
    // num_threads == 0 or num_threads == 1 uses single thread.
98
0
    std::chrono::system_clock::time_point replay_epoch =
99
0
        std::chrono::system_clock::now();
100
101
0
    while (s.ok()) {
102
0
      Trace trace;
103
0
      s = ReadTrace(&trace);
104
      // If already at trace end, ReadTrace should return Status::Incomplete().
105
0
      if (!s.ok()) {
106
0
        break;
107
0
      }
108
109
      // No need to sleep before breaking the loop if at the trace end.
110
0
      if (trace.type == kTraceEnd) {
111
0
        trace_end_ = true;
112
0
        s = Status::Incomplete("Trace end.");
113
0
        break;
114
0
      }
115
116
      // In single-threaded replay, decode first then sleep.
117
0
      std::unique_ptr<TraceRecord> record;
118
0
      s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record);
119
0
      if (!s.ok() && !s.IsNotSupported()) {
120
0
        break;
121
0
      }
122
123
0
      std::chrono::system_clock::time_point sleep_to =
124
0
          replay_epoch +
125
0
          std::chrono::microseconds(static_cast<uint64_t>(std::llround(
126
0
              1.0 * (trace.ts - header_ts_) / options.fast_forward)));
127
0
      if (sleep_to > std::chrono::system_clock::now()) {
128
0
        std::this_thread::sleep_until(sleep_to);
129
0
      }
130
131
      // Skip unsupported traces, stop for other errors.
132
0
      if (s.IsNotSupported()) {
133
0
        if (result_callback != nullptr) {
134
0
          result_callback(s, nullptr);
135
0
        }
136
0
        s = Status::OK();
137
0
        continue;
138
0
      }
139
140
0
      if (result_callback == nullptr) {
141
0
        s = Execute(record, nullptr);
142
0
      } else {
143
0
        std::unique_ptr<TraceRecordResult> res;
144
0
        s = Execute(record, &res);
145
0
        result_callback(s, std::move(res));
146
0
      }
147
0
    }
148
0
  } else {
149
    // Multi-threaded replay.
150
0
    ThreadPoolImpl thread_pool;
151
0
    thread_pool.SetHostEnv(env_);
152
0
    thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads));
153
154
0
    std::mutex mtx;
155
    // Background decoding and execution status.
156
0
    Status bg_s = Status::OK();
157
0
    uint64_t last_err_ts = static_cast<uint64_t>(-1);
158
    // Callback function used in background work to update bg_s for the earliest
159
    // TraceRecord which has execution error. This is different from the
160
    // timestamp of the first execution error (either start or end timestamp).
161
    //
162
    // Suppose TraceRecord R1, R2, with timestamps T1 < T2. Their execution
163
    // timestamps are T1_start, T1_end, T2_start, T2_end.
164
    // Single-thread: there must be T1_start < T1_end < T2_start < T2_end.
165
    // Multi-thread: T1_start < T2_start may not be enforced. Orders of them are
166
    // totally unknown.
167
    // In order to report the same `first` error in both single-thread and
168
    // multi-thread replay, we can only rely on the TraceRecords' timestamps,
169
    // rather than their execution timestamps. Although in single-thread replay,
170
    // the first error is also the last error, while in multi-thread replay, the
171
    // first error may not be the first error in execution, and it may not be
172
    // the last error in execution as well.
173
0
    auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) {
174
0
      std::lock_guard<std::mutex> gd(mtx);
175
      // Only record the first error.
176
0
      if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) {
177
0
        bg_s = err;
178
0
        last_err_ts = err_ts;
179
0
      }
180
0
    };
181
182
0
    std::chrono::system_clock::time_point replay_epoch =
183
0
        std::chrono::system_clock::now();
184
185
0
    while (bg_s.ok() && s.ok()) {
186
0
      Trace trace;
187
0
      s = ReadTrace(&trace);
188
      // If already at trace end, ReadTrace should return Status::Incomplete().
189
0
      if (!s.ok()) {
190
0
        break;
191
0
      }
192
193
0
      TraceType trace_type = trace.type;
194
195
      // No need to sleep before breaking the loop if at the trace end.
196
0
      if (trace_type == kTraceEnd) {
197
0
        trace_end_ = true;
198
0
        s = Status::Incomplete("Trace end.");
199
0
        break;
200
0
      }
201
202
      // In multi-threaded replay, sleep first then start decoding and
203
      // execution in a thread.
204
0
      std::chrono::system_clock::time_point sleep_to =
205
0
          replay_epoch +
206
0
          std::chrono::microseconds(static_cast<uint64_t>(std::llround(
207
0
              1.0 * (trace.ts - header_ts_) / options.fast_forward)));
208
0
      if (sleep_to > std::chrono::system_clock::now()) {
209
0
        std::this_thread::sleep_until(sleep_to);
210
0
      }
211
212
0
      if (trace_type == kTraceWrite || trace_type == kTraceGet ||
213
0
          trace_type == kTraceIteratorSeek ||
214
0
          trace_type == kTraceIteratorSeekForPrev ||
215
0
          trace_type == kTraceMultiGet) {
216
0
        std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
217
0
        ra->trace_entry = std::move(trace);
218
0
        ra->handler = exec_handler_.get();
219
0
        ra->trace_file_version = trace_file_version_;
220
0
        ra->error_cb = error_cb;
221
0
        ra->result_cb = result_callback;
222
0
        thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
223
0
                             nullptr, nullptr);
224
0
      } else {
225
        // Skip unsupported traces.
226
0
        if (result_callback != nullptr) {
227
0
          result_callback(Status::NotSupported("Unsupported trace type."),
228
0
                          nullptr);
229
0
        }
230
0
      }
231
0
    }
232
233
0
    thread_pool.WaitForJobsAndJoinAllThreads();
234
0
    if (!bg_s.ok()) {
235
0
      s = bg_s;
236
0
    }
237
0
  }
238
239
0
  if (s.IsIncomplete()) {
240
    // Reaching eof returns Incomplete status at the moment.
241
    // Could happen when killing a process without calling EndTrace() API.
242
    // TODO: Add better error handling.
243
0
    trace_end_ = true;
244
0
    return Status::OK();
245
0
  }
246
0
  return s;
247
0
}
248
249
0
uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; }
250
251
0
Status ReplayerImpl::ReadHeader(Trace* header) {
252
0
  assert(header != nullptr);
253
0
  Status s = trace_reader_->Reset();
254
0
  if (!s.ok()) {
255
0
    return s;
256
0
  }
257
0
  std::string encoded_trace;
258
  // Read the trace head
259
0
  s = trace_reader_->Read(&encoded_trace);
260
0
  if (!s.ok()) {
261
0
    return s;
262
0
  }
263
264
0
  return TracerHelper::DecodeHeader(encoded_trace, header);
265
0
}
266
267
0
Status ReplayerImpl::ReadTrace(Trace* trace) {
268
0
  assert(trace != nullptr);
269
0
  std::string encoded_trace;
270
  // We don't know if TraceReader is implemented thread-safe, so we protect the
271
  // reading trace part with a mutex. The decoding part does not need to be
272
  // protected since it's local.
273
0
  {
274
0
    std::lock_guard<std::mutex> guard(mutex_);
275
0
    Status s = trace_reader_->Read(&encoded_trace);
276
0
    if (!s.ok()) {
277
0
      return s;
278
0
    }
279
0
  }
280
0
  return TracerHelper::DecodeTrace(encoded_trace, trace);
281
0
}
282
283
0
void ReplayerImpl::BackgroundWork(void* arg) {
284
0
  std::unique_ptr<ReplayerWorkerArg> ra(static_cast<ReplayerWorkerArg*>(arg));
285
0
  assert(ra != nullptr);
286
287
0
  std::unique_ptr<TraceRecord> record;
288
0
  Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry),
289
0
                                             ra->trace_file_version, &record);
290
0
  if (!s.ok()) {
291
    // Stop the replay
292
0
    if (ra->error_cb != nullptr) {
293
0
      ra->error_cb(s, ra->trace_entry.ts);
294
0
    }
295
    // Report the result
296
0
    if (ra->result_cb != nullptr) {
297
0
      ra->result_cb(s, nullptr);
298
0
    }
299
0
    return;
300
0
  }
301
302
0
  if (ra->result_cb == nullptr) {
303
0
    s = record->Accept(ra->handler, nullptr);
304
0
  } else {
305
0
    std::unique_ptr<TraceRecordResult> res;
306
0
    s = record->Accept(ra->handler, &res);
307
0
    ra->result_cb(s, std::move(res));
308
0
  }
309
0
  record.reset();
310
0
}
311
312
}  // namespace ROCKSDB_NAMESPACE