/src/rocksdb/utilities/trace/replayer_impl.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "utilities/trace/replayer_impl.h" |
7 | | |
8 | | #include <cmath> |
9 | | #include <thread> |
10 | | |
11 | | #include "rocksdb/options.h" |
12 | | #include "rocksdb/slice.h" |
13 | | #include "rocksdb/system_clock.h" |
14 | | #include "util/threadpool_imp.h" |
15 | | |
16 | | namespace ROCKSDB_NAMESPACE { |
17 | | |
18 | | ReplayerImpl::ReplayerImpl(DB* db, |
19 | | const std::vector<ColumnFamilyHandle*>& handles, |
20 | | std::unique_ptr<TraceReader>&& reader) |
21 | 0 | : Replayer(), |
22 | 0 | trace_reader_(std::move(reader)), |
23 | 0 | prepared_(false), |
24 | 0 | trace_end_(false), |
25 | 0 | header_ts_(0), |
26 | 0 | exec_handler_(TraceRecord::NewExecutionHandler(db, handles)), |
27 | 0 | env_(db->GetEnv()), |
28 | 0 | trace_file_version_(-1) {} |
29 | | |
30 | 0 | ReplayerImpl::~ReplayerImpl() { |
31 | 0 | exec_handler_.reset(); |
32 | 0 | trace_reader_.reset(); |
33 | 0 | } |
34 | | |
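 | | // Read and parse the trace file header. On success, record the trace file
 | | // version and the header timestamp (the time origin used for replay), and
 | | // mark the replayer as prepared.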
35 | 0 | Status ReplayerImpl::Prepare() { |
36 | 0 | Trace header; |
37 | 0 | int db_version; |
38 | 0 | Status s = ReadHeader(&header); |
39 | 0 | if (!s.ok()) { |
40 | 0 | return s; |
41 | 0 | } |
42 | 0 | s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version); |
43 | 0 | if (!s.ok()) { |
44 | 0 | return s; |
45 | 0 | } |
46 | 0 | header_ts_ = header.ts; |
47 | 0 | prepared_ = true; |
48 | 0 | trace_end_ = false; |
49 | 0 | return Status::OK(); |
50 | 0 | } |
51 | | |
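 | | // Read and decode the next TraceRecord from the trace. Requires a
 | | // successful Prepare(); returns Status::Incomplete() at the trace end.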
52 | 0 | Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) { |
53 | 0 | if (!prepared_) { |
54 | 0 | return Status::Incomplete("Not prepared!"); |
55 | 0 | } |
56 | 0 | if (trace_end_) { |
57 | 0 | return Status::Incomplete("Trace end."); |
58 | 0 | } |
59 | | |
60 | 0 | Trace trace; |
61 | 0 | Status s = ReadTrace(&trace); // ReadTrace is atomic |
62 | | // Reached the trace end. |
63 | 0 | if (s.ok() && trace.type == kTraceEnd) { |
64 | 0 | trace_end_ = true; |
65 | 0 | return Status::Incomplete("Trace end."); |
66 | 0 | } |
67 | 0 | if (!s.ok() || record == nullptr) { |
68 | 0 | return s; |
69 | 0 | } |
70 | | |
71 | 0 | return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record); |
72 | 0 | } |
73 | | |
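 | | // Execute a single TraceRecord against the DB using the execution handler
 | | // created in the constructor, optionally collecting the result.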
74 | | Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record, |
75 | 0 | std::unique_ptr<TraceRecordResult>* result) { |
76 | 0 | return record->Accept(exec_handler_.get(), result); |
77 | 0 | } |
78 | | |
79 | | Status ReplayerImpl::Replay( |
80 | | const ReplayOptions& options, |
81 | | const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>& |
82 | 0 | result_callback) { |
83 | 0 | if (options.fast_forward <= 0.0) { |
84 | 0 | return Status::InvalidArgument("Wrong fast forward speed!"); |
85 | 0 | } |
86 | | |
87 | 0 | if (!prepared_) { |
88 | 0 | return Status::Incomplete("Not prepared!"); |
89 | 0 | } |
90 | 0 | if (trace_end_) { |
91 | 0 | return Status::Incomplete("Trace end."); |
92 | 0 | } |
93 | | |
94 | 0 | Status s = Status::OK(); |
95 | |
96 | 0 | if (options.num_threads <= 1) { |
97 | | // num_threads == 0 or num_threads == 1 uses a single thread.
98 | 0 | std::chrono::system_clock::time_point replay_epoch = |
99 | 0 | std::chrono::system_clock::now(); |
100 | |
101 | 0 | while (s.ok()) { |
102 | 0 | Trace trace; |
103 | 0 | s = ReadTrace(&trace); |
104 | | // If already at trace end, ReadTrace should return Status::Incomplete(). |
105 | 0 | if (!s.ok()) { |
106 | 0 | break; |
107 | 0 | } |
108 | | |
109 | | // No need to sleep before breaking the loop if at the trace end. |
110 | 0 | if (trace.type == kTraceEnd) { |
111 | 0 | trace_end_ = true; |
112 | 0 | s = Status::Incomplete("Trace end."); |
113 | 0 | break; |
114 | 0 | } |
115 | | |
116 | | // In single-threaded replay, decode first then sleep. |
117 | 0 | std::unique_ptr<TraceRecord> record; |
118 | 0 | s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record); |
119 | 0 | if (!s.ok() && !s.IsNotSupported()) { |
120 | 0 | break; |
121 | 0 | } |
122 | | |
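 | | // Target wall-clock time for this record: replay_epoch plus the record's
 | | // offset from the trace header timestamp, scaled down by fast_forward.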
123 | 0 | std::chrono::system_clock::time_point sleep_to = |
124 | 0 | replay_epoch + |
125 | 0 | std::chrono::microseconds(static_cast<uint64_t>(std::llround( |
126 | 0 | 1.0 * (trace.ts - header_ts_) / options.fast_forward))); |
127 | 0 | if (sleep_to > std::chrono::system_clock::now()) { |
128 | 0 | std::this_thread::sleep_until(sleep_to); |
129 | 0 | } |
130 | | |
131 | | // Skip unsupported traces, stop for other errors. |
132 | 0 | if (s.IsNotSupported()) { |
133 | 0 | if (result_callback != nullptr) { |
134 | 0 | result_callback(s, nullptr); |
135 | 0 | } |
136 | 0 | s = Status::OK(); |
137 | 0 | continue; |
138 | 0 | } |
139 | | |
140 | 0 | if (result_callback == nullptr) { |
141 | 0 | s = Execute(record, nullptr); |
142 | 0 | } else { |
143 | 0 | std::unique_ptr<TraceRecordResult> res; |
144 | 0 | s = Execute(record, &res); |
145 | 0 | result_callback(s, std::move(res)); |
146 | 0 | } |
147 | 0 | } |
148 | 0 | } else { |
149 | | // Multi-threaded replay. |
150 | 0 | ThreadPoolImpl thread_pool; |
151 | 0 | thread_pool.SetHostEnv(env_); |
152 | 0 | thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads)); |
153 | |
154 | 0 | std::mutex mtx; |
155 | | // Background decoding and execution status. |
156 | 0 | Status bg_s = Status::OK(); |
157 | 0 | uint64_t last_err_ts = static_cast<uint64_t>(-1); |
158 | | // Callback used in background work to update bg_s with the error of the
159 | | // earliest TraceRecord that failed to execute. "Earliest" is determined by
160 | | // the TraceRecord's own timestamp, not by when the error occurred during
161 | | // execution (neither its start nor its end timestamp).
162 | | //
163 | | // Suppose TraceRecords R1 and R2 have timestamps T1 < T2, and their
164 | | // execution timestamps are T1_start, T1_end, T2_start, T2_end.
165 | | // Single-thread: T1_start < T1_end < T2_start < T2_end always holds.
166 | | // Multi-thread: T1_start < T2_start is not enforced; their order is
167 | | // completely unknown.
168 | | // To report the same "first" error in both single-threaded and
169 | | // multi-threaded replay, we can only rely on the TraceRecords' timestamps
170 | | // rather than their execution timestamps. In single-threaded replay, the
171 | | // first error is also the last error, while in multi-threaded replay, the
172 | | // first error may be neither the first nor the last error in execution.
173 | 0 | auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) { |
174 | 0 | std::lock_guard<std::mutex> gd(mtx); |
175 | | // Only record the first error. |
176 | 0 | if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) { |
177 | 0 | bg_s = err; |
178 | 0 | last_err_ts = err_ts; |
179 | 0 | } |
180 | 0 | }; |
181 | |
182 | 0 | std::chrono::system_clock::time_point replay_epoch = |
183 | 0 | std::chrono::system_clock::now(); |
184 | |
185 | 0 | while (bg_s.ok() && s.ok()) { |
186 | 0 | Trace trace; |
187 | 0 | s = ReadTrace(&trace); |
188 | | // If already at trace end, ReadTrace should return Status::Incomplete(). |
189 | 0 | if (!s.ok()) { |
190 | 0 | break; |
191 | 0 | } |
192 | | |
193 | 0 | TraceType trace_type = trace.type; |
194 | | |
195 | | // No need to sleep before breaking the loop if at the trace end. |
196 | 0 | if (trace_type == kTraceEnd) { |
197 | 0 | trace_end_ = true; |
198 | 0 | s = Status::Incomplete("Trace end."); |
199 | 0 | break; |
200 | 0 | } |
201 | | |
202 | | // In multi-threaded replay, sleep first then start decoding and |
203 | | // execution in a thread. |
204 | 0 | std::chrono::system_clock::time_point sleep_to = |
205 | 0 | replay_epoch + |
206 | 0 | std::chrono::microseconds(static_cast<uint64_t>(std::llround( |
207 | 0 | 1.0 * (trace.ts - header_ts_) / options.fast_forward))); |
208 | 0 | if (sleep_to > std::chrono::system_clock::now()) { |
209 | 0 | std::this_thread::sleep_until(sleep_to); |
210 | 0 | } |
211 | |
212 | 0 | if (trace_type == kTraceWrite || trace_type == kTraceGet || |
213 | 0 | trace_type == kTraceIteratorSeek || |
214 | 0 | trace_type == kTraceIteratorSeekForPrev || |
215 | 0 | trace_type == kTraceMultiGet) { |
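 | | // Package the trace entry into a ReplayerWorkerArg and schedule it on the
 | | // thread pool. The raw pointer is released here and reclaimed by
 | | // BackgroundWork().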
216 | 0 | std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg); |
217 | 0 | ra->trace_entry = std::move(trace); |
218 | 0 | ra->handler = exec_handler_.get(); |
219 | 0 | ra->trace_file_version = trace_file_version_; |
220 | 0 | ra->error_cb = error_cb; |
221 | 0 | ra->result_cb = result_callback; |
222 | 0 | thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(), |
223 | 0 | nullptr, nullptr); |
224 | 0 | } else { |
225 | | // Skip unsupported traces. |
226 | 0 | if (result_callback != nullptr) { |
227 | 0 | result_callback(Status::NotSupported("Unsupported trace type."), |
228 | 0 | nullptr); |
229 | 0 | } |
230 | 0 | } |
231 | 0 | } |
232 | |
233 | 0 | thread_pool.WaitForJobsAndJoinAllThreads(); |
234 | 0 | if (!bg_s.ok()) { |
235 | 0 | s = bg_s; |
236 | 0 | } |
237 | 0 | } |
238 | |
239 | 0 | if (s.IsIncomplete()) { |
240 | | // Reaching EOF currently returns an Incomplete status. This can happen
241 | | // when a process is killed without calling the EndTrace() API.
242 | | // TODO: Add better error handling.
243 | 0 | trace_end_ = true; |
244 | 0 | return Status::OK(); |
245 | 0 | } |
246 | 0 | return s; |
247 | 0 | } |
248 | | |
249 | 0 | uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; } |
250 | | |
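 | | // Reset the TraceReader to the start of the trace file and decode the
 | | // first entry as the trace header.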
251 | 0 | Status ReplayerImpl::ReadHeader(Trace* header) { |
252 | 0 | assert(header != nullptr); |
253 | 0 | Status s = trace_reader_->Reset(); |
254 | 0 | if (!s.ok()) { |
255 | 0 | return s; |
256 | 0 | } |
257 | 0 | std::string encoded_trace; |
258 | | // Read the trace header.
259 | 0 | s = trace_reader_->Read(&encoded_trace); |
260 | 0 | if (!s.ok()) { |
261 | 0 | return s; |
262 | 0 | } |
263 | | |
264 | 0 | return TracerHelper::DecodeHeader(encoded_trace, header); |
265 | 0 | } |
266 | | |
267 | 0 | Status ReplayerImpl::ReadTrace(Trace* trace) { |
268 | 0 | assert(trace != nullptr); |
269 | 0 | std::string encoded_trace; |
270 | | // We don't know whether the TraceReader implementation is thread-safe, so
271 | | // reading the trace is protected with a mutex. The decoding part does not
272 | | // need to be protected since it only touches local data.
273 | 0 | { |
274 | 0 | std::lock_guard<std::mutex> guard(mutex_); |
275 | 0 | Status s = trace_reader_->Read(&encoded_trace); |
276 | 0 | if (!s.ok()) { |
277 | 0 | return s; |
278 | 0 | } |
279 | 0 | } |
280 | 0 | return TracerHelper::DecodeTrace(encoded_trace, trace); |
281 | 0 | } |
282 | | |
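 | | // Runs on a thread pool worker. Takes ownership of the ReplayerWorkerArg
 | | // scheduled by Replay(), decodes the trace entry, executes the resulting
 | | // record, and reports errors and results through the stored callbacks.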
283 | 0 | void ReplayerImpl::BackgroundWork(void* arg) { |
284 | 0 | std::unique_ptr<ReplayerWorkerArg> ra(static_cast<ReplayerWorkerArg*>(arg)); |
285 | 0 | assert(ra != nullptr); |
286 | |
287 | 0 | std::unique_ptr<TraceRecord> record; |
288 | 0 | Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry), |
289 | 0 | ra->trace_file_version, &record); |
290 | 0 | if (!s.ok()) { |
291 | | // Stop the replay |
292 | 0 | if (ra->error_cb != nullptr) { |
293 | 0 | ra->error_cb(s, ra->trace_entry.ts); |
294 | 0 | } |
295 | | // Report the result |
296 | 0 | if (ra->result_cb != nullptr) { |
297 | 0 | ra->result_cb(s, nullptr); |
298 | 0 | } |
299 | 0 | return; |
300 | 0 | } |
301 | | |
302 | 0 | if (ra->result_cb == nullptr) { |
303 | 0 | s = record->Accept(ra->handler, nullptr); |
304 | 0 | } else { |
305 | 0 | std::unique_ptr<TraceRecordResult> res; |
306 | 0 | s = record->Accept(ra->handler, &res); |
307 | 0 | ra->result_cb(s, std::move(res)); |
308 | 0 | } |
309 | 0 | record.reset(); |
310 | 0 | } |
311 | | |
312 | | } // namespace ROCKSDB_NAMESPACE |