Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package tool
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "io"
12 : "os"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble"
16 : "github.com/cockroachdb/pebble/batchrepr"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/rangekey"
19 : "github.com/cockroachdb/pebble/record"
20 : "github.com/cockroachdb/pebble/sstable"
21 : "github.com/cockroachdb/pebble/wal"
22 : "github.com/spf13/cobra"
23 : )
24 :
25 : // walT implements WAL-level tools, including both configuration state and the
26 : // commands themselves.
27 : type walT struct {
28 : Root *cobra.Command
29 : Dump *cobra.Command
30 : DumpMerged *cobra.Command
31 :
32 : opts *pebble.Options
33 : fmtKey keyFormatter
34 : fmtValue valueFormatter
35 :
36 : defaultComparer string
37 : comparers sstable.Comparers
38 : verbose bool
39 : }
40 :
41 1 : func newWAL(opts *pebble.Options, comparers sstable.Comparers, defaultComparer string) *walT {
42 1 : w := &walT{
43 1 : opts: opts,
44 1 : }
45 1 : w.fmtKey.mustSet("quoted")
46 1 : w.fmtValue.mustSet("size")
47 1 : w.comparers = comparers
48 1 : w.defaultComparer = defaultComparer
49 1 :
50 1 : w.Root = &cobra.Command{
51 1 : Use: "wal",
52 1 : Short: "WAL introspection tools",
53 1 : }
54 1 : w.Dump = &cobra.Command{
55 1 : Use: "dump <wal-files>",
56 1 : Short: "print WAL contents",
57 1 : Long: `
58 1 : Print the contents of the WAL files.
59 1 : `,
60 1 : Args: cobra.MinimumNArgs(1),
61 1 : Run: w.runDump,
62 1 : }
63 1 : w.DumpMerged = &cobra.Command{
64 1 : Use: "dump-merged <wal-files>",
65 1 : Short: "print WAL contents",
66 1 : Long: `
67 1 : Print the merged contents of multiple WAL segment files that
68 1 : together form a single logical WAL.
69 1 : `,
70 1 : Args: cobra.MinimumNArgs(1),
71 1 : Run: w.runDumpMerged,
72 1 : }
73 1 :
74 1 : w.Root.AddCommand(w.Dump)
75 1 : w.Root.AddCommand(w.DumpMerged)
76 1 : w.Root.PersistentFlags().BoolVarP(&w.verbose, "verbose", "v", false, "verbose output")
77 1 :
78 1 : w.Dump.Flags().Var(
79 1 : &w.fmtKey, "key", "key formatter")
80 1 : w.Dump.Flags().Var(
81 1 : &w.fmtValue, "value", "value formatter")
82 1 : return w
83 1 : }
84 :
// errAndArg records an error encountered while dumping a WAL file together
// with the command-line argument (file path) that was being processed.
type errAndArg struct {
	// err is the error that was reported.
	err error
	// arg is the command-line argument being dumped when err occurred.
	arg string
}
89 :
90 1 : func (w *walT) runDump(cmd *cobra.Command, args []string) {
91 1 : stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
92 1 : w.fmtKey.setForComparer(w.defaultComparer, w.comparers)
93 1 : w.fmtValue.setForComparer(w.defaultComparer, w.comparers)
94 1 : var errs []errAndArg
95 1 : logErr := func(arg string, offset int64, err error) {
96 0 : err = errors.Wrapf(err, "%s: offset %d: ", arg, offset)
97 0 : fmt.Fprintf(stderr, "%s\n", err)
98 0 : errs = append(errs, errAndArg{
99 0 : arg: arg,
100 0 : err: err,
101 0 : })
102 0 : }
103 :
104 1 : for _, arg := range args {
105 1 : func() {
106 1 : // Parse the filename in order to extract the file number. This is
107 1 : // necessary in case WAL recycling was used (which it is usually is). If
108 1 : // we can't parse the filename or it isn't a log file, we'll plow ahead
109 1 : // anyways (which will likely fail when we try to read the file).
110 1 : fileNum, _, ok := wal.ParseLogFilename(arg)
111 1 : if !ok {
112 0 : fileNum = 0
113 0 : }
114 :
115 1 : f, err := w.opts.FS.Open(arg)
116 1 : if err != nil {
117 0 : fmt.Fprintf(stderr, "%s\n", err)
118 0 : return
119 0 : }
120 1 : defer f.Close()
121 1 :
122 1 : fmt.Fprintf(stdout, "%s\n", arg)
123 1 :
124 1 : var b pebble.Batch
125 1 : var buf bytes.Buffer
126 1 : rr := record.NewReader(f, base.DiskFileNum(fileNum))
127 1 : for {
128 1 : offset := rr.Offset()
129 1 : r, err := rr.Next()
130 1 : if err == nil {
131 1 : buf.Reset()
132 1 : _, err = io.Copy(&buf, r)
133 1 : }
134 1 : if err != nil {
135 1 : // It is common to encounter a zeroed or invalid chunk due to WAL
136 1 : // preallocation and WAL recycling. We need to distinguish these
137 1 : // errors from EOF in order to recognize that the record was
138 1 : // truncated, but want to otherwise treat them like EOF.
139 1 : switch {
140 0 : case errors.Is(err, record.ErrZeroedChunk):
141 0 : fmt.Fprintf(stdout, "EOF [%s] (may be due to WAL preallocation)\n", err)
142 0 : case errors.Is(err, record.ErrInvalidChunk):
143 0 : fmt.Fprintf(stdout, "EOF [%s] (may be due to WAL recycling)\n", err)
144 1 : default:
145 1 : fmt.Fprintf(stdout, "%s\n", err)
146 : }
147 1 : return
148 : }
149 :
150 1 : b = pebble.Batch{}
151 1 : if err := b.SetRepr(buf.Bytes()); err != nil {
152 0 : fmt.Fprintf(stdout, "corrupt batch within log file %q: %v", arg, err)
153 0 : return
154 0 : }
155 1 : fmt.Fprintf(stdout, "%d(%d) seq=%d count=%d, len=%d\n",
156 1 : offset, len(b.Repr()), b.SeqNum(), b.Count(), buf.Len())
157 1 : w.dumpBatch(stdout, &b, b.Reader(), func(err error) {
158 0 : logErr(arg, offset, err)
159 0 : })
160 : }
161 : }()
162 : }
163 1 : if len(errs) > 0 {
164 0 : fmt.Fprintln(stderr, "Errors: ")
165 0 : for _, ea := range errs {
166 0 : fmt.Fprintf(stderr, "%s: %s\n", ea.arg, ea.err)
167 0 : }
168 : }
169 : }
170 :
171 0 : func (w *walT) runDumpMerged(cmd *cobra.Command, args []string) {
172 0 : stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
173 0 : w.fmtKey.setForComparer(w.defaultComparer, w.comparers)
174 0 : w.fmtValue.setForComparer(w.defaultComparer, w.comparers)
175 0 : var a wal.FileAccumulator
176 0 : for _, arg := range args {
177 0 : isLog, err := a.MaybeAccumulate(w.opts.FS, arg)
178 0 : if !isLog {
179 0 : fmt.Fprintf(stderr, "%q does not parse as a log file\n", arg)
180 0 : os.Exit(1)
181 0 : } else if err != nil {
182 0 : fmt.Fprintf(stderr, "%s: %s\n", arg, err)
183 0 : os.Exit(1)
184 0 : }
185 : }
186 0 : logs := a.Finish()
187 0 : var errs []error
188 0 : for _, log := range logs {
189 0 : fmt.Fprintf(stdout, "log file %s contains %d segment files:\n", log.Num, log.NumSegments())
190 0 : errs = append(errs, w.runDumpMergedOne(cmd, log)...)
191 0 : }
192 0 : if len(errs) > 0 {
193 0 : fmt.Fprintln(stderr, "Errors: ")
194 0 : for _, err := range errs {
195 0 : fmt.Fprintf(stderr, "%s\n", err)
196 0 : }
197 : }
198 : }
199 :
200 0 : func (w *walT) runDumpMergedOne(cmd *cobra.Command, ll wal.LogicalLog) []error {
201 0 : stdout, stderr := cmd.OutOrStdout(), cmd.OutOrStderr()
202 0 : var buf bytes.Buffer
203 0 : var errs []error
204 0 : var b pebble.Batch
205 0 :
206 0 : logErr := func(offset wal.Offset, err error) {
207 0 : err = errors.Wrapf(err, "offset %s: ", offset)
208 0 : fmt.Fprintln(stderr, err)
209 0 : errs = append(errs, err)
210 0 : }
211 :
212 0 : rr := ll.OpenForRead()
213 0 : for {
214 0 : buf.Reset()
215 0 : r, offset, err := rr.NextRecord()
216 0 : if err == nil {
217 0 : _, err = io.Copy(&buf, r)
218 0 : }
219 0 : if err != nil {
220 0 : // It is common to encounter a zeroed or invalid chunk due to WAL
221 0 : // preallocation and WAL recycling. We need to distinguish these
222 0 : // errors from EOF in order to recognize that the record was
223 0 : // truncated and to avoid replaying subsequent WALs, but want
224 0 : // to otherwise treat them like EOF.
225 0 : if err == io.EOF {
226 0 : break
227 0 : } else if record.IsInvalidRecord(err) {
228 0 : break
229 : }
230 0 : return append(errs, err)
231 : }
232 0 : if buf.Len() < batchrepr.HeaderLen {
233 0 : logErr(offset, errors.Newf("%d-byte batch too short", buf.Len()))
234 0 : continue
235 : }
236 0 : b = pebble.Batch{}
237 0 : if err := b.SetRepr(buf.Bytes()); err != nil {
238 0 : logErr(offset, errors.Newf("unable to parse batch: %x", buf.Bytes()))
239 0 : continue
240 : }
241 0 : fmt.Fprintf(stdout, "%s(%d) seq=%d count=%d, len=%d\n",
242 0 : offset, len(b.Repr()), b.SeqNum(), b.Count(), buf.Len())
243 0 : w.dumpBatch(stdout, &b, b.Reader(), func(err error) {
244 0 : logErr(offset, err)
245 0 : })
246 : }
247 0 : return nil
248 : }
249 :
250 : func (w *walT) dumpBatch(
251 : stdout io.Writer, b *pebble.Batch, r batchrepr.Reader, logErr func(error),
252 1 : ) {
253 1 : for idx := 0; ; idx++ {
254 1 : kind, ukey, value, ok, err := r.Next()
255 1 : if !ok {
256 1 : if err != nil {
257 0 : logErr(errors.Newf("unable to decode %d'th key in batch; %s", idx, err))
258 0 : }
259 1 : break
260 : }
261 1 : fmt.Fprintf(stdout, " %s(", kind)
262 1 : switch kind {
263 1 : case base.InternalKeyKindDelete:
264 1 : fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey))
265 1 : case base.InternalKeyKindSet:
266 1 : fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtValue.fn(ukey, value))
267 0 : case base.InternalKeyKindMerge:
268 0 : fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtValue.fn(ukey, value))
269 0 : case base.InternalKeyKindLogData:
270 0 : fmt.Fprintf(stdout, "<%d>", len(value))
271 0 : case base.InternalKeyKindIngestSST:
272 0 : fileNum, _ := binary.Uvarint(ukey)
273 0 : fmt.Fprintf(stdout, "%s", base.FileNum(fileNum))
274 0 : case base.InternalKeyKindExcise:
275 0 : fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtKey.fn(value))
276 0 : case base.InternalKeyKindSingleDelete:
277 0 : fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey))
278 0 : case base.InternalKeyKindSetWithDelete:
279 0 : fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey))
280 0 : case base.InternalKeyKindRangeDelete:
281 0 : fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtKey.fn(value))
282 1 : case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
283 1 : ik := base.MakeInternalKey(ukey, b.SeqNum()+base.SeqNum(idx), kind)
284 1 : s, err := rangekey.Decode(ik, value, nil)
285 1 : if err != nil {
286 0 : logErr(errors.Newf("%s: error decoding %s", w.fmtKey.fn(ukey), err))
287 1 : } else {
288 1 : fmt.Fprintf(stdout, "%s", s.Pretty(w.fmtKey.fn))
289 1 : }
290 0 : case base.InternalKeyKindDeleteSized:
291 0 : v, _ := binary.Uvarint(value)
292 0 : fmt.Fprintf(stdout, "%s,%d", w.fmtKey.fn(ukey), v)
293 0 : default:
294 0 : err := errors.Newf("invalid key kind %d in key at index %d/%d of batch with seqnum %d at offset %d",
295 0 : kind, idx, b.Count(), b.SeqNum())
296 0 : fmt.Fprintf(stdout, "<error: %s>", err)
297 0 : logErr(err)
298 : }
299 1 : fmt.Fprintf(stdout, ")\n")
300 : }
301 : }
|