Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package remoteobjcat
6 :
7 : import (
8 : "bufio"
9 : "encoding/binary"
10 : "io"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/objstorage"
16 : "github.com/cockroachdb/pebble/objstorage/remote"
17 : )
18 :
// VersionEdit is a modification to the remote object state which can be encoded
// into a record.
//
// TODO(radu): consider adding creation and deletion time for debugging purposes.
type VersionEdit struct {
	// NewObjects lists the objects added by this edit. Apply inserts each one
	// into the object map, keyed by its FileNum.
	NewObjects []RemoteObjectMetadata
	// DeletedObjects lists the file numbers of the objects removed by this
	// edit. Apply deletes each one from the object map.
	DeletedObjects []base.DiskFileNum
	// CreatorID is the creator ID carried by this edit. It is only encoded and
	// applied when CreatorID.IsSet() reports true.
	CreatorID objstorage.CreatorID
}
28 :
// Tags used in the encoded form of a VersionEdit. Encode writes each tag as a
// uvarint followed by the payload described below; Decode dispatches on them.
const (
	// tagNewObject is followed by the FileNum, object type, creator ID, creator
	// FileNum, cleanup method, optional new object tags, and ending with a 0
	// byte.
	tagNewObject = 1
	// tagDeletedObject is followed by the FileNum.
	tagDeletedObject = 2
	// tagCreatorID is followed by the Creator ID for this store. This ID can
	// never change.
	tagCreatorID = 3
	// tagNewObjectLocator is an optional tag inside the tagNewObject payload. It
	// is followed by the encoded length of the locator string and the string.
	tagNewObjectLocator = 4
	// tagNewObjectCustomName is an optional tag inside the tagNewObject payload.
	// It is followed by the encoded length of the custom object name string
	// followed by the string.
	tagNewObjectCustomName = 5
)
46 :
// Object type values. We don't want to encode FileType directly because it is
// more general (and we want freedom to change it in the future).
const (
	// objTypeTable is the encoded object type that corresponds to
	// base.FileTypeTable (see objTypeToFileType and fileTypeToObjType).
	objTypeTable = 1
)
52 :
53 1 : func objTypeToFileType(objType uint64) (base.FileType, error) {
54 1 : switch objType {
55 1 : case objTypeTable:
56 1 : return base.FileTypeTable, nil
57 0 : default:
58 0 : return 0, errors.Newf("unknown object type %d", objType)
59 : }
60 : }
61 :
62 1 : func fileTypeToObjType(fileType base.FileType) (uint64, error) {
63 1 : switch fileType {
64 1 : case base.FileTypeTable:
65 1 : return objTypeTable, nil
66 :
67 0 : default:
68 0 : return 0, errors.Newf("unknown object type for file type %d", fileType)
69 : }
70 : }
71 :
72 : // Encode encodes an edit to the specified writer.
73 1 : func (v *VersionEdit) Encode(w io.Writer) error {
74 1 : buf := make([]byte, 0, binary.MaxVarintLen64*(len(v.NewObjects)*10+len(v.DeletedObjects)*2+2))
75 1 : for _, meta := range v.NewObjects {
76 1 : objType, err := fileTypeToObjType(meta.FileType)
77 1 : if err != nil {
78 0 : return err
79 0 : }
80 1 : buf = binary.AppendUvarint(buf, uint64(tagNewObject))
81 1 : buf = binary.AppendUvarint(buf, uint64(meta.FileNum.FileNum()))
82 1 : buf = binary.AppendUvarint(buf, objType)
83 1 : buf = binary.AppendUvarint(buf, uint64(meta.CreatorID))
84 1 : buf = binary.AppendUvarint(buf, uint64(meta.CreatorFileNum.FileNum()))
85 1 : buf = binary.AppendUvarint(buf, uint64(meta.CleanupMethod))
86 1 : if meta.Locator != "" {
87 0 : buf = binary.AppendUvarint(buf, uint64(tagNewObjectLocator))
88 0 : buf = encodeString(buf, string(meta.Locator))
89 0 : }
90 1 : if meta.CustomObjectName != "" {
91 0 : buf = binary.AppendUvarint(buf, uint64(tagNewObjectCustomName))
92 0 : buf = encodeString(buf, meta.CustomObjectName)
93 0 : }
94 : // Append 0 as the terminator for optional new object tags.
95 1 : buf = binary.AppendUvarint(buf, 0)
96 : }
97 :
98 1 : for _, dfn := range v.DeletedObjects {
99 1 : buf = binary.AppendUvarint(buf, uint64(tagDeletedObject))
100 1 : buf = binary.AppendUvarint(buf, uint64(dfn.FileNum()))
101 1 : }
102 1 : if v.CreatorID.IsSet() {
103 1 : buf = binary.AppendUvarint(buf, uint64(tagCreatorID))
104 1 : buf = binary.AppendUvarint(buf, uint64(v.CreatorID))
105 1 : }
106 1 : _, err := w.Write(buf)
107 1 : return err
108 : }
109 :
110 : // Decode decodes an edit from the specified reader.
111 1 : func (v *VersionEdit) Decode(r io.Reader) error {
112 1 : br, ok := r.(io.ByteReader)
113 1 : if !ok {
114 1 : br = bufio.NewReader(r)
115 1 : }
116 1 : for {
117 1 : tag, err := binary.ReadUvarint(br)
118 1 : if err == io.EOF {
119 1 : break
120 : }
121 1 : if err != nil {
122 0 : return err
123 0 : }
124 :
125 1 : err = nil
126 1 : switch tag {
127 1 : case tagNewObject:
128 1 : var fileNum, creatorID, creatorFileNum, cleanupMethod uint64
129 1 : var locator, customName string
130 1 : var fileType base.FileType
131 1 : fileNum, err = binary.ReadUvarint(br)
132 1 : if err == nil {
133 1 : var objType uint64
134 1 : objType, err = binary.ReadUvarint(br)
135 1 : if err == nil {
136 1 : fileType, err = objTypeToFileType(objType)
137 1 : }
138 : }
139 1 : if err == nil {
140 1 : creatorID, err = binary.ReadUvarint(br)
141 1 : }
142 1 : if err == nil {
143 1 : creatorFileNum, err = binary.ReadUvarint(br)
144 1 : }
145 1 : if err == nil {
146 1 : cleanupMethod, err = binary.ReadUvarint(br)
147 1 : }
148 1 : for err == nil {
149 1 : var optionalTag uint64
150 1 : optionalTag, err = binary.ReadUvarint(br)
151 1 : if err != nil || optionalTag == 0 {
152 1 : break
153 : }
154 :
155 0 : switch optionalTag {
156 0 : case tagNewObjectLocator:
157 0 : locator, err = decodeString(br)
158 :
159 0 : case tagNewObjectCustomName:
160 0 : customName, err = decodeString(br)
161 :
162 0 : default:
163 0 : err = errors.Newf("unknown newObject tag %d", optionalTag)
164 : }
165 : }
166 :
167 1 : if err == nil {
168 1 : v.NewObjects = append(v.NewObjects, RemoteObjectMetadata{
169 1 : FileNum: base.FileNum(fileNum).DiskFileNum(),
170 1 : FileType: fileType,
171 1 : CreatorID: objstorage.CreatorID(creatorID),
172 1 : CreatorFileNum: base.FileNum(creatorFileNum).DiskFileNum(),
173 1 : CleanupMethod: objstorage.SharedCleanupMethod(cleanupMethod),
174 1 : Locator: remote.Locator(locator),
175 1 : CustomObjectName: customName,
176 1 : })
177 1 : }
178 :
179 1 : case tagDeletedObject:
180 1 : var fileNum uint64
181 1 : fileNum, err = binary.ReadUvarint(br)
182 1 : if err == nil {
183 1 : v.DeletedObjects = append(v.DeletedObjects, base.FileNum(fileNum).DiskFileNum())
184 1 : }
185 :
186 1 : case tagCreatorID:
187 1 : var id uint64
188 1 : id, err = binary.ReadUvarint(br)
189 1 : if err == nil {
190 1 : v.CreatorID = objstorage.CreatorID(id)
191 1 : }
192 :
193 0 : default:
194 0 : err = errors.Newf("unknown tag %d", tag)
195 : }
196 :
197 1 : if err != nil {
198 0 : if err == io.EOF {
199 0 : return errCorruptCatalog
200 0 : }
201 0 : return err
202 : }
203 : }
204 1 : return nil
205 : }
206 :
// encodeString appends s to buf as a uvarint byte length followed by the raw
// bytes of s, and returns the extended slice.
func encodeString(buf []byte, s string) []byte {
	withLen := binary.AppendUvarint(buf, uint64(len(s)))
	return append(withLen, s...)
}
212 :
// decodeString reads a string written by encodeString: a uvarint byte length
// followed by that many raw bytes.
//
// It returns the empty string for a zero length. Any error from reading the
// length or the bytes is returned unchanged, with an empty string; in
// particular io.EOF is returned if the input ends before all bytes of the
// string were read.
//
// The length comes straight from the input and cannot be trusted, so the
// buffer is not allocated at that size up front: a corrupt length would
// otherwise panic in make or attempt a huge allocation. Instead the buffer
// grows as bytes actually arrive.
func decodeString(br io.ByteReader) (string, error) {
	length, err := binary.ReadUvarint(br)
	if err != nil || length == 0 {
		return "", err
	}

	// Upper bound on the capacity reserved before any byte has been read.
	const maxPrealloc = 1 << 10
	capHint := length
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	buf := make([]byte, 0, int(capHint))
	for i := uint64(0); i < length; i++ {
		b, err := br.ReadByte()
		if err != nil {
			return "", err
		}
		buf = append(buf, b)
	}
	return string(buf), nil
}
227 :
// errCorruptCatalog is the corruption error that VersionEdit.Decode returns
// when it encounters io.EOF while reading the fields that follow a tag.
var errCorruptCatalog = base.CorruptionErrorf("pebble: corrupt remote object catalog")
229 :
230 : // Apply the version edit to a creator ID and a map of objects.
231 : func (v *VersionEdit) Apply(
232 : creatorID *objstorage.CreatorID, objects map[base.DiskFileNum]RemoteObjectMetadata,
233 1 : ) error {
234 1 : if v.CreatorID.IsSet() {
235 1 : *creatorID = v.CreatorID
236 1 : }
237 1 : for _, meta := range v.NewObjects {
238 1 : if invariants.Enabled {
239 1 : if _, exists := objects[meta.FileNum]; exists {
240 0 : return errors.AssertionFailedf("version edit adds existing object %s", meta.FileNum)
241 0 : }
242 : }
243 1 : objects[meta.FileNum] = meta
244 : }
245 1 : for _, fileNum := range v.DeletedObjects {
246 1 : if invariants.Enabled {
247 1 : if _, exists := objects[fileNum]; !exists {
248 0 : return errors.AssertionFailedf("version edit deletes non-existent object %s", fileNum)
249 0 : }
250 : }
251 1 : delete(objects, fileNum)
252 : }
253 1 : return nil
254 : }
|