Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "time"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/humanize"
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "github.com/cockroachdb/redact"
18 : )
19 :
20 : // TableInfo exports the manifest.TableInfo type.
21 : type TableInfo = manifest.TableInfo
22 :
23 1 : func tablesTotalSize(tables []TableInfo) uint64 {
24 1 : var size uint64
25 1 : for i := range tables {
26 1 : size += tables[i].Size
27 1 : }
28 1 : return size
29 : }
30 :
31 1 : func formatFileNums(tables []TableInfo) string {
32 1 : var buf strings.Builder
33 1 : for i := range tables {
34 1 : if i > 0 {
35 1 : buf.WriteString(" ")
36 1 : }
37 1 : buf.WriteString(tables[i].FileNum.String())
38 : }
39 1 : return buf.String()
40 : }
41 :
42 : // LevelInfo contains info pertaining to a particular level.
43 : type LevelInfo struct {
44 : Level int
45 : Tables []TableInfo
46 : Score float64
47 : }
48 :
49 0 : func (i LevelInfo) String() string {
50 0 : return redact.StringWithoutMarkers(i)
51 0 : }
52 :
53 : // SafeFormat implements redact.SafeFormatter.
54 1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
55 1 : w.Printf("L%d [%s] (%s) Score=%.2f",
56 1 : redact.Safe(i.Level),
57 1 : redact.Safe(formatFileNums(i.Tables)),
58 1 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
59 1 : redact.Safe(i.Score))
60 1 : }
61 :
62 : // CompactionInfo contains the info for a compaction event.
63 : type CompactionInfo struct {
64 : // JobID is the ID of the compaction job.
65 : JobID int
66 : // Reason is the reason for the compaction.
67 : Reason string
68 : // Input contains the input tables for the compaction organized by level.
69 : Input []LevelInfo
70 : // Output contains the output tables generated by the compaction. The output
71 : // tables are empty for the compaction begin event.
72 : Output LevelInfo
73 : // Duration is the time spent compacting, including reading and writing
74 : // sstables.
75 : Duration time.Duration
76 : // TotalDuration is the total wall-time duration of the compaction,
77 : // including applying the compaction to the database. TotalDuration is
78 : // always ≥ Duration.
79 : TotalDuration time.Duration
80 : Done bool
81 : Err error
82 :
83 : SingleLevelOverlappingRatio float64
84 : MultiLevelOverlappingRatio float64
85 :
86 : // Annotations specifies additional info to appear in a compaction's event log line
87 : Annotations compactionAnnotations
88 : }
89 :
90 : type compactionAnnotations []string
91 :
92 : // SafeFormat implements redact.SafeFormatter.
93 1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
94 1 : if len(ca) == 0 {
95 1 : return
96 1 : }
97 1 : for i := range ca {
98 1 : if i != 0 {
99 0 : w.Print(" ")
100 0 : }
101 1 : w.Printf("%s", redact.SafeString(ca[i]))
102 : }
103 : }
104 :
105 1 : func (i CompactionInfo) String() string {
106 1 : return redact.StringWithoutMarkers(i)
107 1 : }
108 :
109 : // SafeFormat implements redact.SafeFormatter.
110 1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
111 1 : if i.Err != nil {
112 1 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
113 1 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
114 1 : return
115 1 : }
116 :
117 1 : if !i.Done {
118 1 : w.Printf("[JOB %d] compacting(%s) ",
119 1 : redact.Safe(i.JobID),
120 1 : redact.SafeString(i.Reason))
121 1 : w.Printf("%s", i.Annotations)
122 1 : w.Printf("%s; ", levelInfos(i.Input))
123 1 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f ", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
124 1 : return
125 1 : }
126 1 : outputSize := tablesTotalSize(i.Output.Tables)
127 1 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
128 1 : w.Printf("%s", i.Annotations)
129 1 : w.Print(levelInfos(i.Input))
130 1 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
131 1 : redact.Safe(i.Output.Level),
132 1 : redact.Safe(formatFileNums(i.Output.Tables)),
133 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
134 1 : redact.Safe(i.Duration.Seconds()),
135 1 : redact.Safe(i.TotalDuration.Seconds()),
136 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
137 : }
138 :
139 : type levelInfos []LevelInfo
140 :
141 1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
142 1 : for j, levelInfo := range i {
143 1 : if j > 0 {
144 1 : w.Printf(" + ")
145 1 : }
146 1 : w.Print(levelInfo)
147 : }
148 : }
149 :
150 : // DiskSlowInfo contains the info for a disk slowness event when writing to a
151 : // file.
152 : type DiskSlowInfo = vfs.DiskSlowInfo
153 :
154 : // FlushInfo contains the info for a flush event.
155 : type FlushInfo struct {
156 : // JobID is the ID of the flush job.
157 : JobID int
158 : // Reason is the reason for the flush.
159 : Reason string
160 : // Input contains the count of input memtables that were flushed.
161 : Input int
162 : // InputBytes contains the total in-memory size of the memtable(s) that were
163 : // flushed. This size includes skiplist indexing data structures.
164 : InputBytes uint64
165 : // Output contains the ouptut table generated by the flush. The output info
166 : // is empty for the flush begin event.
167 : Output []TableInfo
168 : // Duration is the time spent flushing. This duration includes writing and
169 : // syncing all of the flushed keys to sstables.
170 : Duration time.Duration
171 : // TotalDuration is the total wall-time duration of the flush, including
172 : // applying the flush to the database. TotalDuration is always ≥ Duration.
173 : TotalDuration time.Duration
174 : // Ingest is set to true if the flush is handling tables that were added to
175 : // the flushable queue via an ingestion operation.
176 : Ingest bool
177 : // IngestLevels are the output levels for each ingested table in the flush.
178 : // This field is only populated when Ingest is true.
179 : IngestLevels []int
180 : Done bool
181 : Err error
182 : }
183 :
184 1 : func (i FlushInfo) String() string {
185 1 : return redact.StringWithoutMarkers(i)
186 1 : }
187 :
188 : // SafeFormat implements redact.SafeFormatter.
189 1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
190 1 : if i.Err != nil {
191 1 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
192 1 : return
193 1 : }
194 :
195 1 : plural := redact.SafeString("s")
196 1 : if i.Input == 1 {
197 1 : plural = ""
198 1 : }
199 1 : if !i.Done {
200 1 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
201 1 : if !i.Ingest {
202 1 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
203 1 : w.SafeString(plural)
204 1 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
205 1 : } else {
206 1 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
207 1 : }
208 1 : return
209 : }
210 :
211 1 : outputSize := tablesTotalSize(i.Output)
212 1 : if !i.Ingest {
213 1 : if invariants.Enabled && len(i.IngestLevels) > 0 {
214 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
215 : }
216 1 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
217 1 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
218 1 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
219 1 : redact.Safe(formatFileNums(i.Output)),
220 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
221 1 : redact.Safe(i.Duration.Seconds()),
222 1 : redact.Safe(i.TotalDuration.Seconds()),
223 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
224 1 : } else {
225 1 : if invariants.Enabled && len(i.IngestLevels) == 0 {
226 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
227 : }
228 1 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
229 1 : redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
230 1 : for j, level := range i.IngestLevels {
231 1 : file := i.Output[j]
232 1 : if j > 0 {
233 1 : w.Printf(" +")
234 1 : }
235 1 : w.Printf(" L%d:%s (%s)", level, redact.Safe(file.FileNum), humanize.Bytes.Uint64(file.Size))
236 : }
237 1 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
238 1 : redact.Safe(i.Duration.Seconds()),
239 1 : redact.Safe(i.TotalDuration.Seconds()),
240 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
241 : }
242 : }
243 :
244 : // ManifestCreateInfo contains info about a manifest creation event.
245 : type ManifestCreateInfo struct {
246 : // JobID is the ID of the job the caused the manifest to be created.
247 : JobID int
248 : Path string
249 : // The file number of the new Manifest.
250 : FileNum FileNum
251 : Err error
252 : }
253 :
254 1 : func (i ManifestCreateInfo) String() string {
255 1 : return redact.StringWithoutMarkers(i)
256 1 : }
257 :
258 : // SafeFormat implements redact.SafeFormatter.
259 1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
260 1 : if i.Err != nil {
261 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
262 0 : return
263 0 : }
264 1 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), redact.Safe(i.FileNum))
265 : }
266 :
267 : // ManifestDeleteInfo contains the info for a Manifest deletion event.
268 : type ManifestDeleteInfo struct {
269 : // JobID is the ID of the job the caused the Manifest to be deleted.
270 : JobID int
271 : Path string
272 : FileNum FileNum
273 : Err error
274 : }
275 :
276 1 : func (i ManifestDeleteInfo) String() string {
277 1 : return redact.StringWithoutMarkers(i)
278 1 : }
279 :
280 : // SafeFormat implements redact.SafeFormatter.
281 1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
282 1 : if i.Err != nil {
283 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
284 0 : return
285 0 : }
286 1 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), redact.Safe(i.FileNum))
287 : }
288 :
289 : // TableCreateInfo contains the info for a table creation event.
290 : type TableCreateInfo struct {
291 : JobID int
292 : // Reason is the reason for the table creation: "compacting", "flushing", or
293 : // "ingesting".
294 : Reason string
295 : Path string
296 : FileNum FileNum
297 : }
298 :
299 1 : func (i TableCreateInfo) String() string {
300 1 : return redact.StringWithoutMarkers(i)
301 1 : }
302 :
303 : // SafeFormat implements redact.SafeFormatter.
304 1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
305 1 : w.Printf("[JOB %d] %s: sstable created %s",
306 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), redact.Safe(i.FileNum))
307 1 : }
308 :
309 : // TableDeleteInfo contains the info for a table deletion event.
310 : type TableDeleteInfo struct {
311 : JobID int
312 : Path string
313 : FileNum FileNum
314 : Err error
315 : }
316 :
317 1 : func (i TableDeleteInfo) String() string {
318 1 : return redact.StringWithoutMarkers(i)
319 1 : }
320 :
321 : // SafeFormat implements redact.SafeFormatter.
322 1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
323 1 : if i.Err != nil {
324 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
325 0 : redact.Safe(i.JobID), redact.Safe(i.FileNum), i.Err)
326 0 : return
327 0 : }
328 1 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), redact.Safe(i.FileNum))
329 : }
330 :
331 : // TableIngestInfo contains the info for a table ingestion event.
332 : type TableIngestInfo struct {
333 : // JobID is the ID of the job the caused the table to be ingested.
334 : JobID int
335 : Tables []struct {
336 : TableInfo
337 : Level int
338 : }
339 : // GlobalSeqNum is the sequence number that was assigned to all entries in
340 : // the ingested table.
341 : GlobalSeqNum uint64
342 : // flushable indicates whether the ingested sstable was treated as a
343 : // flushable.
344 : flushable bool
345 : Err error
346 : }
347 :
348 1 : func (i TableIngestInfo) String() string {
349 1 : return redact.StringWithoutMarkers(i)
350 1 : }
351 :
352 : // SafeFormat implements redact.SafeFormatter.
353 1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
354 1 : if i.Err != nil {
355 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
356 0 : return
357 0 : }
358 :
359 1 : if i.flushable {
360 1 : w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
361 1 : } else {
362 1 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
363 1 : }
364 :
365 1 : for j := range i.Tables {
366 1 : t := &i.Tables[j]
367 1 : if j > 0 {
368 1 : w.Printf(",")
369 1 : }
370 1 : levelStr := ""
371 1 : if !i.flushable {
372 1 : levelStr = fmt.Sprintf("L%d:", t.Level)
373 1 : }
374 1 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), redact.Safe(t.FileNum),
375 1 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
376 : }
377 : }
378 :
379 : // TableStatsInfo contains the info for a table stats loaded event.
380 : type TableStatsInfo struct {
381 : // JobID is the ID of the job that finished loading the initial tables'
382 : // stats.
383 : JobID int
384 : }
385 :
386 1 : func (i TableStatsInfo) String() string {
387 1 : return redact.StringWithoutMarkers(i)
388 1 : }
389 :
390 : // SafeFormat implements redact.SafeFormatter.
391 1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
392 1 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
393 1 : }
394 :
395 : // TableValidatedInfo contains information on the result of a validation run
396 : // on an sstable.
397 : type TableValidatedInfo struct {
398 : JobID int
399 : Meta *fileMetadata
400 : }
401 :
402 0 : func (i TableValidatedInfo) String() string {
403 0 : return redact.StringWithoutMarkers(i)
404 0 : }
405 :
406 : // SafeFormat implements redact.SafeFormatter.
407 0 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
408 0 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
409 0 : }
410 :
411 : // WALCreateInfo contains info about a WAL creation event.
412 : type WALCreateInfo struct {
413 : // JobID is the ID of the job the caused the WAL to be created.
414 : JobID int
415 : Path string
416 : // The file number of the new WAL.
417 : FileNum FileNum
418 : // The file number of a previous WAL which was recycled to create this
419 : // one. Zero if recycling did not take place.
420 : RecycledFileNum FileNum
421 : Err error
422 : }
423 :
424 1 : func (i WALCreateInfo) String() string {
425 1 : return redact.StringWithoutMarkers(i)
426 1 : }
427 :
428 : // SafeFormat implements redact.SafeFormatter.
429 1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
430 1 : if i.Err != nil {
431 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
432 0 : return
433 0 : }
434 :
435 1 : if i.RecycledFileNum == 0 {
436 1 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), redact.Safe(i.FileNum))
437 1 : return
438 1 : }
439 :
440 1 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
441 1 : redact.Safe(i.JobID), redact.Safe(i.FileNum), redact.Safe(i.RecycledFileNum))
442 : }
443 :
444 : // WALDeleteInfo contains the info for a WAL deletion event.
445 : type WALDeleteInfo struct {
446 : // JobID is the ID of the job the caused the WAL to be deleted.
447 : JobID int
448 : Path string
449 : FileNum FileNum
450 : Err error
451 : }
452 :
453 0 : func (i WALDeleteInfo) String() string {
454 0 : return redact.StringWithoutMarkers(i)
455 0 : }
456 :
457 : // SafeFormat implements redact.SafeFormatter.
458 1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
459 1 : if i.Err != nil {
460 1 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
461 1 : return
462 1 : }
463 0 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), redact.Safe(i.FileNum))
464 : }
465 :
466 : // WriteStallBeginInfo contains the info for a write stall begin event.
467 : type WriteStallBeginInfo struct {
468 : Reason string
469 : }
470 :
471 1 : func (i WriteStallBeginInfo) String() string {
472 1 : return redact.StringWithoutMarkers(i)
473 1 : }
474 :
475 : // SafeFormat implements redact.SafeFormatter.
476 1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
477 1 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
478 1 : }
479 :
480 : // EventListener contains a set of functions that will be invoked when various
481 : // significant DB events occur. Note that the functions should not run for an
482 : // excessive amount of time as they are invoked synchronously by the DB and may
483 : // block continued DB work. For a similar reason it is advisable to not perform
484 : // any synchronous calls back into the DB.
485 : type EventListener struct {
486 : // BackgroundError is invoked whenever an error occurs during a background
487 : // operation such as flush or compaction.
488 : BackgroundError func(error)
489 :
490 : // CompactionBegin is invoked after the inputs to a compaction have been
491 : // determined, but before the compaction has produced any output.
492 : CompactionBegin func(CompactionInfo)
493 :
494 : // CompactionEnd is invoked after a compaction has completed and the result
495 : // has been installed.
496 : CompactionEnd func(CompactionInfo)
497 :
498 : // DiskSlow is invoked after a disk write operation on a file created with a
499 : // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
500 : // observed to exceed the specified disk slowness threshold duration. DiskSlow
501 : // is called on a goroutine that is monitoring slowness/stuckness. The callee
502 : // MUST return without doing any IO, or blocking on anything (like a mutex)
503 : // that is waiting on IO. This is imperative in order to reliably monitor for
504 : // slowness, since if this goroutine gets stuck, the monitoring will stop
505 : // working.
506 : DiskSlow func(DiskSlowInfo)
507 :
508 : // FlushBegin is invoked after the inputs to a flush have been determined,
509 : // but before the flush has produced any output.
510 : FlushBegin func(FlushInfo)
511 :
512 : // FlushEnd is invoked after a flush has complated and the result has been
513 : // installed.
514 : FlushEnd func(FlushInfo)
515 :
516 : // FormatUpgrade is invoked after the database's FormatMajorVersion
517 : // is upgraded.
518 : FormatUpgrade func(FormatMajorVersion)
519 :
520 : // ManifestCreated is invoked after a manifest has been created.
521 : ManifestCreated func(ManifestCreateInfo)
522 :
523 : // ManifestDeleted is invoked after a manifest has been deleted.
524 : ManifestDeleted func(ManifestDeleteInfo)
525 :
526 : // TableCreated is invoked when a table has been created.
527 : TableCreated func(TableCreateInfo)
528 :
529 : // TableDeleted is invoked after a table has been deleted.
530 : TableDeleted func(TableDeleteInfo)
531 :
532 : // TableIngested is invoked after an externally created table has been
533 : // ingested via a call to DB.Ingest().
534 : TableIngested func(TableIngestInfo)
535 :
536 : // TableStatsLoaded is invoked at most once, when the table stats
537 : // collector has loaded statistics for all tables that existed at Open.
538 : TableStatsLoaded func(TableStatsInfo)
539 :
540 : // TableValidated is invoked after validation runs on an sstable.
541 : TableValidated func(TableValidatedInfo)
542 :
543 : // WALCreated is invoked after a WAL has been created.
544 : WALCreated func(WALCreateInfo)
545 :
546 : // WALDeleted is invoked after a WAL has been deleted.
547 : WALDeleted func(WALDeleteInfo)
548 :
549 : // WriteStallBegin is invoked when writes are intentionally delayed.
550 : WriteStallBegin func(WriteStallBeginInfo)
551 :
552 : // WriteStallEnd is invoked when delayed writes are released.
553 : WriteStallEnd func()
554 : }
555 :
556 : // EnsureDefaults ensures that background error events are logged to the
557 : // specified logger if a handler for those events hasn't been otherwise
558 : // specified. Ensure all handlers are non-nil so that we don't have to check
559 : // for nil-ness before invoking.
560 1 : func (l *EventListener) EnsureDefaults(logger Logger) {
561 1 : if l.BackgroundError == nil {
562 1 : if logger != nil {
563 1 : l.BackgroundError = func(err error) {
564 1 : logger.Infof("background error: %s", err)
565 1 : }
566 1 : } else {
567 1 : l.BackgroundError = func(error) {}
568 : }
569 : }
570 1 : if l.CompactionBegin == nil {
571 1 : l.CompactionBegin = func(info CompactionInfo) {}
572 : }
573 1 : if l.CompactionEnd == nil {
574 1 : l.CompactionEnd = func(info CompactionInfo) {}
575 : }
576 1 : if l.DiskSlow == nil {
577 1 : l.DiskSlow = func(info DiskSlowInfo) {}
578 : }
579 1 : if l.FlushBegin == nil {
580 1 : l.FlushBegin = func(info FlushInfo) {}
581 : }
582 1 : if l.FlushEnd == nil {
583 1 : l.FlushEnd = func(info FlushInfo) {}
584 : }
585 1 : if l.FormatUpgrade == nil {
586 1 : l.FormatUpgrade = func(v FormatMajorVersion) {}
587 : }
588 1 : if l.ManifestCreated == nil {
589 1 : l.ManifestCreated = func(info ManifestCreateInfo) {}
590 : }
591 1 : if l.ManifestDeleted == nil {
592 1 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
593 : }
594 1 : if l.TableCreated == nil {
595 1 : l.TableCreated = func(info TableCreateInfo) {}
596 : }
597 1 : if l.TableDeleted == nil {
598 1 : l.TableDeleted = func(info TableDeleteInfo) {}
599 : }
600 1 : if l.TableIngested == nil {
601 1 : l.TableIngested = func(info TableIngestInfo) {}
602 : }
603 1 : if l.TableStatsLoaded == nil {
604 1 : l.TableStatsLoaded = func(info TableStatsInfo) {}
605 : }
606 1 : if l.TableValidated == nil {
607 1 : l.TableValidated = func(validated TableValidatedInfo) {}
608 : }
609 1 : if l.WALCreated == nil {
610 1 : l.WALCreated = func(info WALCreateInfo) {}
611 : }
612 1 : if l.WALDeleted == nil {
613 1 : l.WALDeleted = func(info WALDeleteInfo) {}
614 : }
615 1 : if l.WriteStallBegin == nil {
616 1 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
617 : }
618 1 : if l.WriteStallEnd == nil {
619 1 : l.WriteStallEnd = func() {}
620 : }
621 : }
622 :
623 : // MakeLoggingEventListener creates an EventListener that logs all events to the
624 : // specified logger.
625 1 : func MakeLoggingEventListener(logger Logger) EventListener {
626 1 : if logger == nil {
627 1 : logger = DefaultLogger
628 1 : }
629 :
630 1 : return EventListener{
631 1 : BackgroundError: func(err error) {
632 1 : logger.Infof("background error: %s", err)
633 1 : },
634 1 : CompactionBegin: func(info CompactionInfo) {
635 1 : logger.Infof("%s", info)
636 1 : },
637 1 : CompactionEnd: func(info CompactionInfo) {
638 1 : logger.Infof("%s", info)
639 1 : },
640 0 : DiskSlow: func(info DiskSlowInfo) {
641 0 : logger.Infof("%s", info)
642 0 : },
643 1 : FlushBegin: func(info FlushInfo) {
644 1 : logger.Infof("%s", info)
645 1 : },
646 1 : FlushEnd: func(info FlushInfo) {
647 1 : logger.Infof("%s", info)
648 1 : },
649 1 : FormatUpgrade: func(v FormatMajorVersion) {
650 1 : logger.Infof("upgraded to format version: %s", v)
651 1 : },
652 1 : ManifestCreated: func(info ManifestCreateInfo) {
653 1 : logger.Infof("%s", info)
654 1 : },
655 1 : ManifestDeleted: func(info ManifestDeleteInfo) {
656 1 : logger.Infof("%s", info)
657 1 : },
658 1 : TableCreated: func(info TableCreateInfo) {
659 1 : logger.Infof("%s", info)
660 1 : },
661 1 : TableDeleted: func(info TableDeleteInfo) {
662 1 : logger.Infof("%s", info)
663 1 : },
664 1 : TableIngested: func(info TableIngestInfo) {
665 1 : logger.Infof("%s", info)
666 1 : },
667 1 : TableStatsLoaded: func(info TableStatsInfo) {
668 1 : logger.Infof("%s", info)
669 1 : },
670 0 : TableValidated: func(info TableValidatedInfo) {
671 0 : logger.Infof("%s", info)
672 0 : },
673 1 : WALCreated: func(info WALCreateInfo) {
674 1 : logger.Infof("%s", info)
675 1 : },
676 1 : WALDeleted: func(info WALDeleteInfo) {
677 1 : logger.Infof("%s", info)
678 1 : },
679 1 : WriteStallBegin: func(info WriteStallBeginInfo) {
680 1 : logger.Infof("%s", info)
681 1 : },
682 1 : WriteStallEnd: func() {
683 1 : logger.Infof("write stall ending")
684 1 : },
685 : }
686 : }
687 :
688 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
689 1 : func TeeEventListener(a, b EventListener) EventListener {
690 1 : a.EnsureDefaults(nil)
691 1 : b.EnsureDefaults(nil)
692 1 : return EventListener{
693 1 : BackgroundError: func(err error) {
694 1 : a.BackgroundError(err)
695 1 : b.BackgroundError(err)
696 1 : },
697 1 : CompactionBegin: func(info CompactionInfo) {
698 1 : a.CompactionBegin(info)
699 1 : b.CompactionBegin(info)
700 1 : },
701 1 : CompactionEnd: func(info CompactionInfo) {
702 1 : a.CompactionEnd(info)
703 1 : b.CompactionEnd(info)
704 1 : },
705 0 : DiskSlow: func(info DiskSlowInfo) {
706 0 : a.DiskSlow(info)
707 0 : b.DiskSlow(info)
708 0 : },
709 1 : FlushBegin: func(info FlushInfo) {
710 1 : a.FlushBegin(info)
711 1 : b.FlushBegin(info)
712 1 : },
713 1 : FlushEnd: func(info FlushInfo) {
714 1 : a.FlushEnd(info)
715 1 : b.FlushEnd(info)
716 1 : },
717 1 : FormatUpgrade: func(v FormatMajorVersion) {
718 1 : a.FormatUpgrade(v)
719 1 : b.FormatUpgrade(v)
720 1 : },
721 1 : ManifestCreated: func(info ManifestCreateInfo) {
722 1 : a.ManifestCreated(info)
723 1 : b.ManifestCreated(info)
724 1 : },
725 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
726 0 : a.ManifestDeleted(info)
727 0 : b.ManifestDeleted(info)
728 0 : },
729 1 : TableCreated: func(info TableCreateInfo) {
730 1 : a.TableCreated(info)
731 1 : b.TableCreated(info)
732 1 : },
733 1 : TableDeleted: func(info TableDeleteInfo) {
734 1 : a.TableDeleted(info)
735 1 : b.TableDeleted(info)
736 1 : },
737 1 : TableIngested: func(info TableIngestInfo) {
738 1 : a.TableIngested(info)
739 1 : b.TableIngested(info)
740 1 : },
741 1 : TableStatsLoaded: func(info TableStatsInfo) {
742 1 : a.TableStatsLoaded(info)
743 1 : b.TableStatsLoaded(info)
744 1 : },
745 0 : TableValidated: func(info TableValidatedInfo) {
746 0 : a.TableValidated(info)
747 0 : b.TableValidated(info)
748 0 : },
749 1 : WALCreated: func(info WALCreateInfo) {
750 1 : a.WALCreated(info)
751 1 : b.WALCreated(info)
752 1 : },
753 1 : WALDeleted: func(info WALDeleteInfo) {
754 1 : a.WALDeleted(info)
755 1 : b.WALDeleted(info)
756 1 : },
757 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
758 0 : a.WriteStallBegin(info)
759 0 : b.WriteStallBegin(info)
760 0 : },
761 0 : WriteStallEnd: func() {
762 0 : a.WriteStallEnd()
763 0 : b.WriteStallEnd()
764 0 : },
765 : }
766 : }
|