Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package ackseq 6 : 7 : import ( 8 : "sync" 9 : "sync/atomic" 10 : 11 : "github.com/cockroachdb/errors" 12 : ) 13 : 14 : const ( 15 : // The window size constants. These values specify a window that can hold ~1m 16 : // pending unacknowledged sequence numbers using 128 KB of memory. 17 : windowSize = 1 << 20 18 : windowMask = windowSize - 1 19 : windowBytes = (windowSize + 7) / 8 20 : ) 21 : 22 : // S keeps track of the largest sequence number such that all sequence numbers 23 : // in the range [0,v) have been acknowledged. 24 : type S struct { 25 : next atomic.Uint64 26 : mu struct { 27 : sync.Mutex 28 : base uint64 29 : window [windowBytes]uint8 30 : } 31 : } 32 : 33 : // New creates a new acknowledged sequence tracker with the specified base 34 : // sequence number. All of the sequence numbers in the range [0,base) are 35 : // considered acknowledged. Next() will return base upon first call. 36 0 : func New(base uint64) *S { 37 0 : s := &S{} 38 0 : s.next.Store(base) 39 0 : s.mu.base = base 40 0 : return s 41 0 : } 42 : 43 : // Next returns the next sequence number to use. 44 0 : func (s *S) Next() uint64 { 45 0 : return s.next.Add(1) - 1 46 0 : } 47 : 48 : // Ack acknowledges the specified seqNum, adjusting base as necessary, 49 : // returning the number of newly acknowledged sequence numbers. 50 0 : func (s *S) Ack(seqNum uint64) (int, error) { 51 0 : s.mu.Lock() 52 0 : if s.getLocked(seqNum) { 53 0 : defer s.mu.Unlock() 54 0 : return 0, errors.Errorf( 55 0 : "pending acks exceeds window size: %d has been acked, but %d has not", 56 0 : errors.Safe(seqNum), errors.Safe(s.mu.base)) 57 0 : } 58 : 59 0 : var count int 60 0 : s.setLocked(seqNum) 61 0 : for s.getLocked(s.mu.base) { 62 0 : s.clearLocked(s.mu.base) 63 0 : s.mu.base++ 64 0 : count++ 65 0 : } 66 0 : s.mu.Unlock() 67 0 : return count, nil 68 : } 69 : 70 0 : func (s *S) getLocked(seqNum uint64) bool { 71 0 : bit := seqNum & windowMask 72 0 : return (s.mu.window[bit/8] & (1 << (bit % 8))) != 0 73 0 : } 74 : 75 0 : func (s *S) setLocked(seqNum uint64) { 76 0 : bit := seqNum & windowMask 77 0 : s.mu.window[bit/8] |= (1 << (bit % 8)) 78 0 : } 79 : 80 0 : func (s *S) clearLocked(seqNum uint64) { 81 0 : bit := seqNum & windowMask 82 0 : s.mu.window[bit/8] &^= (1 << (bit % 8)) 83 0 : }