Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "os"
9 : "sync"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/record"
13 : "github.com/cockroachdb/pebble/vfs"
14 : )
15 :
16 : // StandaloneManager implements Manager with a single log file per WAL (no
17 : // failover capability).
18 : type StandaloneManager struct {
19 : o Options
20 : recycler LogRecycler
21 : walDir vfs.File
22 : // initialObsolete holds the set of DeletableLogs that formed the logs
23 : // passed into Init. The initialObsolete logs are all obsolete. Once
24 : // returned via Manager.Obsolete, initialObsolete is cleared. The
25 : // initialObsolete logs are stored separately from mu.queue because they may
26 : // include logs that were NOT created by the standalone manager, and
27 : // multiple physical log files may form one logical WAL.
28 : initialObsolete []DeletableLog
29 :
30 : // External synchronization is relied on when accessing w in Manager.Create,
31 : // Writer.{WriteRecord,Close}.
32 : w *standaloneWriter
33 :
34 : mu struct {
35 : sync.Mutex
36 : // The queue of WALs, containing both flushed and unflushed WALs. The
37 : // FileInfo.FileNum is also the NumWAL, since there is one log file for
38 : // each WAL. The flushed logs are a prefix, the unflushed logs a suffix.
39 : // If w != nil, the last entry here is that active WAL. For the active
40 : // log, FileInfo.FileSize is the size when it was opened and can be
41 : // greater than zero because of log recycling.
42 : queue []base.FileInfo
43 : }
44 : }
45 :
46 : var _ Manager = &StandaloneManager{}
47 :
48 : // init implements Manager.
49 2 : func (m *StandaloneManager) init(o Options, initial Logs) error {
50 2 : if o.Secondary.FS != nil {
51 0 : return base.AssertionFailedf("cannot create StandaloneManager with a secondary")
52 0 : }
53 2 : var err error
54 2 : var walDir vfs.File
55 2 : if walDir, err = o.Primary.FS.OpenDir(o.Primary.Dirname); err != nil {
56 1 : return err
57 1 : }
58 2 : *m = StandaloneManager{
59 2 : o: o,
60 2 : walDir: walDir,
61 2 : }
62 2 : m.recycler.Init(o.MaxNumRecyclableLogs)
63 2 :
64 2 : closeAndReturnErr := func(err error) error {
65 0 : err = firstError(err, walDir.Close())
66 0 : return err
67 0 : }
68 2 : for _, ll := range initial {
69 2 : if m.recycler.MinRecycleLogNum() <= ll.Num {
70 2 : m.recycler.SetMinRecycleLogNum(ll.Num + 1)
71 2 : }
72 2 : m.initialObsolete, err = appendDeletableLogs(m.initialObsolete, ll)
73 2 : if err != nil {
74 0 : return closeAndReturnErr(err)
75 0 : }
76 : }
77 2 : return nil
78 : }
79 :
80 : // List implements Manager.
81 2 : func (m *StandaloneManager) List() (Logs, error) {
82 2 : m.mu.Lock()
83 2 : defer m.mu.Unlock()
84 2 : wals := make(Logs, len(m.mu.queue))
85 2 : for i := range m.mu.queue {
86 2 : wals[i] = LogicalLog{
87 2 : Num: NumWAL(m.mu.queue[i].FileNum),
88 2 : segments: []segment{{dir: m.o.Primary}},
89 2 : }
90 2 : }
91 2 : return wals, nil
92 : }
93 :
94 : // Obsolete implements Manager.
95 : func (m *StandaloneManager) Obsolete(
96 : minUnflushedNum NumWAL, noRecycle bool,
97 2 : ) (toDelete []DeletableLog, err error) {
98 2 : m.mu.Lock()
99 2 : defer m.mu.Unlock()
100 2 :
101 2 : // If this is the first call to Obsolete after Open, we may have deletable
102 2 : // logs outside the queue.
103 2 : toDelete, m.initialObsolete = m.initialObsolete, nil
104 2 :
105 2 : i := 0
106 2 : for ; i < len(m.mu.queue); i++ {
107 2 : fi := m.mu.queue[i]
108 2 : if fi.FileNum >= base.DiskFileNum(minUnflushedNum) {
109 2 : break
110 : }
111 2 : if noRecycle || !m.recycler.Add(fi) {
112 2 : toDelete = append(toDelete, DeletableLog{
113 2 : FS: m.o.Primary.FS,
114 2 : Path: m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(fi.FileNum), 000)),
115 2 : NumWAL: NumWAL(fi.FileNum),
116 2 : ApproxFileSize: fi.FileSize,
117 2 : })
118 2 : }
119 : }
120 2 : m.mu.queue = m.mu.queue[i:]
121 2 : return toDelete, nil
122 : }
123 :
124 : // Create implements Manager.
125 2 : func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
126 2 : // TODO(sumeer): check monotonicity of wn.
127 2 : newLogNum := base.DiskFileNum(wn)
128 2 : newLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(wn, 0))
129 2 :
130 2 : // Try to use a recycled log file. Recycling log files is an important
131 2 : // performance optimization as it is faster to sync a file that has
132 2 : // already been written, than one which is being written for the first
133 2 : // time. This is due to the need to sync file metadata when a file is
134 2 : // being written for the first time. Note this is true even if file
135 2 : // preallocation is performed (e.g. fallocate).
136 2 : var recycleLog base.FileInfo
137 2 : var recycleOK bool
138 2 : var newLogFile vfs.File
139 2 : var err error
140 2 : recycleLog, recycleOK = m.recycler.Peek()
141 2 : if recycleOK {
142 1 : recycleLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
143 1 : newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName, "pebble-wal")
144 1 : base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
145 2 : } else {
146 2 : newLogFile, err = m.o.Primary.FS.Create(newLogName, "pebble-wal")
147 2 : base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
148 2 : }
149 2 : createInfo := CreateInfo{
150 2 : JobID: jobID,
151 2 : Path: newLogName,
152 2 : IsSecondary: false,
153 2 : Num: wn,
154 2 : RecycledFileNum: recycleLog.FileNum,
155 2 : Err: nil,
156 2 : }
157 2 : defer func() {
158 2 : createInfo.Err = err
159 2 : m.o.EventListener.LogCreated(createInfo)
160 2 : }()
161 :
162 2 : if err != nil {
163 1 : return nil, err
164 1 : }
165 2 : var newLogSize uint64
166 2 : if recycleOK {
167 1 : // Figure out the recycled WAL size. This Stat is necessary
168 1 : // because ReuseForWrite's contract allows for removing the
169 1 : // old file and creating a new one. We don't know whether the
170 1 : // WAL was actually recycled.
171 1 : // TODO(jackson): Adding a boolean to the ReuseForWrite return
172 1 : // value indicating whether or not the file was actually
173 1 : // reused would allow us to skip the stat and use
174 1 : // recycleLog.FileSize.
175 1 : var finfo os.FileInfo
176 1 : finfo, err = newLogFile.Stat()
177 1 : if err == nil {
178 1 : newLogSize = uint64(finfo.Size())
179 1 : }
180 1 : err = firstError(err, m.recycler.Pop(recycleLog.FileNum))
181 1 : if err != nil {
182 0 : return nil, firstError(err, newLogFile.Close())
183 0 : }
184 : }
185 : // TODO(peter): RocksDB delays sync of the parent directory until the
186 : // first time the log is synced. Is that worthwhile?
187 2 : if err = m.walDir.Sync(); err != nil {
188 1 : err = firstError(err, newLogFile.Close())
189 1 : return nil, err
190 1 : }
191 2 : newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
192 2 : NoSyncOnClose: m.o.NoSyncOnClose,
193 2 : BytesPerSync: m.o.BytesPerSync,
194 2 : PreallocateSize: m.o.PreallocateSize(),
195 2 : })
196 2 : w := record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
197 2 : WALFsyncLatency: m.o.FsyncLatency,
198 2 : WALMinSyncInterval: m.o.MinSyncInterval,
199 2 : QueueSemChan: m.o.QueueSemChan,
200 2 : })
201 2 : m.w = &standaloneWriter{
202 2 : m: m,
203 2 : w: w,
204 2 : }
205 2 : m.mu.Lock()
206 2 : defer m.mu.Unlock()
207 2 : m.mu.queue = append(m.mu.queue, base.FileInfo{FileNum: newLogNum, FileSize: newLogSize})
208 2 : return m.w, nil
209 : }
210 :
211 : // ElevateWriteStallThresholdForFailover implements Manager.
212 2 : func (m *StandaloneManager) ElevateWriteStallThresholdForFailover() bool {
213 2 : return false
214 2 : }
215 :
216 : // Stats implements Manager.
217 2 : func (m *StandaloneManager) Stats() Stats {
218 2 : obsoleteLogsCount, obsoleteLogSize := m.recycler.Stats()
219 2 : m.mu.Lock()
220 2 : defer m.mu.Unlock()
221 2 : var fileSize uint64
222 2 : for i := range m.mu.queue {
223 2 : fileSize += m.mu.queue[i].FileSize
224 2 : }
225 2 : for i := range m.initialObsolete {
226 1 : if i == 0 || m.initialObsolete[i].NumWAL != m.initialObsolete[i-1].NumWAL {
227 1 : obsoleteLogsCount++
228 1 : }
229 1 : obsoleteLogSize += m.initialObsolete[i].ApproxFileSize
230 : }
231 2 : return Stats{
232 2 : ObsoleteFileCount: obsoleteLogsCount,
233 2 : ObsoleteFileSize: obsoleteLogSize,
234 2 : LiveFileCount: len(m.mu.queue),
235 2 : LiveFileSize: fileSize,
236 2 : }
237 : }
238 :
239 : // Close implements Manager.
240 2 : func (m *StandaloneManager) Close() error {
241 2 : var err error
242 2 : if m.w != nil {
243 1 : _, err = m.w.Close()
244 1 : }
245 2 : return firstError(err, m.walDir.Close())
246 : }
247 :
248 : // RecyclerForTesting implements Manager.
249 1 : func (m *StandaloneManager) RecyclerForTesting() *LogRecycler {
250 1 : return &m.recycler
251 1 : }
252 :
// firstError returns err0 if it is non-nil, and err1 otherwise. The result is
// nil only when both arguments are nil.
func firstError(err0, err1 error) error {
	if err0 == nil {
		return err1
	}
	return err0
}
261 :
262 : type standaloneWriter struct {
263 : m *StandaloneManager
264 : w *record.LogWriter
265 : }
266 :
267 : var _ Writer = &standaloneWriter{}
268 :
269 : // WriteRecord implements Writer.
270 : func (w *standaloneWriter) WriteRecord(
271 : p []byte, opts SyncOptions, _ RefCount,
272 2 : ) (logicalOffset int64, err error) {
273 2 : return w.w.SyncRecord(p, opts.Done, opts.Err)
274 2 : }
275 :
276 : // Close implements Writer.
277 2 : func (w *standaloneWriter) Close() (logicalOffset int64, err error) {
278 2 : logicalOffset = w.w.Size()
279 2 : // Close the log. This writes an EOF trailer signifying the end of the file
280 2 : // and syncs it to disk. The caller must close the previous log before
281 2 : // creating the new log file, otherwise a crash could leave both logs with
282 2 : // unclean tails, and DB.Open will treat the previous log as corrupt.
283 2 : err = w.w.Close()
284 2 : w.m.mu.Lock()
285 2 : defer w.m.mu.Unlock()
286 2 : i := len(w.m.mu.queue) - 1
287 2 : // The log may have grown past its original physical size. Update its file
288 2 : // size in the queue so we have a proper accounting of its file size.
289 2 : if w.m.mu.queue[i].FileSize < uint64(logicalOffset) {
290 2 : w.m.mu.queue[i].FileSize = uint64(logicalOffset)
291 2 : }
292 2 : w.m.w = nil
293 2 : return logicalOffset, err
294 : }
295 :
296 : // Metrics implements Writer.
297 2 : func (w *standaloneWriter) Metrics() record.LogWriterMetrics {
298 2 : return w.w.Metrics()
299 2 : }
|