Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "time"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/humanize"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/cockroachdb/redact"
19 : )
20 :
21 : // TableInfo is a type alias that exports the manifest.TableInfo type from this package.
22 : type TableInfo = manifest.TableInfo
23 :
24 1 : func tablesTotalSize(tables []TableInfo) uint64 {
25 1 : var size uint64
26 1 : for i := range tables {
27 1 : size += tables[i].Size
28 1 : }
29 1 : return size
30 : }
31 :
32 1 : func formatFileNums(tables []TableInfo) string {
33 1 : var buf strings.Builder
34 1 : for i := range tables {
35 1 : if i > 0 {
36 1 : buf.WriteString(" ")
37 1 : }
38 1 : buf.WriteString(tables[i].FileNum.String())
39 : }
40 1 : return buf.String()
41 : }
42 :
43 : // LevelInfo contains info pertaining to a particular level.
44 : type LevelInfo struct {
45 : Level int // Level is the level number; SafeFormat prints it as "L<n>".
46 : Tables []TableInfo // Tables are the tables at this level; SafeFormat prints their file numbers and total size.
47 : Score float64 // Score is printed as "Score=%.2f". NOTE(review): presumably the compaction score of the level — confirm.
48 : }
49 :
50 0 : func (i LevelInfo) String() string {
51 0 : return redact.StringWithoutMarkers(i)
52 0 : }
53 :
54 : // SafeFormat implements redact.SafeFormatter.
55 1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
56 1 : w.Printf("L%d [%s] (%s) Score=%.2f",
57 1 : redact.Safe(i.Level),
58 1 : redact.Safe(formatFileNums(i.Tables)),
59 1 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
60 1 : redact.Safe(i.Score))
61 1 : }
62 :
63 : // CompactionInfo contains the info for a compaction event.
64 : type CompactionInfo struct {
65 : // JobID is the ID of the compaction job.
66 : JobID int
67 : // Reason is the reason for the compaction.
68 : Reason string
69 : // Input contains the input tables for the compaction organized by level.
70 : Input []LevelInfo
71 : // Output contains the output tables generated by the compaction. The output
72 : // tables are empty for the compaction begin event.
73 : Output LevelInfo
74 : // Duration is the time spent compacting, including reading and writing
75 : // sstables.
76 : Duration time.Duration
77 : // TotalDuration is the total wall-time duration of the compaction,
78 : // including applying the compaction to the database. TotalDuration is
79 : // always ≥ Duration.
80 : TotalDuration time.Duration
81 : Done bool // Done selects the log format: false prints "compacting", true prints "compacted".
82 : Err error // Err, when non-nil, makes SafeFormat print only the error line.
83 :
84 : SingleLevelOverlappingRatio float64 // Printed as "Single" in the "compacting" log line.
85 : MultiLevelOverlappingRatio float64 // Printed as "Multi" in the "compacting" log line.
86 :
87 : // Annotations specifies additional info to appear in a compaction's event log line.
88 : Annotations compactionAnnotations
89 : }
90 :
91 : type compactionAnnotations []string // compactionAnnotations is a list of strings that its SafeFormat prints space-separated, each marked safe.
92 :
93 : // SafeFormat implements redact.SafeFormatter.
94 1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
95 1 : if len(ca) == 0 {
96 0 : return
97 0 : }
98 1 : for i := range ca {
99 1 : if i != 0 {
100 0 : w.Print(" ")
101 0 : }
102 1 : w.Printf("%s", redact.SafeString(ca[i]))
103 : }
104 : }
105 :
106 1 : func (i CompactionInfo) String() string {
107 1 : return redact.StringWithoutMarkers(i)
108 1 : }
109 :
110 : // SafeFormat implements redact.SafeFormatter.
111 1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
112 1 : if i.Err != nil {
113 1 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
114 1 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
115 1 : return
116 1 : }
117 :
118 1 : if !i.Done {
119 1 : w.Printf("[JOB %d] compacting(%s) ",
120 1 : redact.Safe(i.JobID),
121 1 : redact.SafeString(i.Reason))
122 1 : if len(i.Annotations) > 0 {
123 1 : w.Printf("%s ", i.Annotations)
124 1 : }
125 1 : w.Printf("%s; ", levelInfos(i.Input))
126 1 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
127 1 : return
128 : }
129 1 : outputSize := tablesTotalSize(i.Output.Tables)
130 1 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
131 1 : if len(i.Annotations) > 0 {
132 1 : w.Printf("%s ", i.Annotations)
133 1 : }
134 1 : w.Print(levelInfos(i.Input))
135 1 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
136 1 : redact.Safe(i.Output.Level),
137 1 : redact.Safe(formatFileNums(i.Output.Tables)),
138 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
139 1 : redact.Safe(i.Duration.Seconds()),
140 1 : redact.Safe(i.TotalDuration.Seconds()),
141 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
142 : }
143 :
144 : type levelInfos []LevelInfo // levelInfos is a list of LevelInfo values that its SafeFormat prints separated by " + ".
145 :
146 1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
147 1 : for j, levelInfo := range i {
148 1 : if j > 0 {
149 1 : w.Printf(" + ")
150 1 : }
151 1 : w.Print(levelInfo)
152 : }
153 : }
154 :
155 : // DiskSlowInfo contains the info for a disk slowness event when writing to a
156 : // file. It is a type alias for vfs.DiskSlowInfo.
157 : type DiskSlowInfo = vfs.DiskSlowInfo
158 :
159 : // FlushInfo contains the info for a flush event.
160 : type FlushInfo struct {
161 : // JobID is the ID of the flush job.
162 : JobID int
163 : // Reason is the reason for the flush.
164 : Reason string
165 : // Input contains the count of input memtables that were flushed.
166 : Input int
167 : // InputBytes contains the total in-memory size of the memtable(s) that were
168 : // flushed. This size includes skiplist indexing data structures.
169 : InputBytes uint64
170 : // Output contains the output table generated by the flush. The output info
171 : // is empty for the flush begin event.
172 : Output []TableInfo
173 : // Duration is the time spent flushing. This duration includes writing and
174 : // syncing all of the flushed keys to sstables.
175 : Duration time.Duration
176 : // TotalDuration is the total wall-time duration of the flush, including
177 : // applying the flush to the database. TotalDuration is always ≥ Duration.
178 : TotalDuration time.Duration
179 : // Ingest is set to true if the flush is handling tables that were added to
180 : // the flushable queue via an ingestion operation.
181 : Ingest bool
182 : // IngestLevels are the output levels for each ingested table in the flush.
183 : // This field is only populated when Ingest is true.
184 : IngestLevels []int
185 : Done bool // Done selects the log format: false prints "flushing", true prints "flushed".
186 : Err error // Err, when non-nil, makes SafeFormat print only the error line.
187 : }
188 :
189 1 : func (i FlushInfo) String() string {
190 1 : return redact.StringWithoutMarkers(i)
191 1 : }
192 :
193 : // SafeFormat implements redact.SafeFormatter.
194 1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
195 1 : if i.Err != nil {
196 1 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
197 1 : return
198 1 : }
199 :
200 1 : plural := redact.SafeString("s")
201 1 : if i.Input == 1 {
202 1 : plural = ""
203 1 : }
204 1 : if !i.Done {
205 1 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
206 1 : if !i.Ingest {
207 1 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
208 1 : w.SafeString(plural)
209 1 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
210 1 : } else {
211 1 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
212 1 : }
213 1 : return
214 : }
215 :
216 1 : outputSize := tablesTotalSize(i.Output)
217 1 : if !i.Ingest {
218 1 : if invariants.Enabled && len(i.IngestLevels) > 0 {
219 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
220 : }
221 1 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
222 1 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
223 1 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
224 1 : redact.Safe(formatFileNums(i.Output)),
225 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
226 1 : redact.Safe(i.Duration.Seconds()),
227 1 : redact.Safe(i.TotalDuration.Seconds()),
228 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
229 1 : } else {
230 1 : if invariants.Enabled && len(i.IngestLevels) == 0 {
231 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
232 : }
233 1 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
234 1 : redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
235 1 : for j, level := range i.IngestLevels {
236 1 : file := i.Output[j]
237 1 : if j > 0 {
238 1 : w.Printf(" +")
239 1 : }
240 1 : w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
241 : }
242 1 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
243 1 : redact.Safe(i.Duration.Seconds()),
244 1 : redact.Safe(i.TotalDuration.Seconds()),
245 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
246 : }
247 : }
248 :
249 : // DownloadInfo contains the info for a DB.Download() event.
250 : type DownloadInfo struct {
251 : // JobID is the ID of the download job.
252 : JobID int
253 :
254 : Spans []DownloadSpan // Spans are the spans of the download; only their count appears in the "starting download" log line.
255 :
256 : // Duration is the time since the operation was started.
257 : Duration time.Duration
258 : DownloadCompactionsLaunched int // Printed as "launched %d compactions" in the finished and restarting log lines.
259 :
260 : // RestartCount indicates that the download operation restarted because it
261 : // noticed that new external files were ingested. A DownloadBegin event with
262 : // RestartCount = 0 is the start of the operation; each time we restart it we
263 : // have another DownloadBegin event with RestartCount > 0.
264 : RestartCount int
265 : Done bool // Done, when true (and Err is nil), makes SafeFormat print the "download finished" line.
266 : Err error // Err, when non-nil, makes SafeFormat print the "download error" line.
267 : }
268 :
269 1 : func (i DownloadInfo) String() string {
270 1 : return redact.StringWithoutMarkers(i)
271 1 : }
272 :
273 : // SafeFormat implements redact.SafeFormatter.
274 1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
275 1 : switch {
276 0 : case i.Err != nil:
277 0 : w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
278 :
279 1 : case i.Done:
280 1 : w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
281 1 : redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
282 :
283 1 : default:
284 1 : if i.RestartCount == 0 {
285 1 : w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
286 1 : } else {
287 0 : w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
288 0 : redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
289 0 : redact.Safe(i.DownloadCompactionsLaunched))
290 0 : }
291 : }
292 : }
293 :
294 : // ManifestCreateInfo contains info about a manifest creation event.
295 : type ManifestCreateInfo struct {
296 : // JobID is the ID of the job that caused the manifest to be created.
297 : JobID int
298 : Path string // Path is not included in the SafeFormat output.
299 : // The file number of the new Manifest.
300 : FileNum base.DiskFileNum
301 : Err error // Err, when non-nil, makes SafeFormat print the "create error" line.
302 : }
303 :
304 1 : func (i ManifestCreateInfo) String() string {
305 1 : return redact.StringWithoutMarkers(i)
306 1 : }
307 :
308 : // SafeFormat implements redact.SafeFormatter.
309 1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
310 1 : if i.Err != nil {
311 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
312 0 : return
313 0 : }
314 1 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
315 : }
316 :
317 : // ManifestDeleteInfo contains the info for a Manifest deletion event.
318 : type ManifestDeleteInfo struct {
319 : // JobID is the ID of the job that caused the Manifest to be deleted.
320 : JobID int
321 : Path string // Path is not included in the SafeFormat output.
322 : FileNum base.DiskFileNum // FileNum is printed in the "MANIFEST deleted" log line.
323 : Err error // Err, when non-nil, makes SafeFormat print the "delete error" line.
324 : }
325 :
326 1 : func (i ManifestDeleteInfo) String() string {
327 1 : return redact.StringWithoutMarkers(i)
328 1 : }
329 :
330 : // SafeFormat implements redact.SafeFormatter.
331 1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
332 1 : if i.Err != nil {
333 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
334 0 : return
335 0 : }
336 1 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
337 : }
338 :
339 : // TableCreateInfo contains the info for a table creation event.
340 : type TableCreateInfo struct {
341 : JobID int // JobID is printed as "[JOB n]" in the log line.
342 : // Reason is the reason for the table creation: "compacting", "flushing", or
343 : // "ingesting".
344 : Reason string
345 : Path string // Path is not included in the SafeFormat output.
346 : FileNum base.DiskFileNum // FileNum is printed in the "sstable created" log line.
347 : }
348 :
349 1 : func (i TableCreateInfo) String() string {
350 1 : return redact.StringWithoutMarkers(i)
351 1 : }
352 :
353 : // SafeFormat implements redact.SafeFormatter.
354 1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
355 1 : w.Printf("[JOB %d] %s: sstable created %s",
356 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
357 1 : }
358 :
359 : // TableDeleteInfo contains the info for a table deletion event.
360 : type TableDeleteInfo struct {
361 : JobID int // JobID is printed as "[JOB n]" in the log line.
362 : Path string // Path is not included in the SafeFormat output.
363 : FileNum base.DiskFileNum // FileNum is printed in both the success and the error log line.
364 : Err error // Err, when non-nil, makes SafeFormat print the "delete error" line.
365 : }
366 :
367 1 : func (i TableDeleteInfo) String() string {
368 1 : return redact.StringWithoutMarkers(i)
369 1 : }
370 :
371 : // SafeFormat implements redact.SafeFormatter.
372 1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
373 1 : if i.Err != nil {
374 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
375 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
376 0 : return
377 0 : }
378 1 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
379 : }
380 :
381 : // TableIngestInfo contains the info for a table ingestion event.
382 : type TableIngestInfo struct {
383 : // JobID is the ID of the job that caused the table to be ingested.
384 : JobID int
385 : Tables []struct { // Tables lists each ingested table together with its level.
386 : TableInfo
387 : Level int // Level is printed as an "L<n>:" prefix unless the ingest was treated as a flushable.
388 : }
389 : // GlobalSeqNum is the sequence number that was assigned to all entries in
390 : // the ingested table.
391 : GlobalSeqNum base.SeqNum
392 : // flushable indicates whether the ingested sstable was treated as a
393 : // flushable.
394 : flushable bool
395 : Err error // Err, when non-nil, makes SafeFormat print only the "ingest error" line.
396 : }
397 :
398 1 : func (i TableIngestInfo) String() string {
399 1 : return redact.StringWithoutMarkers(i)
400 1 : }
401 :
402 : // SafeFormat implements redact.SafeFormatter.
403 1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
404 1 : if i.Err != nil {
405 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
406 0 : return
407 0 : }
408 :
409 1 : if i.flushable {
410 1 : w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
411 1 : } else {
412 1 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
413 1 : }
414 :
415 1 : for j := range i.Tables {
416 1 : t := &i.Tables[j]
417 1 : if j > 0 {
418 1 : w.Printf(",")
419 1 : }
420 1 : levelStr := ""
421 1 : if !i.flushable {
422 1 : levelStr = fmt.Sprintf("L%d:", t.Level)
423 1 : }
424 1 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
425 1 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
426 : }
427 : }
428 :
429 : // TableStatsInfo contains the info for a table stats loaded event.
430 : type TableStatsInfo struct {
431 : // JobID is the ID of the job that finished loading the initial tables'
432 : // stats. It is printed as "[JOB n]" in the log line.
433 : JobID int
434 : }
435 :
436 1 : func (i TableStatsInfo) String() string {
437 1 : return redact.StringWithoutMarkers(i)
438 1 : }
439 :
440 : // SafeFormat implements redact.SafeFormatter.
441 1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
442 1 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
443 1 : }
444 :
445 : // TableValidatedInfo contains information on the result of a validation run
446 : // on an sstable.
447 : type TableValidatedInfo struct {
448 : JobID int // JobID is printed as "[JOB n]" in the log line.
449 : Meta *fileMetadata // Meta is printed with %s after "validated table:". NOTE(review): presumably the metadata of the validated sstable — confirm.
450 : }
451 :
452 1 : func (i TableValidatedInfo) String() string {
453 1 : return redact.StringWithoutMarkers(i)
454 1 : }
455 :
456 : // SafeFormat implements redact.SafeFormatter.
457 1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
458 1 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
459 1 : }
460 :
461 : // WALCreateInfo contains info about a WAL creation event.
462 : type WALCreateInfo struct {
463 : // JobID is the ID of the job that caused the WAL to be created.
464 : JobID int
465 : Path string // Path is not included in the SafeFormat output.
466 : // The file number of the new WAL.
467 : FileNum base.DiskFileNum
468 : // The file number of a previous WAL which was recycled to create this
469 : // one. Zero if recycling did not take place.
470 : RecycledFileNum base.DiskFileNum
471 : Err error // Err, when non-nil, makes SafeFormat print the "create error" line.
472 : }
473 :
474 1 : func (i WALCreateInfo) String() string {
475 1 : return redact.StringWithoutMarkers(i)
476 1 : }
477 :
478 : // SafeFormat implements redact.SafeFormatter.
479 1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
480 1 : if i.Err != nil {
481 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
482 0 : return
483 0 : }
484 :
485 1 : if i.RecycledFileNum == 0 {
486 1 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
487 1 : return
488 1 : }
489 :
490 0 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
491 0 : redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
492 : }
493 :
494 : // WALDeleteInfo contains the info for a WAL deletion event.
495 : //
496 : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
497 : // is insufficient to infer whether primary or secondary.
498 : type WALDeleteInfo struct {
499 : // JobID is the ID of the job that caused the WAL to be deleted.
500 : JobID int
501 : Path string // Path is not included in the SafeFormat output.
502 : FileNum base.DiskFileNum // FileNum is printed in the "WAL deleted" log line.
503 : Err error // Err, when non-nil, makes SafeFormat print the "delete error" line.
504 : }
505 :
506 1 : func (i WALDeleteInfo) String() string {
507 1 : return redact.StringWithoutMarkers(i)
508 1 : }
509 :
510 : // SafeFormat implements redact.SafeFormatter.
511 1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
512 1 : if i.Err != nil {
513 0 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
514 0 : return
515 0 : }
516 1 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
517 : }
518 :
519 : // WriteStallBeginInfo contains the info for a write stall begin event.
520 : type WriteStallBeginInfo struct {
521 : Reason string // Reason is printed after "write stall beginning:" and is marked safe.
522 : }
523 :
524 1 : func (i WriteStallBeginInfo) String() string {
525 1 : return redact.StringWithoutMarkers(i)
526 1 : }
527 :
528 : // SafeFormat implements redact.SafeFormatter.
529 1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
530 1 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
531 1 : }
532 :
533 : // EventListener contains a set of functions that will be invoked when various
534 : // significant DB events occur. Note that the functions should not run for an
535 : // excessive amount of time as they are invoked synchronously by the DB and may
536 : // block continued DB work. For a similar reason it is advisable to not perform
537 : // any synchronous calls back into the DB.
538 : type EventListener struct {
539 : // BackgroundError is invoked whenever an error occurs during a background
540 : // operation such as flush or compaction.
541 : BackgroundError func(error)
542 :
543 : // CompactionBegin is invoked after the inputs to a compaction have been
544 : // determined, but before the compaction has produced any output.
545 : CompactionBegin func(CompactionInfo)
546 :
547 : // CompactionEnd is invoked after a compaction has completed and the result
548 : // has been installed.
549 : CompactionEnd func(CompactionInfo)
550 :
551 : // DiskSlow is invoked after a disk write operation on a file created with a
552 : // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
553 : // observed to exceed the specified disk slowness threshold duration. DiskSlow
554 : // is called on a goroutine that is monitoring slowness/stuckness. The callee
555 : // MUST return without doing any IO, or blocking on anything (like a mutex)
556 : // that is waiting on IO. This is imperative in order to reliably monitor for
557 : // slowness, since if this goroutine gets stuck, the monitoring will stop
558 : // working.
559 : DiskSlow func(DiskSlowInfo)
560 :
561 : // FlushBegin is invoked after the inputs to a flush have been determined,
562 : // but before the flush has produced any output.
563 : FlushBegin func(FlushInfo)
564 :
565 : // FlushEnd is invoked after a flush has completed and the result has been
566 : // installed.
567 : FlushEnd func(FlushInfo)
568 :
569 : // DownloadBegin is invoked when a db.Download operation starts or restarts
570 : // (restarts are caused by new external tables being ingested during the
571 : // operation).
572 : DownloadBegin func(DownloadInfo)
573 :
574 : // DownloadEnd is invoked when a db.Download operation completes.
575 : DownloadEnd func(DownloadInfo)
576 :
577 : // FormatUpgrade is invoked after the database's FormatMajorVersion
578 : // is upgraded.
579 : FormatUpgrade func(FormatMajorVersion)
580 :
581 : // ManifestCreated is invoked after a manifest has been created.
582 : ManifestCreated func(ManifestCreateInfo)
583 :
584 : // ManifestDeleted is invoked after a manifest has been deleted.
585 : ManifestDeleted func(ManifestDeleteInfo)
586 :
587 : // TableCreated is invoked when a table has been created.
588 : TableCreated func(TableCreateInfo)
589 :
590 : // TableDeleted is invoked after a table has been deleted.
591 : TableDeleted func(TableDeleteInfo)
592 :
593 : // TableIngested is invoked after an externally created table has been
594 : // ingested via a call to DB.Ingest().
595 : TableIngested func(TableIngestInfo)
596 :
597 : // TableStatsLoaded is invoked at most once, when the table stats
598 : // collector has loaded statistics for all tables that existed at Open.
599 : TableStatsLoaded func(TableStatsInfo)
600 :
601 : // TableValidated is invoked after validation runs on an sstable.
602 : TableValidated func(TableValidatedInfo)
603 :
604 : // WALCreated is invoked after a WAL has been created.
605 : WALCreated func(WALCreateInfo)
606 :
607 : // WALDeleted is invoked after a WAL has been deleted.
608 : WALDeleted func(WALDeleteInfo)
609 :
610 : // WriteStallBegin is invoked when writes are intentionally delayed.
611 : WriteStallBegin func(WriteStallBeginInfo)
612 :
613 : // WriteStallEnd is invoked when delayed writes are released.
614 : WriteStallEnd func()
615 : }
616 :
617 : // EnsureDefaults ensures that background error events are logged to the
618 : // specified logger if a handler for those events hasn't been otherwise
619 : // specified. Ensure all handlers are non-nil so that we don't have to check
620 : // for nil-ness before invoking.
621 1 : func (l *EventListener) EnsureDefaults(logger Logger) {
622 1 : if l.BackgroundError == nil {
623 1 : if logger != nil {
624 1 : l.BackgroundError = func(err error) {
625 0 : logger.Errorf("background error: %s", err)
626 0 : }
627 0 : } else {
628 0 : l.BackgroundError = func(error) {}
629 : }
630 : }
631 1 : if l.CompactionBegin == nil {
632 1 : l.CompactionBegin = func(info CompactionInfo) {}
633 : }
634 1 : if l.CompactionEnd == nil {
635 1 : l.CompactionEnd = func(info CompactionInfo) {}
636 : }
637 1 : if l.DiskSlow == nil {
638 1 : l.DiskSlow = func(info DiskSlowInfo) {}
639 : }
640 1 : if l.FlushBegin == nil {
641 1 : l.FlushBegin = func(info FlushInfo) {}
642 : }
643 1 : if l.FlushEnd == nil {
644 1 : l.FlushEnd = func(info FlushInfo) {}
645 : }
646 1 : if l.DownloadBegin == nil {
647 1 : l.DownloadBegin = func(info DownloadInfo) {}
648 : }
649 1 : if l.DownloadEnd == nil {
650 1 : l.DownloadEnd = func(info DownloadInfo) {}
651 : }
652 1 : if l.FormatUpgrade == nil {
653 1 : l.FormatUpgrade = func(v FormatMajorVersion) {}
654 : }
655 1 : if l.ManifestCreated == nil {
656 1 : l.ManifestCreated = func(info ManifestCreateInfo) {}
657 : }
658 1 : if l.ManifestDeleted == nil {
659 1 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
660 : }
661 1 : if l.TableCreated == nil {
662 1 : l.TableCreated = func(info TableCreateInfo) {}
663 : }
664 1 : if l.TableDeleted == nil {
665 1 : l.TableDeleted = func(info TableDeleteInfo) {}
666 : }
667 1 : if l.TableIngested == nil {
668 1 : l.TableIngested = func(info TableIngestInfo) {}
669 : }
670 1 : if l.TableStatsLoaded == nil {
671 1 : l.TableStatsLoaded = func(info TableStatsInfo) {}
672 : }
673 1 : if l.TableValidated == nil {
674 1 : l.TableValidated = func(validated TableValidatedInfo) {}
675 : }
676 1 : if l.WALCreated == nil {
677 1 : l.WALCreated = func(info WALCreateInfo) {}
678 : }
679 1 : if l.WALDeleted == nil {
680 1 : l.WALDeleted = func(info WALDeleteInfo) {}
681 : }
682 1 : if l.WriteStallBegin == nil {
683 1 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
684 : }
685 1 : if l.WriteStallEnd == nil {
686 1 : l.WriteStallEnd = func() {}
687 : }
688 : }
689 :
690 : // MakeLoggingEventListener creates an EventListener that logs all events to the
691 : // specified logger.
692 1 : func MakeLoggingEventListener(logger Logger) EventListener {
693 1 : if logger == nil {
694 0 : logger = DefaultLogger
695 0 : }
696 :
697 1 : return EventListener{
698 1 : BackgroundError: func(err error) {
699 0 : logger.Errorf("background error: %s", err)
700 0 : },
701 1 : CompactionBegin: func(info CompactionInfo) {
702 1 : logger.Infof("%s", info)
703 1 : },
704 0 : CompactionEnd: func(info CompactionInfo) {
705 0 : logger.Infof("%s", info)
706 0 : },
707 0 : DiskSlow: func(info DiskSlowInfo) {
708 0 : logger.Infof("%s", info)
709 0 : },
710 1 : FlushBegin: func(info FlushInfo) {
711 1 : logger.Infof("%s", info)
712 1 : },
713 0 : FlushEnd: func(info FlushInfo) {
714 0 : logger.Infof("%s", info)
715 0 : },
716 1 : DownloadBegin: func(info DownloadInfo) {
717 1 : logger.Infof("%s", info)
718 1 : },
719 0 : DownloadEnd: func(info DownloadInfo) {
720 0 : logger.Infof("%s", info)
721 0 : },
722 1 : FormatUpgrade: func(v FormatMajorVersion) {
723 1 : logger.Infof("upgraded to format version: %s", v)
724 1 : },
725 0 : ManifestCreated: func(info ManifestCreateInfo) {
726 0 : logger.Infof("%s", info)
727 0 : },
728 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
729 0 : logger.Infof("%s", info)
730 0 : },
731 1 : TableCreated: func(info TableCreateInfo) {
732 1 : logger.Infof("%s", info)
733 1 : },
734 0 : TableDeleted: func(info TableDeleteInfo) {
735 0 : logger.Infof("%s", info)
736 0 : },
737 0 : TableIngested: func(info TableIngestInfo) {
738 0 : logger.Infof("%s", info)
739 0 : },
740 1 : TableStatsLoaded: func(info TableStatsInfo) {
741 1 : logger.Infof("%s", info)
742 1 : },
743 1 : TableValidated: func(info TableValidatedInfo) {
744 1 : logger.Infof("%s", info)
745 1 : },
746 0 : WALCreated: func(info WALCreateInfo) {
747 0 : logger.Infof("%s", info)
748 0 : },
749 0 : WALDeleted: func(info WALDeleteInfo) {
750 0 : logger.Infof("%s", info)
751 0 : },
752 1 : WriteStallBegin: func(info WriteStallBeginInfo) {
753 1 : logger.Infof("%s", info)
754 1 : },
755 1 : WriteStallEnd: func() {
756 1 : logger.Infof("write stall ending")
757 1 : },
758 : }
759 : }
760 :
761 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
762 0 : func TeeEventListener(a, b EventListener) EventListener {
763 0 : a.EnsureDefaults(nil)
764 0 : b.EnsureDefaults(nil)
765 0 : return EventListener{
766 0 : BackgroundError: func(err error) {
767 0 : a.BackgroundError(err)
768 0 : b.BackgroundError(err)
769 0 : },
770 0 : CompactionBegin: func(info CompactionInfo) {
771 0 : a.CompactionBegin(info)
772 0 : b.CompactionBegin(info)
773 0 : },
774 0 : CompactionEnd: func(info CompactionInfo) {
775 0 : a.CompactionEnd(info)
776 0 : b.CompactionEnd(info)
777 0 : },
778 0 : DiskSlow: func(info DiskSlowInfo) {
779 0 : a.DiskSlow(info)
780 0 : b.DiskSlow(info)
781 0 : },
782 0 : FlushBegin: func(info FlushInfo) {
783 0 : a.FlushBegin(info)
784 0 : b.FlushBegin(info)
785 0 : },
786 0 : FlushEnd: func(info FlushInfo) {
787 0 : a.FlushEnd(info)
788 0 : b.FlushEnd(info)
789 0 : },
790 0 : DownloadBegin: func(info DownloadInfo) {
791 0 : a.DownloadBegin(info)
792 0 : b.DownloadBegin(info)
793 0 : },
794 0 : DownloadEnd: func(info DownloadInfo) {
795 0 : a.DownloadEnd(info)
796 0 : b.DownloadEnd(info)
797 0 : },
798 0 : FormatUpgrade: func(v FormatMajorVersion) {
799 0 : a.FormatUpgrade(v)
800 0 : b.FormatUpgrade(v)
801 0 : },
802 0 : ManifestCreated: func(info ManifestCreateInfo) {
803 0 : a.ManifestCreated(info)
804 0 : b.ManifestCreated(info)
805 0 : },
806 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
807 0 : a.ManifestDeleted(info)
808 0 : b.ManifestDeleted(info)
809 0 : },
810 0 : TableCreated: func(info TableCreateInfo) {
811 0 : a.TableCreated(info)
812 0 : b.TableCreated(info)
813 0 : },
814 0 : TableDeleted: func(info TableDeleteInfo) {
815 0 : a.TableDeleted(info)
816 0 : b.TableDeleted(info)
817 0 : },
818 0 : TableIngested: func(info TableIngestInfo) {
819 0 : a.TableIngested(info)
820 0 : b.TableIngested(info)
821 0 : },
822 0 : TableStatsLoaded: func(info TableStatsInfo) {
823 0 : a.TableStatsLoaded(info)
824 0 : b.TableStatsLoaded(info)
825 0 : },
826 0 : TableValidated: func(info TableValidatedInfo) {
827 0 : a.TableValidated(info)
828 0 : b.TableValidated(info)
829 0 : },
830 0 : WALCreated: func(info WALCreateInfo) {
831 0 : a.WALCreated(info)
832 0 : b.WALCreated(info)
833 0 : },
834 0 : WALDeleted: func(info WALDeleteInfo) {
835 0 : a.WALDeleted(info)
836 0 : b.WALDeleted(info)
837 0 : },
838 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
839 0 : a.WriteStallBegin(info)
840 0 : b.WriteStallBegin(info)
841 0 : },
842 0 : WriteStallEnd: func() {
843 0 : a.WriteStallEnd()
844 0 : b.WriteStallEnd()
845 0 : },
846 : }
847 : }
|