Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package remoteobjcat
6 :
7 : import (
8 : "bufio"
9 : "encoding/binary"
10 : "io"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/objstorage"
16 : "github.com/cockroachdb/pebble/objstorage/remote"
17 : )
18 :
19 : // VersionEdit is a modification to the remote object state which can be encoded
20 : // into a record.
21 : //
22 : // TODO(radu): consider adding creation and deletion time for debugging purposes.
23 : type VersionEdit struct {
24 : NewObjects []RemoteObjectMetadata
25 : DeletedObjects []base.DiskFileNum
26 : CreatorID objstorage.CreatorID
27 : }
28 :
const (
	// tagNewObject starts a new-object entry. Its payload is the FileNum, the
	// object type, the creator ID, the creator FileNum and the cleanup method,
	// followed by any optional new object tags and a terminating 0 byte.
	tagNewObject = 1
	// tagDeletedObject starts a deleted-object entry; its payload is the
	// FileNum of the deleted object.
	tagDeletedObject = 2
	// tagCreatorID starts an entry carrying the Creator ID for this store.
	// This ID can never change.
	tagCreatorID = 3
	// tagNewObjectLocator is an optional tag that may appear inside the
	// tagNewObject payload. Its payload is the encoded length of the locator
	// string followed by the string itself.
	tagNewObjectLocator = 4
	// tagNewObjectCustomName is an optional tag that may appear inside the
	// tagNewObject payload. Its payload is the encoded length of the custom
	// object name string followed by the string itself.
	tagNewObjectCustomName = 5
)
46 :
// objTypeTable is the encoded object type value for tables. Object types are
// encoded with their own values instead of FileType because FileType is more
// general (and we want freedom to change it in the future).
const objTypeTable = 1
52 :
53 2 : func objTypeToFileType(objType uint64) (base.FileType, error) {
54 2 : switch objType {
55 2 : case objTypeTable:
56 2 : return base.FileTypeTable, nil
57 0 : default:
58 0 : return 0, errors.Newf("unknown object type %d", objType)
59 : }
60 : }
61 :
62 2 : func fileTypeToObjType(fileType base.FileType) (uint64, error) {
63 2 : switch fileType {
64 2 : case base.FileTypeTable:
65 2 : return objTypeTable, nil
66 :
67 0 : default:
68 0 : return 0, errors.Newf("unknown object type for file type %d", fileType)
69 : }
70 : }
71 :
72 : // Encode encodes an edit to the specified writer.
73 2 : func (v *VersionEdit) Encode(w io.Writer) error {
74 2 : buf := make([]byte, 0, binary.MaxVarintLen64*(len(v.NewObjects)*10+len(v.DeletedObjects)*2+2))
75 2 : for _, meta := range v.NewObjects {
76 2 : objType, err := fileTypeToObjType(meta.FileType)
77 2 : if err != nil {
78 0 : return err
79 0 : }
80 2 : buf = binary.AppendUvarint(buf, uint64(tagNewObject))
81 2 : buf = binary.AppendUvarint(buf, uint64(meta.FileNum))
82 2 : buf = binary.AppendUvarint(buf, objType)
83 2 : buf = binary.AppendUvarint(buf, uint64(meta.CreatorID))
84 2 : buf = binary.AppendUvarint(buf, uint64(meta.CreatorFileNum))
85 2 : buf = binary.AppendUvarint(buf, uint64(meta.CleanupMethod))
86 2 : if meta.Locator != "" {
87 2 : buf = binary.AppendUvarint(buf, uint64(tagNewObjectLocator))
88 2 : buf = encodeString(buf, string(meta.Locator))
89 2 : }
90 2 : if meta.CustomObjectName != "" {
91 2 : buf = binary.AppendUvarint(buf, uint64(tagNewObjectCustomName))
92 2 : buf = encodeString(buf, meta.CustomObjectName)
93 2 : }
94 : // Append 0 as the terminator for optional new object tags.
95 2 : buf = binary.AppendUvarint(buf, 0)
96 : }
97 :
98 2 : for _, dfn := range v.DeletedObjects {
99 2 : buf = binary.AppendUvarint(buf, uint64(tagDeletedObject))
100 2 : buf = binary.AppendUvarint(buf, uint64(dfn))
101 2 : }
102 2 : if v.CreatorID.IsSet() {
103 2 : buf = binary.AppendUvarint(buf, uint64(tagCreatorID))
104 2 : buf = binary.AppendUvarint(buf, uint64(v.CreatorID))
105 2 : }
106 2 : _, err := w.Write(buf)
107 2 : return err
108 : }
109 :
110 : // Decode decodes an edit from the specified reader.
111 2 : func (v *VersionEdit) Decode(r io.Reader) error {
112 2 : br, ok := r.(io.ByteReader)
113 2 : if !ok {
114 2 : br = bufio.NewReader(r)
115 2 : }
116 2 : for {
117 2 : tag, err := binary.ReadUvarint(br)
118 2 : if err == io.EOF {
119 2 : break
120 : }
121 2 : if err != nil {
122 0 : return err
123 0 : }
124 :
125 2 : err = nil
126 2 : switch tag {
127 2 : case tagNewObject:
128 2 : var fileNum, creatorID, creatorFileNum, cleanupMethod uint64
129 2 : var locator, customName string
130 2 : var fileType base.FileType
131 2 : fileNum, err = binary.ReadUvarint(br)
132 2 : if err == nil {
133 2 : var objType uint64
134 2 : objType, err = binary.ReadUvarint(br)
135 2 : if err == nil {
136 2 : fileType, err = objTypeToFileType(objType)
137 2 : }
138 : }
139 2 : if err == nil {
140 2 : creatorID, err = binary.ReadUvarint(br)
141 2 : }
142 2 : if err == nil {
143 2 : creatorFileNum, err = binary.ReadUvarint(br)
144 2 : }
145 2 : if err == nil {
146 2 : cleanupMethod, err = binary.ReadUvarint(br)
147 2 : }
148 2 : for err == nil {
149 2 : var optionalTag uint64
150 2 : optionalTag, err = binary.ReadUvarint(br)
151 2 : if err != nil || optionalTag == 0 {
152 2 : break
153 : }
154 :
155 2 : switch optionalTag {
156 2 : case tagNewObjectLocator:
157 2 : locator, err = decodeString(br)
158 :
159 2 : case tagNewObjectCustomName:
160 2 : customName, err = decodeString(br)
161 :
162 0 : default:
163 0 : err = errors.Newf("unknown newObject tag %d", optionalTag)
164 : }
165 : }
166 :
167 2 : if err == nil {
168 2 : v.NewObjects = append(v.NewObjects, RemoteObjectMetadata{
169 2 : FileNum: base.DiskFileNum(fileNum),
170 2 : FileType: fileType,
171 2 : CreatorID: objstorage.CreatorID(creatorID),
172 2 : CreatorFileNum: base.DiskFileNum(creatorFileNum),
173 2 : CleanupMethod: objstorage.SharedCleanupMethod(cleanupMethod),
174 2 : Locator: remote.Locator(locator),
175 2 : CustomObjectName: customName,
176 2 : })
177 2 : }
178 :
179 2 : case tagDeletedObject:
180 2 : var fileNum uint64
181 2 : fileNum, err = binary.ReadUvarint(br)
182 2 : if err == nil {
183 2 : v.DeletedObjects = append(v.DeletedObjects, base.DiskFileNum(fileNum))
184 2 : }
185 :
186 2 : case tagCreatorID:
187 2 : var id uint64
188 2 : id, err = binary.ReadUvarint(br)
189 2 : if err == nil {
190 2 : v.CreatorID = objstorage.CreatorID(id)
191 2 : }
192 :
193 0 : default:
194 0 : err = errors.Newf("unknown tag %d", tag)
195 : }
196 :
197 2 : if err != nil {
198 0 : if err == io.EOF {
199 0 : return errCorruptCatalog
200 0 : }
201 0 : return err
202 : }
203 : }
204 2 : return nil
205 : }
206 :
// encodeString appends s to buf as a uvarint length prefix followed by the
// raw bytes of s, and returns the extended slice.
func encodeString(buf []byte, s string) []byte {
	return append(binary.AppendUvarint(buf, uint64(len(s))), s...)
}
212 :
// decodeString reads a string encoded as a uvarint length prefix followed by
// that many bytes. A zero length yields the empty string.
//
// Any error from reading the length or the bytes is returned unchanged, so an
// input that ends before the announced number of bytes was read surfaces as
// the reader's end-of-input error.
//
// The length prefix comes from the input and cannot be trusted, so the buffer
// is never sized from it directly: a corrupt prefix would otherwise trigger a
// huge allocation or a makeslice panic. The buffer starts small and only
// grows as bytes are actually read.
func decodeString(br io.ByteReader) (string, error) {
	length, err := binary.ReadUvarint(br)
	if err != nil || length == 0 {
		return "", err
	}

	// Cap on the up-front allocation; longer strings grow via append.
	const maxInitialCap = 4096
	capHint := maxInitialCap
	if length < maxInitialCap {
		capHint = int(length)
	}

	buf := make([]byte, 0, capHint)
	for remaining := length; remaining > 0; remaining-- {
		b, err := br.ReadByte()
		if err != nil {
			return "", err
		}
		buf = append(buf, b)
	}
	return string(buf), nil
}
227 :
228 : var errCorruptCatalog = base.CorruptionErrorf("pebble: corrupt remote object catalog")
229 :
230 : // Apply the version edit to a creator ID and a map of objects.
231 : func (v *VersionEdit) Apply(
232 : creatorID *objstorage.CreatorID, objects map[base.DiskFileNum]RemoteObjectMetadata,
233 2 : ) error {
234 2 : if v.CreatorID.IsSet() {
235 2 : *creatorID = v.CreatorID
236 2 : }
237 2 : for _, meta := range v.NewObjects {
238 2 : if invariants.Enabled {
239 2 : if _, exists := objects[meta.FileNum]; exists {
240 0 : return base.AssertionFailedf("version edit adds existing object %s", meta.FileNum)
241 0 : }
242 : }
243 2 : objects[meta.FileNum] = meta
244 : }
245 2 : for _, fileNum := range v.DeletedObjects {
246 2 : if invariants.Enabled {
247 2 : if _, exists := objects[fileNum]; !exists {
248 0 : return base.AssertionFailedf("version edit deletes non-existent object %s", fileNum)
249 0 : }
250 : }
251 2 : delete(objects, fileNum)
252 : }
253 2 : return nil
254 : }
|