Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "os"
9 : "slices"
10 : "sync"
11 :
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/record"
14 : "github.com/cockroachdb/pebble/vfs"
15 : )
16 :
// StandaloneManager implements Manager with a single log file per WAL (no
// failover capability).
type StandaloneManager struct {
	// o holds the Options the manager was initialized with.
	o Options
	// recycler tracks obsolete log files that Create may reuse for new WALs
	// instead of creating a fresh file.
	recycler LogRecycler
	// walDir is the open handle on the primary WAL directory. It is synced
	// after a new log file is created and closed in Close.
	walDir vfs.File
	// initialObsolete holds the set of DeletableLogs that formed the logs
	// passed into Init. The initialObsolete logs are all obsolete. Once
	// returned via Manager.Obsolete, initialObsolete is cleared. The
	// initialObsolete logs are stored separately from mu.queue because they may
	// include logs that were NOT created by the standalone manager, and
	// multiple physical log files may form one logical WAL.
	initialObsolete []DeletableLog

	// External synchronization is relied on when accessing w in Manager.Create,
	// Writer.{WriteRecord,Close}.
	w *standaloneWriter

	mu struct {
		sync.Mutex
		// The queue of WALs, containing both flushed and unflushed WALs. The
		// FileInfo.FileNum is also the NumWAL, since there is one log file for
		// each WAL. The flushed logs are a prefix, the unflushed logs a suffix.
		// If w != nil, the last entry here is that active WAL. For the active
		// log, FileInfo.FileSize is the size when it was opened and can be
		// greater than zero because of log recycling.
		queue []base.FileInfo
	}
}
46 :
47 : var _ Manager = &StandaloneManager{}
48 :
49 : // init implements Manager.
50 2 : func (m *StandaloneManager) init(o Options, initial Logs) error {
51 2 : if o.Secondary.FS != nil {
52 0 : return base.AssertionFailedf("cannot create StandaloneManager with a secondary")
53 0 : }
54 2 : var err error
55 2 : var walDir vfs.File
56 2 : if walDir, err = o.Primary.FS.OpenDir(o.Primary.Dirname); err != nil {
57 1 : return err
58 1 : }
59 2 : *m = StandaloneManager{
60 2 : o: o,
61 2 : walDir: walDir,
62 2 : }
63 2 : m.recycler.Init(o.MaxNumRecyclableLogs)
64 2 :
65 2 : closeAndReturnErr := func(err error) error {
66 0 : err = firstError(err, walDir.Close())
67 0 : return err
68 0 : }
69 2 : for _, ll := range initial {
70 2 : if m.recycler.MinRecycleLogNum() <= ll.Num {
71 2 : m.recycler.SetMinRecycleLogNum(ll.Num + 1)
72 2 : }
73 2 : m.initialObsolete, err = appendDeletableLogs(m.initialObsolete, ll)
74 2 : if err != nil {
75 0 : return closeAndReturnErr(err)
76 0 : }
77 : }
78 2 : return nil
79 : }
80 :
81 : // List implements Manager.
82 2 : func (m *StandaloneManager) List() Logs {
83 2 : m.mu.Lock()
84 2 : defer m.mu.Unlock()
85 2 : wals := make(Logs, len(m.mu.queue))
86 2 : for i := range m.mu.queue {
87 2 : wals[i] = LogicalLog{
88 2 : Num: NumWAL(m.mu.queue[i].FileNum),
89 2 : segments: []segment{{dir: m.o.Primary}},
90 2 : }
91 2 : }
92 2 : return wals
93 : }
94 :
95 : // Obsolete implements Manager.
96 : func (m *StandaloneManager) Obsolete(
97 : minUnflushedNum NumWAL, noRecycle bool,
98 2 : ) (toDelete []DeletableLog, err error) {
99 2 : m.mu.Lock()
100 2 : defer m.mu.Unlock()
101 2 :
102 2 : // If the DB was recently opened, we may have deletable logs outside the
103 2 : // queue.
104 2 : m.initialObsolete = slices.DeleteFunc(m.initialObsolete, func(dl DeletableLog) bool {
105 2 : if dl.NumWAL >= minUnflushedNum {
106 2 : return false
107 2 : }
108 2 : toDelete = append(toDelete, dl)
109 2 : return true
110 : })
111 :
112 2 : i := 0
113 2 : for ; i < len(m.mu.queue); i++ {
114 2 : fi := m.mu.queue[i]
115 2 : if fi.FileNum >= base.DiskFileNum(minUnflushedNum) {
116 2 : break
117 : }
118 2 : if noRecycle || !m.recycler.Add(fi) {
119 2 : toDelete = append(toDelete, DeletableLog{
120 2 : FS: m.o.Primary.FS,
121 2 : Path: m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(fi.FileNum), 000)),
122 2 : NumWAL: NumWAL(fi.FileNum),
123 2 : ApproxFileSize: fi.FileSize,
124 2 : })
125 2 : }
126 : }
127 2 : m.mu.queue = m.mu.queue[i:]
128 2 : return toDelete, nil
129 : }
130 :
131 : // Create implements Manager.
132 2 : func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
133 2 : // TODO(sumeer): check monotonicity of wn.
134 2 : newLogNum := base.DiskFileNum(wn)
135 2 : newLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(wn, 0))
136 2 :
137 2 : // Try to use a recycled log file. Recycling log files is an important
138 2 : // performance optimization as it is faster to sync a file that has
139 2 : // already been written, than one which is being written for the first
140 2 : // time. This is due to the need to sync file metadata when a file is
141 2 : // being written for the first time. Note this is true even if file
142 2 : // preallocation is performed (e.g. fallocate).
143 2 : var recycleLog base.FileInfo
144 2 : var recycleOK bool
145 2 : var newLogFile vfs.File
146 2 : var err error
147 2 : recycleLog, recycleOK = m.recycler.Peek()
148 2 : if recycleOK {
149 1 : recycleLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
150 1 : newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName, "pebble-wal")
151 1 : base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
152 2 : } else {
153 2 : newLogFile, err = m.o.Primary.FS.Create(newLogName, "pebble-wal")
154 2 : base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
155 2 : }
156 2 : createInfo := CreateInfo{
157 2 : JobID: jobID,
158 2 : Path: newLogName,
159 2 : IsSecondary: false,
160 2 : Num: wn,
161 2 : RecycledFileNum: recycleLog.FileNum,
162 2 : Err: nil,
163 2 : }
164 2 : defer func() {
165 2 : createInfo.Err = err
166 2 : m.o.EventListener.LogCreated(createInfo)
167 2 : }()
168 :
169 2 : if err != nil {
170 1 : return nil, err
171 1 : }
172 2 : var newLogSize uint64
173 2 : if recycleOK {
174 1 : // Figure out the recycled WAL size. This Stat is necessary
175 1 : // because ReuseForWrite's contract allows for removing the
176 1 : // old file and creating a new one. We don't know whether the
177 1 : // WAL was actually recycled.
178 1 : // TODO(jackson): Adding a boolean to the ReuseForWrite return
179 1 : // value indicating whether or not the file was actually
180 1 : // reused would allow us to skip the stat and use
181 1 : // recycleLog.FileSize.
182 1 : var finfo os.FileInfo
183 1 : finfo, err = newLogFile.Stat()
184 1 : if err == nil {
185 1 : newLogSize = uint64(finfo.Size())
186 1 : }
187 1 : err = firstError(err, m.recycler.Pop(recycleLog.FileNum))
188 1 : if err != nil {
189 0 : return nil, firstError(err, newLogFile.Close())
190 0 : }
191 : }
192 : // TODO(peter): RocksDB delays sync of the parent directory until the
193 : // first time the log is synced. Is that worthwhile?
194 2 : if err = m.walDir.Sync(); err != nil {
195 1 : err = firstError(err, newLogFile.Close())
196 1 : return nil, err
197 1 : }
198 2 : newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
199 2 : NoSyncOnClose: m.o.NoSyncOnClose,
200 2 : BytesPerSync: m.o.BytesPerSync,
201 2 : PreallocateSize: m.o.PreallocateSize(),
202 2 : })
203 2 : w := record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
204 2 : WALFsyncLatency: m.o.FsyncLatency,
205 2 : WALMinSyncInterval: m.o.MinSyncInterval,
206 2 : QueueSemChan: m.o.QueueSemChan,
207 2 : WriteWALSyncOffsets: m.o.WriteWALSyncOffsets,
208 2 : })
209 2 : m.w = &standaloneWriter{
210 2 : m: m,
211 2 : w: w,
212 2 : }
213 2 : m.mu.Lock()
214 2 : defer m.mu.Unlock()
215 2 : m.mu.queue = append(m.mu.queue, base.FileInfo{FileNum: newLogNum, FileSize: newLogSize})
216 2 : return m.w, nil
217 : }
218 :
// ElevateWriteStallThresholdForFailover implements Manager. It always returns
// false: the standalone manager has no failover capability.
func (m *StandaloneManager) ElevateWriteStallThresholdForFailover() bool {
	return false
}
223 :
224 : // Stats implements Manager.
225 2 : func (m *StandaloneManager) Stats() Stats {
226 2 : obsoleteLogsCount, obsoleteLogSize := m.recycler.Stats()
227 2 : m.mu.Lock()
228 2 : defer m.mu.Unlock()
229 2 : var fileSize uint64
230 2 : for i := range m.mu.queue {
231 2 : fileSize += m.mu.queue[i].FileSize
232 2 : }
233 2 : for i := range m.initialObsolete {
234 1 : if i == 0 || m.initialObsolete[i].NumWAL != m.initialObsolete[i-1].NumWAL {
235 1 : obsoleteLogsCount++
236 1 : }
237 1 : obsoleteLogSize += m.initialObsolete[i].ApproxFileSize
238 : }
239 2 : return Stats{
240 2 : ObsoleteFileCount: obsoleteLogsCount,
241 2 : ObsoleteFileSize: obsoleteLogSize,
242 2 : LiveFileCount: len(m.mu.queue),
243 2 : LiveFileSize: fileSize,
244 2 : }
245 : }
246 :
247 : // Close implements Manager.
248 2 : func (m *StandaloneManager) Close() error {
249 2 : var err error
250 2 : if m.w != nil {
251 1 : _, err = m.w.Close()
252 1 : }
253 2 : err = firstError(err, m.walDir.Close())
254 2 : if m.o.Primary.Lock != nil {
255 2 : err = firstError(err, m.o.Primary.Lock.Close())
256 2 : }
257 2 : return err
258 : }
259 :
// RecyclerForTesting implements Manager. It returns a pointer to the manager's
// own LogRecycler (not a copy).
func (m *StandaloneManager) RecyclerForTesting() *LogRecycler {
	return &m.recycler
}
264 :
// firstError picks err0 when it is set and falls back to err1 otherwise, so
// the result is nil only when both arguments are nil.
func firstError(err0, err1 error) error {
	if err0 == nil {
		return err1
	}
	return err0
}
273 :
// standaloneWriter is the Writer returned by StandaloneManager.Create. It
// forwards record writes to a record.LogWriter and updates the owning
// manager's bookkeeping when closed.
type standaloneWriter struct {
	// m is the manager that created this writer.
	m *StandaloneManager
	// w is the underlying log writer for the WAL file.
	w *record.LogWriter
}
278 :
279 : var _ Writer = &standaloneWriter{}
280 :
281 : // WriteRecord implements Writer.
282 : func (w *standaloneWriter) WriteRecord(
283 : p []byte, opts SyncOptions, _ RefCount,
284 2 : ) (logicalOffset int64, err error) {
285 2 : return w.w.SyncRecord(p, opts.Done, opts.Err)
286 2 : }
287 :
288 : // Close implements Writer.
289 2 : func (w *standaloneWriter) Close() (logicalOffset int64, err error) {
290 2 : logicalOffset = w.w.Size()
291 2 : // Close the log. This writes an EOF trailer signifying the end of the file
292 2 : // and syncs it to disk. The caller must close the previous log before
293 2 : // creating the new log file, otherwise a crash could leave both logs with
294 2 : // unclean tails, and DB.Open will treat the previous log as corrupt.
295 2 : err = w.w.Close()
296 2 : w.m.mu.Lock()
297 2 : defer w.m.mu.Unlock()
298 2 : i := len(w.m.mu.queue) - 1
299 2 : // The log may have grown past its original physical size. Update its file
300 2 : // size in the queue so we have a proper accounting of its file size.
301 2 : if w.m.mu.queue[i].FileSize < uint64(logicalOffset) {
302 2 : w.m.mu.queue[i].FileSize = uint64(logicalOffset)
303 2 : }
304 2 : w.m.w = nil
305 2 : return logicalOffset, err
306 : }
307 :
// Metrics implements Writer. It returns the metrics reported by the underlying
// record.LogWriter.
func (w *standaloneWriter) Metrics() record.LogWriterMetrics {
	return w.w.Metrics()
}
|