Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "os"
9 : "sync"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/record"
13 : "github.com/cockroachdb/pebble/vfs"
14 : )
15 :
16 : // StandaloneManager implements Manager with a single log file per WAL (no
17 : // failover capability).
18 : type StandaloneManager struct {
19 : o Options
20 : recycler LogRecycler
21 : walDir vfs.File
22 : // initialObsolete holds the set of DeletableLogs that formed the logs
23 : // passed into Init. The initialObsolete logs are all obsolete. Once
24 : // returned via Manager.Obsolete, initialObsolete is cleared. The
25 : // initialObsolete logs are stored separately from mu.queue because they may
26 : // include logs that were NOT created by the standalone manager, and
27 : // multiple physical log files may form one logical WAL.
28 : initialObsolete []DeletableLog
29 :
30 : // External synchronization is relied on when accessing w in Manager.Create,
31 : // Writer.{WriteRecord,Close}.
32 : w *standaloneWriter
33 :
34 : mu struct {
35 : sync.Mutex
36 : // The queue of WALs, containing both flushed and unflushed WALs. The
37 : // FileInfo.FileNum is also the NumWAL, since there is one log file for
38 : // each WAL. The flushed logs are a prefix, the unflushed logs a suffix.
39 : // If w != nil, the last entry here is that active WAL. For the active
40 : // log, FileInfo.FileSize is the size when it was opened and can be
41 : // greater than zero because of log recycling.
42 : queue []base.FileInfo
43 : }
44 : }
45 :
46 : var _ Manager = &StandaloneManager{}
47 :
48 : // init implements Manager.
49 1 : func (m *StandaloneManager) init(o Options, initial Logs) error {
50 1 : if o.Secondary.FS != nil {
51 0 : return base.AssertionFailedf("cannot create StandaloneManager with a secondary")
52 0 : }
53 1 : var err error
54 1 : var walDir vfs.File
55 1 : if walDir, err = o.Primary.FS.OpenDir(o.Primary.Dirname); err != nil {
56 0 : return err
57 0 : }
58 1 : *m = StandaloneManager{
59 1 : o: o,
60 1 : walDir: walDir,
61 1 : }
62 1 : m.recycler.Init(o.MaxNumRecyclableLogs)
63 1 :
64 1 : closeAndReturnErr := func(err error) error {
65 0 : err = firstError(err, walDir.Close())
66 0 : return err
67 0 : }
68 1 : for _, ll := range initial {
69 1 : if m.recycler.MinRecycleLogNum() <= ll.Num {
70 1 : m.recycler.SetMinRecycleLogNum(ll.Num + 1)
71 1 : }
72 1 : m.initialObsolete, err = appendDeletableLogs(m.initialObsolete, ll)
73 1 : if err != nil {
74 0 : return closeAndReturnErr(err)
75 0 : }
76 : }
77 1 : return nil
78 : }
79 :
80 : // List implements Manager.
81 1 : func (m *StandaloneManager) List() (Logs, error) {
82 1 : m.mu.Lock()
83 1 : defer m.mu.Unlock()
84 1 : wals := make(Logs, len(m.mu.queue))
85 1 : for i := range m.mu.queue {
86 1 : wals[i] = LogicalLog{
87 1 : Num: NumWAL(m.mu.queue[i].FileNum),
88 1 : segments: []segment{{dir: m.o.Primary}},
89 1 : }
90 1 : }
91 1 : return wals, nil
92 : }
93 :
94 : // Obsolete implements Manager.
95 : func (m *StandaloneManager) Obsolete(
96 : minUnflushedNum NumWAL, noRecycle bool,
97 1 : ) (toDelete []DeletableLog, err error) {
98 1 : m.mu.Lock()
99 1 : defer m.mu.Unlock()
100 1 :
101 1 : // If this is the first call to Obsolete after Open, we may have deletable
102 1 : // logs outside the queue.
103 1 : toDelete, m.initialObsolete = m.initialObsolete, nil
104 1 :
105 1 : i := 0
106 1 : for ; i < len(m.mu.queue); i++ {
107 1 : fi := m.mu.queue[i]
108 1 : if fi.FileNum >= base.DiskFileNum(minUnflushedNum) {
109 1 : break
110 : }
111 1 : if noRecycle || !m.recycler.Add(fi) {
112 1 : toDelete = append(toDelete, DeletableLog{
113 1 : FS: m.o.Primary.FS,
114 1 : Path: m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(fi.FileNum), 000)),
115 1 : NumWAL: NumWAL(fi.FileNum),
116 1 : ApproxFileSize: fi.FileSize,
117 1 : })
118 1 : }
119 : }
120 1 : m.mu.queue = m.mu.queue[i:]
121 1 : return toDelete, nil
122 : }
123 :
124 : // Create implements Manager.
125 1 : func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
126 1 : // TODO(sumeer): check monotonicity of wn.
127 1 : newLogNum := base.DiskFileNum(wn)
128 1 : newLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(wn, 0))
129 1 :
130 1 : // Try to use a recycled log file. Recycling log files is an important
131 1 : // performance optimization as it is faster to sync a file that has
132 1 : // already been written, than one which is being written for the first
133 1 : // time. This is due to the need to sync file metadata when a file is
134 1 : // being written for the first time. Note this is true even if file
135 1 : // preallocation is performed (e.g. fallocate).
136 1 : var recycleLog base.FileInfo
137 1 : var recycleOK bool
138 1 : var newLogFile vfs.File
139 1 : var err error
140 1 : recycleLog, recycleOK = m.recycler.Peek()
141 1 : if recycleOK {
142 0 : recycleLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
143 0 : newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName, "pebble-wal")
144 0 : base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
145 1 : } else {
146 1 : newLogFile, err = m.o.Primary.FS.Create(newLogName, "pebble-wal")
147 1 : base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
148 1 : }
149 1 : createInfo := CreateInfo{
150 1 : JobID: jobID,
151 1 : Path: newLogName,
152 1 : IsSecondary: false,
153 1 : Num: wn,
154 1 : RecycledFileNum: recycleLog.FileNum,
155 1 : Err: nil,
156 1 : }
157 1 : defer func() {
158 1 : createInfo.Err = err
159 1 : m.o.EventListener.LogCreated(createInfo)
160 1 : }()
161 :
162 1 : if err != nil {
163 0 : return nil, err
164 0 : }
165 1 : var newLogSize uint64
166 1 : if recycleOK {
167 0 : // Figure out the recycled WAL size. This Stat is necessary
168 0 : // because ReuseForWrite's contract allows for removing the
169 0 : // old file and creating a new one. We don't know whether the
170 0 : // WAL was actually recycled.
171 0 : // TODO(jackson): Adding a boolean to the ReuseForWrite return
172 0 : // value indicating whether or not the file was actually
173 0 : // reused would allow us to skip the stat and use
174 0 : // recycleLog.FileSize.
175 0 : var finfo os.FileInfo
176 0 : finfo, err = newLogFile.Stat()
177 0 : if err == nil {
178 0 : newLogSize = uint64(finfo.Size())
179 0 : }
180 0 : err = firstError(err, m.recycler.Pop(recycleLog.FileNum))
181 0 : if err != nil {
182 0 : return nil, firstError(err, newLogFile.Close())
183 0 : }
184 : }
185 : // TODO(peter): RocksDB delays sync of the parent directory until the
186 : // first time the log is synced. Is that worthwhile?
187 1 : if err = m.walDir.Sync(); err != nil {
188 0 : err = firstError(err, newLogFile.Close())
189 0 : return nil, err
190 0 : }
191 1 : newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
192 1 : NoSyncOnClose: m.o.NoSyncOnClose,
193 1 : BytesPerSync: m.o.BytesPerSync,
194 1 : PreallocateSize: m.o.PreallocateSize(),
195 1 : })
196 1 : w := record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
197 1 : WALFsyncLatency: m.o.FsyncLatency,
198 1 : WALMinSyncInterval: m.o.MinSyncInterval,
199 1 : QueueSemChan: m.o.QueueSemChan,
200 1 : })
201 1 : m.w = &standaloneWriter{
202 1 : m: m,
203 1 : w: w,
204 1 : }
205 1 : m.mu.Lock()
206 1 : defer m.mu.Unlock()
207 1 : m.mu.queue = append(m.mu.queue, base.FileInfo{FileNum: newLogNum, FileSize: newLogSize})
208 1 : return m.w, nil
209 : }
210 :
// ElevateWriteStallThresholdForFailover implements Manager. It always
// returns false.
func (m *StandaloneManager) ElevateWriteStallThresholdForFailover() bool {
	return false
}
215 :
216 : // Stats implements Manager.
217 1 : func (m *StandaloneManager) Stats() Stats {
218 1 : obsoleteLogsCount, obsoleteLogSize := m.recycler.Stats()
219 1 : m.mu.Lock()
220 1 : defer m.mu.Unlock()
221 1 : var fileSize uint64
222 1 : for i := range m.mu.queue {
223 1 : fileSize += m.mu.queue[i].FileSize
224 1 : }
225 1 : for i := range m.initialObsolete {
226 0 : if i == 0 || m.initialObsolete[i].NumWAL != m.initialObsolete[i-1].NumWAL {
227 0 : obsoleteLogsCount++
228 0 : }
229 0 : obsoleteLogSize += m.initialObsolete[i].ApproxFileSize
230 : }
231 1 : return Stats{
232 1 : ObsoleteFileCount: obsoleteLogsCount,
233 1 : ObsoleteFileSize: obsoleteLogSize,
234 1 : LiveFileCount: len(m.mu.queue),
235 1 : LiveFileSize: fileSize,
236 1 : }
237 : }
238 :
239 : // Close implements Manager.
240 1 : func (m *StandaloneManager) Close() error {
241 1 : var err error
242 1 : if m.w != nil {
243 0 : _, err = m.w.Close()
244 0 : }
245 1 : return firstError(err, m.walDir.Close())
246 : }
247 :
// RecyclerForTesting implements Manager. It returns a pointer to the
// manager's own LogRecycler (not a copy), so changes made through the
// returned pointer affect the manager.
func (m *StandaloneManager) RecyclerForTesting() *LogRecycler {
	return &m.recycler
}
252 :
// firstError picks the error to report out of two candidates: err0 when it
// is non-nil, otherwise err1. The result is nil only when both are nil.
func firstError(err0, err1 error) error {
	if err0 == nil {
		return err1
	}
	return err0
}
261 :
262 : type standaloneWriter struct {
263 : m *StandaloneManager
264 : w *record.LogWriter
265 : }
266 :
267 : var _ Writer = &standaloneWriter{}
268 :
269 : // WriteRecord implements Writer.
270 : func (w *standaloneWriter) WriteRecord(
271 : p []byte, opts SyncOptions, _ RefCount,
272 1 : ) (logicalOffset int64, err error) {
273 1 : return w.w.SyncRecord(p, opts.Done, opts.Err)
274 1 : }
275 :
276 : // Close implements Writer.
277 1 : func (w *standaloneWriter) Close() (logicalOffset int64, err error) {
278 1 : logicalOffset = w.w.Size()
279 1 : // Close the log. This writes an EOF trailer signifying the end of the file
280 1 : // and syncs it to disk. The caller must close the previous log before
281 1 : // creating the new log file, otherwise a crash could leave both logs with
282 1 : // unclean tails, and DB.Open will treat the previous log as corrupt.
283 1 : err = w.w.Close()
284 1 : w.m.mu.Lock()
285 1 : defer w.m.mu.Unlock()
286 1 : i := len(w.m.mu.queue) - 1
287 1 : // The log may have grown past its original physical size. Update its file
288 1 : // size in the queue so we have a proper accounting of its file size.
289 1 : if w.m.mu.queue[i].FileSize < uint64(logicalOffset) {
290 1 : w.m.mu.queue[i].FileSize = uint64(logicalOffset)
291 1 : }
292 1 : w.m.w = nil
293 1 : return logicalOffset, err
294 : }
295 :
// Metrics implements Writer. It returns the metrics reported by the
// underlying record.LogWriter.
func (w *standaloneWriter) Metrics() record.LogWriterMetrics {
	return w.w.Metrics()
}
|