Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "time"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/humanize"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/cockroachdb/redact"
19 : )
20 :
21 : // TableInfo exports the manifest.TableInfo type.
22 : type TableInfo = manifest.TableInfo
23 :
24 1 : func tablesTotalSize(tables []TableInfo) uint64 {
25 1 : var size uint64
26 1 : for i := range tables {
27 1 : size += tables[i].Size
28 1 : }
29 1 : return size
30 : }
31 :
32 1 : func formatFileNums(tables []TableInfo) string {
33 1 : var buf strings.Builder
34 1 : for i := range tables {
35 1 : if i > 0 {
36 1 : buf.WriteString(" ")
37 1 : }
38 1 : buf.WriteString(tables[i].FileNum.String())
39 : }
40 1 : return buf.String()
41 : }
42 :
43 : // LevelInfo contains info pertaining to a particular level.
44 : type LevelInfo struct {
45 : Level int
46 : Tables []TableInfo
47 : Score float64
48 : }
49 :
50 0 : func (i LevelInfo) String() string {
51 0 : return redact.StringWithoutMarkers(i)
52 0 : }
53 :
54 : // SafeFormat implements redact.SafeFormatter.
55 1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
56 1 : w.Printf("L%d [%s] (%s) Score=%.2f",
57 1 : redact.Safe(i.Level),
58 1 : redact.Safe(formatFileNums(i.Tables)),
59 1 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
60 1 : redact.Safe(i.Score))
61 1 : }
62 :
63 : // CompactionInfo contains the info for a compaction event.
64 : type CompactionInfo struct {
65 : // JobID is the ID of the compaction job.
66 : JobID int
67 : // Reason is the reason for the compaction.
68 : Reason string
69 : // Input contains the input tables for the compaction organized by level.
70 : Input []LevelInfo
71 : // Output contains the output tables generated by the compaction. The output
72 : // tables are empty for the compaction begin event.
73 : Output LevelInfo
74 : // Duration is the time spent compacting, including reading and writing
75 : // sstables.
76 : Duration time.Duration
77 : // TotalDuration is the total wall-time duration of the compaction,
78 : // including applying the compaction to the database. TotalDuration is
79 : // always ≥ Duration.
80 : TotalDuration time.Duration
81 : Done bool
82 : Err error
83 :
84 : SingleLevelOverlappingRatio float64
85 : MultiLevelOverlappingRatio float64
86 :
87 : // Annotations specifies additional info to appear in a compaction's event log line
88 : Annotations compactionAnnotations
89 : }
90 :
91 : type compactionAnnotations []string
92 :
93 : // SafeFormat implements redact.SafeFormatter.
94 1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
95 1 : if len(ca) == 0 {
96 1 : return
97 1 : }
98 1 : for i := range ca {
99 1 : if i != 0 {
100 0 : w.Print(" ")
101 0 : }
102 1 : w.Printf("%s", redact.SafeString(ca[i]))
103 : }
104 : }
105 :
106 1 : func (i CompactionInfo) String() string {
107 1 : return redact.StringWithoutMarkers(i)
108 1 : }
109 :
110 : // SafeFormat implements redact.SafeFormatter.
111 1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
112 1 : if i.Err != nil {
113 1 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
114 1 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
115 1 : return
116 1 : }
117 :
118 1 : if !i.Done {
119 1 : w.Printf("[JOB %d] compacting(%s) ",
120 1 : redact.Safe(i.JobID),
121 1 : redact.SafeString(i.Reason))
122 1 : w.Printf("%s", i.Annotations)
123 1 : w.Printf("%s; ", levelInfos(i.Input))
124 1 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
125 1 : return
126 1 : }
127 1 : outputSize := tablesTotalSize(i.Output.Tables)
128 1 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
129 1 : w.Printf("%s", i.Annotations)
130 1 : w.Print(levelInfos(i.Input))
131 1 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
132 1 : redact.Safe(i.Output.Level),
133 1 : redact.Safe(formatFileNums(i.Output.Tables)),
134 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
135 1 : redact.Safe(i.Duration.Seconds()),
136 1 : redact.Safe(i.TotalDuration.Seconds()),
137 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
138 : }
139 :
140 : type levelInfos []LevelInfo
141 :
142 1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
143 1 : for j, levelInfo := range i {
144 1 : if j > 0 {
145 1 : w.Printf(" + ")
146 1 : }
147 1 : w.Print(levelInfo)
148 : }
149 : }
150 :
151 : // DiskSlowInfo contains the info for a disk slowness event when writing to a
152 : // file.
153 : type DiskSlowInfo = vfs.DiskSlowInfo
154 :
155 : // FlushInfo contains the info for a flush event.
156 : type FlushInfo struct {
157 : // JobID is the ID of the flush job.
158 : JobID int
159 : // Reason is the reason for the flush.
160 : Reason string
161 : // Input contains the count of input memtables that were flushed.
162 : Input int
163 : // InputBytes contains the total in-memory size of the memtable(s) that were
164 : // flushed. This size includes skiplist indexing data structures.
165 : InputBytes uint64
166 : // Output contains the ouptut table generated by the flush. The output info
167 : // is empty for the flush begin event.
168 : Output []TableInfo
169 : // Duration is the time spent flushing. This duration includes writing and
170 : // syncing all of the flushed keys to sstables.
171 : Duration time.Duration
172 : // TotalDuration is the total wall-time duration of the flush, including
173 : // applying the flush to the database. TotalDuration is always ≥ Duration.
174 : TotalDuration time.Duration
175 : // Ingest is set to true if the flush is handling tables that were added to
176 : // the flushable queue via an ingestion operation.
177 : Ingest bool
178 : // IngestLevels are the output levels for each ingested table in the flush.
179 : // This field is only populated when Ingest is true.
180 : IngestLevels []int
181 : Done bool
182 : Err error
183 : }
184 :
185 1 : func (i FlushInfo) String() string {
186 1 : return redact.StringWithoutMarkers(i)
187 1 : }
188 :
189 : // SafeFormat implements redact.SafeFormatter.
190 1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
191 1 : if i.Err != nil {
192 1 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
193 1 : return
194 1 : }
195 :
196 1 : plural := redact.SafeString("s")
197 1 : if i.Input == 1 {
198 1 : plural = ""
199 1 : }
200 1 : if !i.Done {
201 1 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
202 1 : if !i.Ingest {
203 1 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
204 1 : w.SafeString(plural)
205 1 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
206 1 : } else {
207 1 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
208 1 : }
209 1 : return
210 : }
211 :
212 1 : outputSize := tablesTotalSize(i.Output)
213 1 : if !i.Ingest {
214 1 : if invariants.Enabled && len(i.IngestLevels) > 0 {
215 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
216 : }
217 1 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
218 1 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
219 1 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
220 1 : redact.Safe(formatFileNums(i.Output)),
221 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
222 1 : redact.Safe(i.Duration.Seconds()),
223 1 : redact.Safe(i.TotalDuration.Seconds()),
224 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
225 1 : } else {
226 1 : if invariants.Enabled && len(i.IngestLevels) == 0 {
227 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
228 : }
229 1 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
230 1 : redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
231 1 : for j, level := range i.IngestLevels {
232 1 : file := i.Output[j]
233 1 : if j > 0 {
234 1 : w.Printf(" +")
235 1 : }
236 1 : w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
237 : }
238 1 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
239 1 : redact.Safe(i.Duration.Seconds()),
240 1 : redact.Safe(i.TotalDuration.Seconds()),
241 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
242 : }
243 : }
244 :
245 : // DownloadInfo contains the info for a DB.Download() event.
246 : type DownloadInfo struct {
247 : // JobID is the ID of the download job.
248 : JobID int
249 :
250 : Spans []DownloadSpan
251 :
252 : // Duration is the time since the operation was started.
253 : Duration time.Duration
254 : DownloadCompactionsLaunched int
255 :
256 : // RestartCount indicates that the download operation restarted because it
257 : // noticed that new external files were ingested. A DownloadBegin event with
258 : // RestartCount = 0 is the start of the operation; each time we restart it we
259 : // have another DownloadBegin event with RestartCount > 0.
260 : RestartCount int
261 : Done bool
262 : Err error
263 : }
264 :
265 1 : func (i DownloadInfo) String() string {
266 1 : return redact.StringWithoutMarkers(i)
267 1 : }
268 :
269 : // SafeFormat implements redact.SafeFormatter.
270 1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
271 1 : switch {
272 0 : case i.Err != nil:
273 0 : w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
274 :
275 1 : case i.Done:
276 1 : w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
277 1 : redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
278 :
279 1 : default:
280 1 : if i.RestartCount == 0 {
281 1 : w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
282 1 : } else {
283 0 : w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
284 0 : redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
285 0 : redact.Safe(i.DownloadCompactionsLaunched))
286 0 : }
287 : }
288 : }
289 :
290 : // ManifestCreateInfo contains info about a manifest creation event.
291 : type ManifestCreateInfo struct {
292 : // JobID is the ID of the job the caused the manifest to be created.
293 : JobID int
294 : Path string
295 : // The file number of the new Manifest.
296 : FileNum base.DiskFileNum
297 : Err error
298 : }
299 :
300 1 : func (i ManifestCreateInfo) String() string {
301 1 : return redact.StringWithoutMarkers(i)
302 1 : }
303 :
304 : // SafeFormat implements redact.SafeFormatter.
305 1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
306 1 : if i.Err != nil {
307 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
308 0 : return
309 0 : }
310 1 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
311 : }
312 :
313 : // ManifestDeleteInfo contains the info for a Manifest deletion event.
314 : type ManifestDeleteInfo struct {
315 : // JobID is the ID of the job the caused the Manifest to be deleted.
316 : JobID int
317 : Path string
318 : FileNum base.DiskFileNum
319 : Err error
320 : }
321 :
322 1 : func (i ManifestDeleteInfo) String() string {
323 1 : return redact.StringWithoutMarkers(i)
324 1 : }
325 :
326 : // SafeFormat implements redact.SafeFormatter.
327 1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
328 1 : if i.Err != nil {
329 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
330 0 : return
331 0 : }
332 1 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
333 : }
334 :
335 : // TableCreateInfo contains the info for a table creation event.
336 : type TableCreateInfo struct {
337 : JobID int
338 : // Reason is the reason for the table creation: "compacting", "flushing", or
339 : // "ingesting".
340 : Reason string
341 : Path string
342 : FileNum base.DiskFileNum
343 : }
344 :
345 1 : func (i TableCreateInfo) String() string {
346 1 : return redact.StringWithoutMarkers(i)
347 1 : }
348 :
349 : // SafeFormat implements redact.SafeFormatter.
350 1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
351 1 : w.Printf("[JOB %d] %s: sstable created %s",
352 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
353 1 : }
354 :
355 : // TableDeleteInfo contains the info for a table deletion event.
356 : type TableDeleteInfo struct {
357 : JobID int
358 : Path string
359 : FileNum base.DiskFileNum
360 : Err error
361 : }
362 :
363 1 : func (i TableDeleteInfo) String() string {
364 1 : return redact.StringWithoutMarkers(i)
365 1 : }
366 :
367 : // SafeFormat implements redact.SafeFormatter.
368 1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
369 1 : if i.Err != nil {
370 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
371 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
372 0 : return
373 0 : }
374 1 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
375 : }
376 :
377 : // TableIngestInfo contains the info for a table ingestion event.
378 : type TableIngestInfo struct {
379 : // JobID is the ID of the job the caused the table to be ingested.
380 : JobID int
381 : Tables []struct {
382 : TableInfo
383 : Level int
384 : }
385 : // GlobalSeqNum is the sequence number that was assigned to all entries in
386 : // the ingested table.
387 : GlobalSeqNum base.SeqNum
388 : // flushable indicates whether the ingested sstable was treated as a
389 : // flushable.
390 : flushable bool
391 : Err error
392 : }
393 :
394 1 : func (i TableIngestInfo) String() string {
395 1 : return redact.StringWithoutMarkers(i)
396 1 : }
397 :
398 : // SafeFormat implements redact.SafeFormatter.
399 1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
400 1 : if i.Err != nil {
401 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
402 0 : return
403 0 : }
404 :
405 1 : if i.flushable {
406 1 : w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
407 1 : } else {
408 1 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
409 1 : }
410 :
411 1 : for j := range i.Tables {
412 1 : t := &i.Tables[j]
413 1 : if j > 0 {
414 1 : w.Printf(",")
415 1 : }
416 1 : levelStr := ""
417 1 : if !i.flushable {
418 1 : levelStr = fmt.Sprintf("L%d:", t.Level)
419 1 : }
420 1 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
421 1 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
422 : }
423 : }
424 :
425 : // TableStatsInfo contains the info for a table stats loaded event.
426 : type TableStatsInfo struct {
427 : // JobID is the ID of the job that finished loading the initial tables'
428 : // stats.
429 : JobID int
430 : }
431 :
432 1 : func (i TableStatsInfo) String() string {
433 1 : return redact.StringWithoutMarkers(i)
434 1 : }
435 :
436 : // SafeFormat implements redact.SafeFormatter.
437 1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
438 1 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
439 1 : }
440 :
441 : // TableValidatedInfo contains information on the result of a validation run
442 : // on an sstable.
443 : type TableValidatedInfo struct {
444 : JobID int
445 : Meta *fileMetadata
446 : }
447 :
448 1 : func (i TableValidatedInfo) String() string {
449 1 : return redact.StringWithoutMarkers(i)
450 1 : }
451 :
452 : // SafeFormat implements redact.SafeFormatter.
453 1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
454 1 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
455 1 : }
456 :
457 : // WALCreateInfo contains info about a WAL creation event.
458 : type WALCreateInfo struct {
459 : // JobID is the ID of the job the caused the WAL to be created.
460 : JobID int
461 : Path string
462 : // The file number of the new WAL.
463 : FileNum base.DiskFileNum
464 : // The file number of a previous WAL which was recycled to create this
465 : // one. Zero if recycling did not take place.
466 : RecycledFileNum base.DiskFileNum
467 : Err error
468 : }
469 :
470 1 : func (i WALCreateInfo) String() string {
471 1 : return redact.StringWithoutMarkers(i)
472 1 : }
473 :
474 : // SafeFormat implements redact.SafeFormatter.
475 1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
476 1 : if i.Err != nil {
477 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
478 0 : return
479 0 : }
480 :
481 1 : if i.RecycledFileNum == 0 {
482 1 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
483 1 : return
484 1 : }
485 :
486 1 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
487 1 : redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
488 : }
489 :
490 : // WALDeleteInfo contains the info for a WAL deletion event.
491 : //
492 : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
493 : // is insufficient to infer whether primary or secondary.
494 : type WALDeleteInfo struct {
495 : // JobID is the ID of the job the caused the WAL to be deleted.
496 : JobID int
497 : Path string
498 : FileNum base.DiskFileNum
499 : Err error
500 : }
501 :
502 1 : func (i WALDeleteInfo) String() string {
503 1 : return redact.StringWithoutMarkers(i)
504 1 : }
505 :
506 : // SafeFormat implements redact.SafeFormatter.
507 1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
508 1 : if i.Err != nil {
509 1 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
510 1 : return
511 1 : }
512 1 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
513 : }
514 :
515 : // WriteStallBeginInfo contains the info for a write stall begin event.
516 : type WriteStallBeginInfo struct {
517 : Reason string
518 : }
519 :
520 1 : func (i WriteStallBeginInfo) String() string {
521 1 : return redact.StringWithoutMarkers(i)
522 1 : }
523 :
524 : // SafeFormat implements redact.SafeFormatter.
525 1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
526 1 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
527 1 : }
528 :
529 : // EventListener contains a set of functions that will be invoked when various
530 : // significant DB events occur. Note that the functions should not run for an
531 : // excessive amount of time as they are invoked synchronously by the DB and may
532 : // block continued DB work. For a similar reason it is advisable to not perform
533 : // any synchronous calls back into the DB.
534 : type EventListener struct {
535 : // BackgroundError is invoked whenever an error occurs during a background
536 : // operation such as flush or compaction.
537 : BackgroundError func(error)
538 :
539 : // CompactionBegin is invoked after the inputs to a compaction have been
540 : // determined, but before the compaction has produced any output.
541 : CompactionBegin func(CompactionInfo)
542 :
543 : // CompactionEnd is invoked after a compaction has completed and the result
544 : // has been installed.
545 : CompactionEnd func(CompactionInfo)
546 :
547 : // DiskSlow is invoked after a disk write operation on a file created with a
548 : // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
549 : // observed to exceed the specified disk slowness threshold duration. DiskSlow
550 : // is called on a goroutine that is monitoring slowness/stuckness. The callee
551 : // MUST return without doing any IO, or blocking on anything (like a mutex)
552 : // that is waiting on IO. This is imperative in order to reliably monitor for
553 : // slowness, since if this goroutine gets stuck, the monitoring will stop
554 : // working.
555 : DiskSlow func(DiskSlowInfo)
556 :
557 : // FlushBegin is invoked after the inputs to a flush have been determined,
558 : // but before the flush has produced any output.
559 : FlushBegin func(FlushInfo)
560 :
561 : // FlushEnd is invoked after a flush has complated and the result has been
562 : // installed.
563 : FlushEnd func(FlushInfo)
564 :
565 : // DownloadBegin is invoked when a db.Download operation starts or restarts
566 : // (restarts are caused by new external tables being ingested during the
567 : // operation).
568 : DownloadBegin func(DownloadInfo)
569 :
570 : // DownloadEnd is invoked when a db.Download operation completes.
571 : DownloadEnd func(DownloadInfo)
572 :
573 : // FormatUpgrade is invoked after the database's FormatMajorVersion
574 : // is upgraded.
575 : FormatUpgrade func(FormatMajorVersion)
576 :
577 : // ManifestCreated is invoked after a manifest has been created.
578 : ManifestCreated func(ManifestCreateInfo)
579 :
580 : // ManifestDeleted is invoked after a manifest has been deleted.
581 : ManifestDeleted func(ManifestDeleteInfo)
582 :
583 : // TableCreated is invoked when a table has been created.
584 : TableCreated func(TableCreateInfo)
585 :
586 : // TableDeleted is invoked after a table has been deleted.
587 : TableDeleted func(TableDeleteInfo)
588 :
589 : // TableIngested is invoked after an externally created table has been
590 : // ingested via a call to DB.Ingest().
591 : TableIngested func(TableIngestInfo)
592 :
593 : // TableStatsLoaded is invoked at most once, when the table stats
594 : // collector has loaded statistics for all tables that existed at Open.
595 : TableStatsLoaded func(TableStatsInfo)
596 :
597 : // TableValidated is invoked after validation runs on an sstable.
598 : TableValidated func(TableValidatedInfo)
599 :
600 : // WALCreated is invoked after a WAL has been created.
601 : WALCreated func(WALCreateInfo)
602 :
603 : // WALDeleted is invoked after a WAL has been deleted.
604 : WALDeleted func(WALDeleteInfo)
605 :
606 : // WriteStallBegin is invoked when writes are intentionally delayed.
607 : WriteStallBegin func(WriteStallBeginInfo)
608 :
609 : // WriteStallEnd is invoked when delayed writes are released.
610 : WriteStallEnd func()
611 : }
612 :
613 : // EnsureDefaults ensures that background error events are logged to the
614 : // specified logger if a handler for those events hasn't been otherwise
615 : // specified. Ensure all handlers are non-nil so that we don't have to check
616 : // for nil-ness before invoking.
617 1 : func (l *EventListener) EnsureDefaults(logger Logger) {
618 1 : if l.BackgroundError == nil {
619 1 : if logger != nil {
620 1 : l.BackgroundError = func(err error) {
621 1 : logger.Errorf("background error: %s", err)
622 1 : }
623 1 : } else {
624 1 : l.BackgroundError = func(error) {}
625 : }
626 : }
627 1 : if l.CompactionBegin == nil {
628 1 : l.CompactionBegin = func(info CompactionInfo) {}
629 : }
630 1 : if l.CompactionEnd == nil {
631 1 : l.CompactionEnd = func(info CompactionInfo) {}
632 : }
633 1 : if l.DiskSlow == nil {
634 1 : l.DiskSlow = func(info DiskSlowInfo) {}
635 : }
636 1 : if l.FlushBegin == nil {
637 1 : l.FlushBegin = func(info FlushInfo) {}
638 : }
639 1 : if l.FlushEnd == nil {
640 1 : l.FlushEnd = func(info FlushInfo) {}
641 : }
642 1 : if l.DownloadBegin == nil {
643 1 : l.DownloadBegin = func(info DownloadInfo) {}
644 : }
645 1 : if l.DownloadEnd == nil {
646 1 : l.DownloadEnd = func(info DownloadInfo) {}
647 : }
648 1 : if l.FormatUpgrade == nil {
649 1 : l.FormatUpgrade = func(v FormatMajorVersion) {}
650 : }
651 1 : if l.ManifestCreated == nil {
652 1 : l.ManifestCreated = func(info ManifestCreateInfo) {}
653 : }
654 1 : if l.ManifestDeleted == nil {
655 1 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
656 : }
657 1 : if l.TableCreated == nil {
658 1 : l.TableCreated = func(info TableCreateInfo) {}
659 : }
660 1 : if l.TableDeleted == nil {
661 1 : l.TableDeleted = func(info TableDeleteInfo) {}
662 : }
663 1 : if l.TableIngested == nil {
664 1 : l.TableIngested = func(info TableIngestInfo) {}
665 : }
666 1 : if l.TableStatsLoaded == nil {
667 1 : l.TableStatsLoaded = func(info TableStatsInfo) {}
668 : }
669 1 : if l.TableValidated == nil {
670 1 : l.TableValidated = func(validated TableValidatedInfo) {}
671 : }
672 1 : if l.WALCreated == nil {
673 1 : l.WALCreated = func(info WALCreateInfo) {}
674 : }
675 1 : if l.WALDeleted == nil {
676 1 : l.WALDeleted = func(info WALDeleteInfo) {}
677 : }
678 1 : if l.WriteStallBegin == nil {
679 1 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
680 : }
681 1 : if l.WriteStallEnd == nil {
682 1 : l.WriteStallEnd = func() {}
683 : }
684 : }
685 :
686 : // MakeLoggingEventListener creates an EventListener that logs all events to the
687 : // specified logger.
688 1 : func MakeLoggingEventListener(logger Logger) EventListener {
689 1 : if logger == nil {
690 1 : logger = DefaultLogger
691 1 : }
692 :
693 1 : return EventListener{
694 1 : BackgroundError: func(err error) {
695 1 : logger.Errorf("background error: %s", err)
696 1 : },
697 1 : CompactionBegin: func(info CompactionInfo) {
698 1 : logger.Infof("%s", info)
699 1 : },
700 1 : CompactionEnd: func(info CompactionInfo) {
701 1 : logger.Infof("%s", info)
702 1 : },
703 0 : DiskSlow: func(info DiskSlowInfo) {
704 0 : logger.Infof("%s", info)
705 0 : },
706 1 : FlushBegin: func(info FlushInfo) {
707 1 : logger.Infof("%s", info)
708 1 : },
709 1 : FlushEnd: func(info FlushInfo) {
710 1 : logger.Infof("%s", info)
711 1 : },
712 1 : DownloadBegin: func(info DownloadInfo) {
713 1 : logger.Infof("%s", info)
714 1 : },
715 1 : DownloadEnd: func(info DownloadInfo) {
716 1 : logger.Infof("%s", info)
717 1 : },
718 1 : FormatUpgrade: func(v FormatMajorVersion) {
719 1 : logger.Infof("upgraded to format version: %s", v)
720 1 : },
721 1 : ManifestCreated: func(info ManifestCreateInfo) {
722 1 : logger.Infof("%s", info)
723 1 : },
724 1 : ManifestDeleted: func(info ManifestDeleteInfo) {
725 1 : logger.Infof("%s", info)
726 1 : },
727 1 : TableCreated: func(info TableCreateInfo) {
728 1 : logger.Infof("%s", info)
729 1 : },
730 1 : TableDeleted: func(info TableDeleteInfo) {
731 1 : logger.Infof("%s", info)
732 1 : },
733 1 : TableIngested: func(info TableIngestInfo) {
734 1 : logger.Infof("%s", info)
735 1 : },
736 1 : TableStatsLoaded: func(info TableStatsInfo) {
737 1 : logger.Infof("%s", info)
738 1 : },
739 1 : TableValidated: func(info TableValidatedInfo) {
740 1 : logger.Infof("%s", info)
741 1 : },
742 1 : WALCreated: func(info WALCreateInfo) {
743 1 : logger.Infof("%s", info)
744 1 : },
745 1 : WALDeleted: func(info WALDeleteInfo) {
746 1 : logger.Infof("%s", info)
747 1 : },
748 1 : WriteStallBegin: func(info WriteStallBeginInfo) {
749 1 : logger.Infof("%s", info)
750 1 : },
751 1 : WriteStallEnd: func() {
752 1 : logger.Infof("write stall ending")
753 1 : },
754 : }
755 : }
756 :
757 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
758 1 : func TeeEventListener(a, b EventListener) EventListener {
759 1 : a.EnsureDefaults(nil)
760 1 : b.EnsureDefaults(nil)
761 1 : return EventListener{
762 1 : BackgroundError: func(err error) {
763 1 : a.BackgroundError(err)
764 1 : b.BackgroundError(err)
765 1 : },
766 1 : CompactionBegin: func(info CompactionInfo) {
767 1 : a.CompactionBegin(info)
768 1 : b.CompactionBegin(info)
769 1 : },
770 1 : CompactionEnd: func(info CompactionInfo) {
771 1 : a.CompactionEnd(info)
772 1 : b.CompactionEnd(info)
773 1 : },
774 0 : DiskSlow: func(info DiskSlowInfo) {
775 0 : a.DiskSlow(info)
776 0 : b.DiskSlow(info)
777 0 : },
778 1 : FlushBegin: func(info FlushInfo) {
779 1 : a.FlushBegin(info)
780 1 : b.FlushBegin(info)
781 1 : },
782 1 : FlushEnd: func(info FlushInfo) {
783 1 : a.FlushEnd(info)
784 1 : b.FlushEnd(info)
785 1 : },
786 0 : DownloadBegin: func(info DownloadInfo) {
787 0 : a.DownloadBegin(info)
788 0 : b.DownloadBegin(info)
789 0 : },
790 0 : DownloadEnd: func(info DownloadInfo) {
791 0 : a.DownloadEnd(info)
792 0 : b.DownloadEnd(info)
793 0 : },
794 0 : FormatUpgrade: func(v FormatMajorVersion) {
795 0 : a.FormatUpgrade(v)
796 0 : b.FormatUpgrade(v)
797 0 : },
798 1 : ManifestCreated: func(info ManifestCreateInfo) {
799 1 : a.ManifestCreated(info)
800 1 : b.ManifestCreated(info)
801 1 : },
802 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
803 0 : a.ManifestDeleted(info)
804 0 : b.ManifestDeleted(info)
805 0 : },
806 1 : TableCreated: func(info TableCreateInfo) {
807 1 : a.TableCreated(info)
808 1 : b.TableCreated(info)
809 1 : },
810 1 : TableDeleted: func(info TableDeleteInfo) {
811 1 : a.TableDeleted(info)
812 1 : b.TableDeleted(info)
813 1 : },
814 1 : TableIngested: func(info TableIngestInfo) {
815 1 : a.TableIngested(info)
816 1 : b.TableIngested(info)
817 1 : },
818 1 : TableStatsLoaded: func(info TableStatsInfo) {
819 1 : a.TableStatsLoaded(info)
820 1 : b.TableStatsLoaded(info)
821 1 : },
822 0 : TableValidated: func(info TableValidatedInfo) {
823 0 : a.TableValidated(info)
824 0 : b.TableValidated(info)
825 0 : },
826 1 : WALCreated: func(info WALCreateInfo) {
827 1 : a.WALCreated(info)
828 1 : b.WALCreated(info)
829 1 : },
830 1 : WALDeleted: func(info WALDeleteInfo) {
831 1 : a.WALDeleted(info)
832 1 : b.WALDeleted(info)
833 1 : },
834 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
835 0 : a.WriteStallBegin(info)
836 0 : b.WriteStallBegin(info)
837 0 : },
838 0 : WriteStallEnd: func() {
839 0 : a.WriteStallEnd()
840 0 : b.WriteStallEnd()
841 0 : },
842 : }
843 : }
|