Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package replay
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "sync"
11 : "sync/atomic"
12 :
13 : "github.com/cockroachdb/pebble"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/vfs"
16 : )
17 :
18 : type workloadCaptureState uint8
19 :
20 : const (
21 : obsolete = workloadCaptureState(1) << iota
22 : readyForProcessing
23 : capturedSuccessfully
24 : )
25 :
26 1 : func (wcs workloadCaptureState) is(flag workloadCaptureState) bool { return wcs&flag != 0 }
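
// An illustrative sketch of how these flags combine (the local variable s is
// hypothetical): a file that has been enqueued and later marked obsolete
// satisfies both checks.
//
//	s := readyForProcessing | obsolete
//	_ = s.is(readyForProcessing) // true
//	_ = s.is(obsolete)           // true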
27 :
28 : type manifestDetails struct {
29 : sourceFilepath string
30 : sourceFile vfs.File
31 :
32 : destFile vfs.File
33 : }
34 :
35 : // WorkloadCollector is designed to capture workloads by handling manifest
36 : // files, flushed SSTs and ingested SSTs. The collector hooks into the
37 : // pebble.EventListener and pebble.Cleaner in order to keep track of file states.
38 : type WorkloadCollector struct {
39 : mu struct {
40 : sync.Mutex
41 : fileState map[string]workloadCaptureState
42 : // pendingSSTables holds a slice of file paths to sstables that need to
43 : // be copied but haven't yet. The `copyFiles` goroutine grabs these
44 : // files, and the flush and ingest event handlers append them.
45 : pendingSSTables []string
46 : // manifestIndex is an index into `manifests`, pointing to the
47 : // manifest currently being copied.
48 : manifestIndex int
49 : // appending to manifests requires holding mu. Only the `copyFiles`
50 : // goroutine is permitted to read or edit the struct contents once
51 : // appended, so it does not need to hold mu while accessing the structs'
52 : // fields.
53 : manifests []*manifestDetails
54 :
55 : // The following condition variable and counts are used in tests to
56 : // synchronize with the copying goroutine.
57 : copyCond sync.Cond
58 : tablesCopied int
59 : tablesEnqueued int
60 : }
61 : // Stores the current manifest that is being used by the database.
62 : curManifest atomic.Uint64
63 : // Stores whether the workload collector is enabled.
64 : enabled atomic.Bool
65 : buffer []byte
66 : // config contains information that is only set on the creation of the
67 : // WorkloadCollector.
68 : config struct {
69 : // srcFS and srcDir represent the location from which the workload collector
70 : // collects files.
71 : srcFS vfs.FS
72 : srcDir string
73 : // destFS and destDir represent the location to which the workload collector
74 : // sends files.
75 : destFS vfs.FS
76 : destDir string
77 : // cleaner stores the cleaner to use when files become obsolete and need to
78 : // be cleaned.
79 : cleaner base.Cleaner
80 : }
81 : copier struct {
82 : sync.Cond
83 : stop bool
84 : done chan struct{}
85 : }
86 : }
87 :
88 : // NewWorkloadCollector is used externally to create a new WorkloadCollector.
89 1 : func NewWorkloadCollector(srcDir string) *WorkloadCollector {
90 1 : wc := &WorkloadCollector{}
91 1 : wc.buffer = make([]byte, 1<<10 /* 1KB */)
92 1 : wc.config.srcDir = srcDir
93 1 : wc.mu.copyCond.L = &wc.mu.Mutex
94 1 : wc.mu.fileState = make(map[string]workloadCaptureState)
95 1 : wc.copier.Cond.L = &wc.mu.Mutex
96 1 : return wc
97 1 : }
98 :
99 : // Attach is used to set up the WorkloadCollector by attaching itself to
100 : // pebble.Options EventListener and Cleaner.
101 1 : func (w *WorkloadCollector) Attach(opts *pebble.Options) {
102 1 : opts.AddEventListener(pebble.EventListener{
103 1 : FlushEnd: w.onFlushEnd,
104 1 : ManifestCreated: w.onManifestCreated,
105 1 : TableIngested: w.onTableIngest,
106 1 : })
107 1 :
108 1 : opts.EnsureDefaults()
109 1 : // Replace the original Cleaner with the workload collector's implementation,
110 1 : // which will invoke the original Cleaner, but only once the collector's copied
111 1 : // what it needs.
112 1 : c := cleaner{
113 1 : name: fmt.Sprintf("replay.WorkloadCollector(%q)", opts.Cleaner),
114 1 : clean: w.clean,
115 1 : }
116 1 : w.config.cleaner, opts.Cleaner = opts.Cleaner, c
117 1 : w.config.srcFS = opts.FS
118 1 : }
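
// A minimal sketch of how a caller might wire the collector into a database;
// the openWithCapture helper and the dbDir/captureDir parameters are
// hypothetical:
//
//	func openWithCapture(dbDir, captureDir string) (*pebble.DB, *WorkloadCollector, error) {
//		wc := NewWorkloadCollector(dbDir)
//		opts := &pebble.Options{FS: vfs.Default}
//		wc.Attach(opts) // registers event listeners and wraps opts.Cleaner
//		db, err := pebble.Open(dbDir, opts)
//		if err != nil {
//			return nil, nil, err
//		}
//		// Begin copying flushed/ingested sstables and manifests to captureDir.
//		wc.Start(vfs.Default, captureDir)
//		return db, wc, nil
//	}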
119 :
120 : // enqueueCopyLocked enqueues the sstable with the provided filenum to be
121 : // copied in the background. Requires w.mu.
122 1 : func (w *WorkloadCollector) enqueueCopyLocked(fileNum base.DiskFileNum) {
123 1 : fileName := base.MakeFilename(base.FileTypeTable, fileNum)
124 1 : w.mu.fileState[fileName] |= readyForProcessing
125 1 : w.mu.pendingSSTables = append(w.mu.pendingSSTables, w.srcFilepath(fileName))
126 1 : w.mu.tablesEnqueued++
127 1 : }
128 :
129 : // cleanFile calls the cleaner on the specified path and removes the path from
130 : // the fileState map.
131 1 : func (w *WorkloadCollector) cleanFile(fileType base.FileType, path string) error {
132 1 : err := w.config.cleaner.Clean(w.config.srcFS, fileType, path)
133 1 : if err == nil {
134 1 : w.mu.Lock()
135 1 : delete(w.mu.fileState, w.config.srcFS.PathBase(path))
136 1 : w.mu.Unlock()
137 1 : }
138 1 : return err
139 : }
140 :
141 : // clean deletes files only after they have been processed or are not required
142 : // for the workload collection.
143 1 : func (w *WorkloadCollector) clean(fs vfs.FS, fileType base.FileType, path string) error {
144 1 : if !w.IsRunning() {
145 1 : return w.cleanFile(fileType, path)
146 1 : }
147 1 : w.mu.Lock()
148 1 : fileName := fs.PathBase(path)
149 1 : if fileState, ok := w.mu.fileState[fileName]; !ok || fileState.is(capturedSuccessfully) {
150 1 : // Delete the file if it has already been captured, or if it is not a
151 1 : // file we need to capture at all.
152 1 : w.mu.Unlock()
153 1 : return w.cleanFile(fileType, path)
154 1 : }
155 1 : w.mu.fileState[fileName] |= obsolete
156 1 : w.mu.Unlock()
157 1 : return nil
158 : }
159 :
160 : // onTableIngest is attached to a pebble.DB as an EventListener.TableIngested
161 : // func. It enqueues all ingested tables to be copied.
162 1 : func (w *WorkloadCollector) onTableIngest(info pebble.TableIngestInfo) {
163 1 : if !w.IsRunning() {
164 0 : return
165 0 : }
166 1 : w.mu.Lock()
167 1 : defer w.mu.Unlock()
168 1 : for _, table := range info.Tables {
169 1 : w.enqueueCopyLocked(table.FileNum.DiskFileNum())
170 1 : }
171 1 : w.copier.Broadcast()
172 : }
173 :
174 : // onFlushEnd is attached to a pebble.DB as an EventListener.FlushEnd func. It
175 : // enqueues all flushed tables to be copied.
176 1 : func (w *WorkloadCollector) onFlushEnd(info pebble.FlushInfo) {
177 1 : if !w.IsRunning() {
178 1 : return
179 1 : }
180 1 : w.mu.Lock()
181 1 : defer w.mu.Unlock()
182 1 : for _, table := range info.Output {
183 1 : w.enqueueCopyLocked(table.FileNum.DiskFileNum())
184 1 : }
185 1 : w.copier.Broadcast()
186 : }
187 :
188 : // onManifestCreated is attached to a pebble.DB as an
189 : // EventListener.ManifestCreated func. It records the new manifest so that
190 : // it's copied asynchronously in the background.
191 1 : func (w *WorkloadCollector) onManifestCreated(info pebble.ManifestCreateInfo) {
192 1 : w.curManifest.Store(uint64(info.FileNum))
193 1 : if !w.enabled.Load() {
194 1 : return
195 1 : }
196 1 : w.mu.Lock()
197 1 : defer w.mu.Unlock()
198 1 :
199 1 : // Mark the manifest file as ready for processing to prevent it from being
200 1 : // cleaned before we process it.
201 1 : fileName := base.MakeFilename(base.FileTypeManifest, info.FileNum)
202 1 : w.mu.fileState[fileName] |= readyForProcessing
203 1 : w.mu.manifests = append(w.mu.manifests, &manifestDetails{
204 1 : sourceFilepath: info.Path,
205 1 : })
206 : }
207 :
208 : // copyFiles is run in a separate goroutine, copying sstables and manifests.
209 1 : func (w *WorkloadCollector) copyFiles() {
210 1 : w.mu.Lock()
211 1 : defer w.mu.Unlock()
212 1 : // NB: This loop must hold w.mu at the beginning of each iteration. It may
213 1 : // drop w.mu at times, but it must reacquire it before the next iteration.
214 1 : for !w.copier.stop {
215 1 : // The following performs the workload capture. It waits on a condition
216 1 : // variable (w.copier) that is signaled when new files are available to be
217 1 : // collected.
218 1 : if len(w.mu.pendingSSTables) == 0 {
219 1 : w.copier.Wait()
220 1 : }
221 : // Grab the manifests to copy.
222 1 : index := w.mu.manifestIndex
223 1 : pendingManifests := w.mu.manifests[index:]
224 1 : var pending []string
225 1 : pending, w.mu.pendingSSTables = w.mu.pendingSSTables, nil
226 1 : func() {
227 1 : // Note the unusual locking pattern: temporarily unlock the
228 1 : // mutex, but re-acquire it before returning.
229 1 : w.mu.Unlock()
230 1 : defer w.mu.Lock()
231 1 :
232 1 : // Copy any updates to the manifests files.
233 1 : w.copyManifests(index, pendingManifests)
234 1 : // Copy the SSTables provided in pending. copySSTables takes
235 1 : // ownership of the pending slice.
236 1 : w.copySSTables(pending)
237 1 : }()
238 :
239 : // This helps in tests; tests can wait on the copyCond condition
240 : // variable until the necessary bits have been copied.
241 1 : w.mu.tablesCopied += len(pending)
242 1 : w.mu.copyCond.Broadcast()
243 : }
244 :
245 1 : for idx := range w.mu.manifests {
246 1 : if f := w.mu.manifests[idx].sourceFile; f != nil {
247 1 : if err := f.Close(); err != nil {
248 0 : panic(err)
249 : }
250 1 : w.mu.manifests[idx].sourceFile = nil
251 : }
252 1 : if f := w.mu.manifests[idx].destFile; f != nil {
253 1 : if err := f.Close(); err != nil {
254 0 : panic(err)
255 : }
256 1 : w.mu.manifests[idx].destFile = nil
257 : }
258 : }
259 1 : close(w.copier.done)
260 : }
261 :
262 : // copyManifests copies any un-copied portions of the source manifests.
263 1 : func (w *WorkloadCollector) copyManifests(startAtIndex int, manifests []*manifestDetails) {
264 1 : destFS := w.config.destFS
265 1 :
266 1 : for index, manifest := range manifests {
267 1 : if manifest.destFile == nil && manifest.sourceFile == nil {
268 1 : // This is the first time we've read from this manifest, and we
269 1 : // don't yet have open file descriptors for the src or dst files. It
270 1 : // is safe to write to manifest.{destFile,sourceFile} without
271 1 : // holding w.mu, because the copyFiles goroutine is the only
272 1 : // goroutine that accesses the fields of the `manifestDetails`
273 1 : // struct.
274 1 : var err error
275 1 : manifest.destFile, err = destFS.Create(w.destFilepath(destFS.PathBase(manifest.sourceFilepath)))
276 1 : if err != nil {
277 0 : panic(err)
278 : }
279 1 : manifest.sourceFile, err = w.config.srcFS.Open(manifest.sourceFilepath)
280 1 : if err != nil {
281 0 : panic(err)
282 : }
283 : }
284 :
285 1 : numBytesRead, err := io.CopyBuffer(manifest.destFile, manifest.sourceFile, w.buffer)
286 1 : if err != nil {
287 0 : panic(err)
288 : }
289 :
290 : // If we read 0 bytes from the current manifest and it is not the
291 : // latest/newest manifest, we have read it in its entirety. No new
292 : // data will be written to it, because only the latest manifest may
293 : // receive edits. Close the current source and destination files and
294 : // advance manifestIndex to the next index in w.mu.manifests.
295 1 : if numBytesRead == 0 && index != len(manifests)-1 {
296 1 : // Rotating the manifests so we can close the files.
297 1 : if err := manifests[index].sourceFile.Close(); err != nil {
298 0 : panic(err)
299 : }
300 1 : manifests[index].sourceFile = nil
301 1 : if err := manifests[index].destFile.Close(); err != nil {
302 0 : panic(err)
303 : }
304 1 : manifests[index].destFile = nil
305 1 : w.mu.Lock()
306 1 : w.mu.manifestIndex = startAtIndex + index + 1
307 1 : w.mu.Unlock()
308 : }
309 : }
310 : }
311 :
312 : // copySSTables copies the provided sstables to the stored workload. If a file
313 : // has already been marked as obsolete, then the file will be cleaned by the
314 : // w.config.cleaner after it is copied. The provided slice will be mutated and
315 : // should not be used following the call to this function.
316 1 : func (w *WorkloadCollector) copySSTables(pending []string) {
317 1 : for _, filePath := range pending {
318 1 : err := vfs.CopyAcrossFS(w.config.srcFS,
319 1 : filePath,
320 1 : w.config.destFS,
321 1 : w.destFilepath(w.config.srcFS.PathBase(filePath)))
322 1 : if err != nil {
323 0 : panic(err)
324 : }
325 : }
326 :
327 : // Identify the subset of `pending` files that should now be cleaned. The
328 : // WorkloadCollector intercepts Cleaner.Clean calls to defer cleaning until
329 : // copying has completed. If Cleaner.Clean has already been invoked for any
330 : // of the files that were copied, we can now actually Clean them.
331 1 : pendingClean := pending[:0]
332 1 : w.mu.Lock()
333 1 : for _, filePath := range pending {
334 1 : fileName := w.config.srcFS.PathBase(filePath)
335 1 : if w.mu.fileState[fileName].is(obsolete) {
336 1 : pendingClean = append(pendingClean, filePath)
337 1 : } else {
338 1 : w.mu.fileState[fileName] |= capturedSuccessfully
339 1 : }
340 : }
341 1 : w.mu.Unlock()
342 1 :
343 1 : for _, path := range pendingClean {
344 1 : _ = w.cleanFile(base.FileTypeTable, path)
345 1 : }
346 : }
347 :
348 : // Start begins collecting a workload. All flushed and ingested sstables, plus
349 : // the corresponding manifests, are copied to the provided destination path on
350 : // the provided FS.
351 1 : func (w *WorkloadCollector) Start(destFS vfs.FS, destPath string) {
352 1 : w.mu.Lock()
353 1 : defer w.mu.Unlock()
354 1 :
355 1 : // If the collector is not running then w.enabled == false, so swap it to
356 1 : // true and continue; otherwise return since it is already running.
357 1 : if !w.enabled.CompareAndSwap(false, true) {
358 0 : return
359 0 : }
360 1 : w.config.destFS = destFS
361 1 : w.config.destDir = destPath
362 1 :
363 1 : // Initialize the tracked manifests to the database's current manifest, if
364 1 : // the database has already started. Every database Open creates a new
365 1 : // manifest. There are two cases:
366 1 : // 1. The database has already been opened. Then `w.curManifest`
367 1 : // contains the file number of the current manifest. We must initialize
368 1 : // the w.mu.manifests slice to contain this first manifest.
369 1 : // 2. The database has not yet been opened. Then `w.curManifest` is
370 1 : // still zero. Once the associated database is opened, it'll invoke
371 1 : // onManifestCreated which will handle enqueuing the manifest on
372 1 : // `w.mu.manifests`.
373 1 : fileNum := base.FileNum(w.curManifest.Load())
374 1 : if fileNum != 0 {
375 1 : fileName := base.MakeFilename(base.FileTypeManifest, fileNum.DiskFileNum())
376 1 : w.mu.manifests = append(w.mu.manifests[:0], &manifestDetails{sourceFilepath: w.srcFilepath(fileName)})
377 1 : w.mu.fileState[fileName] |= readyForProcessing
378 1 : }
379 :
380 : // Begin copying files asynchronously in the background.
381 1 : w.copier.done = make(chan struct{})
382 1 : w.copier.stop = false
383 1 : go w.copyFiles()
384 : }
385 :
386 : // WaitAndStop waits for all enqueued sstables to be copied over, and then
387 : // calls Stop. This ensures that all sstables referenced in the collected
388 : // manifest's latest version edit will exist in the copy directory.
389 1 : func (w *WorkloadCollector) WaitAndStop() {
390 1 : w.mu.Lock()
391 1 : for w.mu.tablesEnqueued != w.mu.tablesCopied {
392 1 : w.mu.copyCond.Wait()
393 1 : }
394 1 : w.mu.Unlock()
395 1 : w.Stop()
396 : }
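
// A minimal sketch of a capture session built on the earlier openWithCapture
// sketch; the db and wc variables are hypothetical, and the ordering relative
// to db.Close is an assumption rather than a requirement stated in this file:
//
//	db, wc, err := openWithCapture("/path/to/db", "/path/to/capture")
//	if err != nil {
//		return err
//	}
//	// ... apply the workload to db ...
//	wc.WaitAndStop() // wait for enqueued sstables to be copied, then stop
//	if err := db.Close(); err != nil {
//		return err
//	}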
397 :
398 : // Stop stops collection of the workload.
399 1 : func (w *WorkloadCollector) Stop() {
400 1 : w.mu.Lock()
401 1 : // If the collector is running then w.enabled == true, so swap it to false
402 1 : // and continue; otherwise return since it is not running.
403 1 : if !w.enabled.CompareAndSwap(true, false) {
404 0 : w.mu.Unlock()
405 0 : return
406 0 : }
407 1 : w.copier.stop = true
408 1 : w.copier.Broadcast()
409 1 : w.mu.Unlock()
410 1 : <-w.copier.done
411 : }
412 :
413 : // IsRunning returns whether the WorkloadCollector is currently running.
414 1 : func (w *WorkloadCollector) IsRunning() bool {
415 1 : return w.enabled.Load()
416 1 : }
417 :
418 : // srcFilepath returns the file path to the named file in the source directory
419 : // on the source filesystem.
420 1 : func (w *WorkloadCollector) srcFilepath(name string) string {
421 1 : return w.config.srcFS.PathJoin(w.config.srcDir, name)
422 1 : }
423 :
424 : // destFilepath returns the file path to the named file in the destination
425 : // directory on the destination filesystem.
426 1 : func (w *WorkloadCollector) destFilepath(name string) string {
427 1 : return w.config.destFS.PathJoin(w.config.destDir, name)
428 1 : }
429 :
430 : type cleaner struct {
431 : name string
432 : clean func(vfs.FS, base.FileType, string) error
433 : }
434 :
435 1 : func (c cleaner) String() string { return c.name }
436 1 : func (c cleaner) Clean(fs vfs.FS, fileType base.FileType, path string) error {
437 1 : return c.clean(fs, fileType, path)
438 1 : }