Coverage Report

Created: 2025-02-21 07:11

/rust/registry/src/index.crates.io-6f17d22bba15001f/surrealkv-0.8.1/src/store.rs
Line
Count
Source (jump to first uncovered line)
1
use ahash::{HashMap, HashMapExt};
2
use async_channel::{bounded, Receiver, Sender};
3
use bytes::{Bytes, BytesMut};
4
use futures::{select, FutureExt};
5
use parking_lot::RwLock;
6
use quick_cache::sync::Cache;
7
use revision::Revisioned;
8
use std::path::Path;
9
use std::sync::atomic::AtomicBool;
10
use std::sync::atomic::Ordering;
11
use std::sync::Arc;
12
use std::vec;
13
14
use crate::compaction::restore_from_compaction;
15
use crate::entry::{encode_entries, Entry, Record};
16
use crate::error::{Error, Result};
17
use crate::indexer::{IndexValue, Indexer};
18
use crate::log::{Aol, Error as LogError, MultiSegmentReader, Options as LogOptions, SegmentRef};
19
use crate::manifest::Manifest;
20
use crate::option::Options;
21
use crate::oracle::Oracle;
22
use crate::reader::{Reader, RecordReader};
23
use crate::repair::{repair_last_corrupted_segment, restore_repair_files};
24
use crate::stats::StorageStats;
25
use crate::transaction::{Durability, Mode, Transaction};
26
27
/// Inner state of a store: the shared [`Core`], lifecycle flags, and the
/// channels used to stop the background task runner and wait for it to finish.
pub(crate) struct StoreInner {
28
    /// Core shared with the background task runner.
    pub(crate) core: Arc<Core>,
29
    /// Set once `close` has completed successfully.
    pub(crate) is_closed: AtomicBool,
30
    /// While this flag is set, `close` refuses to proceed.
    pub(crate) is_compacting: AtomicBool,
31
    /// Sends the stop signal to the task runner.
    stop_tx: Sender<()>,
32
    /// Receives the task runner's completion signal.
    done_rx: Receiver<()>,
33
    /// Storage statistics handle.
    pub(crate) stats: Arc<StorageStats>,
34
}
35
36
// Inner representation of the store. The wrapper will handle the asynchronous closing of the store.
37
impl StoreInner {
38
    /// Creates a new MVCC key-value store with the given options.
39
    /// It creates a new core with the options and wraps it in an atomic reference counter.
40
    /// It returns the store.
41
21.4k
    pub fn new(opts: Options) -> Result<Self> {
42
21.4k
        // TODO: make this channel size configurable
43
21.4k
        let (writes_tx, writes_rx) = bounded(10000);
44
21.4k
        let (stop_tx, stop_rx) = bounded(1);
45
46
21.4k
        let core = Arc::new(Core::new(opts, writes_tx)?);
47
21.4k
        let (task_runner, done_rx) = TaskRunner::new(core.clone(), writes_rx, stop_rx);
48
21.4k
        task_runner.spawn();
49
21.4k
50
21.4k
        Ok(Self {
51
21.4k
            core,
52
21.4k
            stop_tx,
53
21.4k
            done_rx,
54
21.4k
            is_closed: AtomicBool::new(false),
55
21.4k
            is_compacting: AtomicBool::new(false),
56
21.4k
            stats: Arc::new(StorageStats::new()),
57
21.4k
        })
58
21.4k
    }
59
60
    /// Closes the store. It sends a stop signal to the writer and waits for the done signal.
61
0
    pub async fn close(&self) -> Result<()> {
62
0
        if self.is_closed.load(Ordering::SeqCst) {
63
0
            return Ok(());
64
0
        }
65
0
66
0
        if self.is_compacting.load(Ordering::SeqCst) {
67
0
            return Err(Error::CompactionAlreadyInProgress);
68
0
        }
69
0
70
0
        // Send stop signal
71
0
        self.stop_tx
72
0
            .send(())
73
0
            .await
74
0
            .map_err(|e| Error::SendError(format!("{}", e)))?;
75
76
        // Wait for done signal
77
0
        self.done_rx.recv().await.map_err(|e| {
78
0
            Error::ReceiveError(format!("Error waiting for task runner to complete: {}", e))
79
0
        })?;
80
81
0
        self.core.close()?;
82
83
0
        self.is_closed.store(true, Ordering::Relaxed);
84
0
85
0
        Ok(())
86
0
    }
87
}
88
89
/// An MVCC-based transactional key-value store.
90
///
91
/// The store is closed asynchronously when it is dropped.
92
/// If you need to guarantee that the store is closed before the program continues, use the `close` method.
93
94
// This is a wrapper around the inner store to allow for asynchronous closing of the store.
95
#[derive(Default)]
96
pub struct Store {
97
    /// The wrapped store. `None` for a default-constructed `Store`, and after
    /// `Drop` has taken the value out to close it.
    pub(crate) inner: Option<StoreInner>,
98
}
99
100
impl Store {
101
    /// Creates a new MVCC key-value store with the given options.
102
21.4k
    pub fn new(opts: Options) -> Result<Self> {
103
21.4k
        Ok(Self {
104
21.4k
            inner: Some(StoreInner::new(opts)?),
105
        })
106
21.4k
    }
107
108
    /// Begins a new read-write transaction.
109
    /// It creates a new transaction with the core and read-write mode, and sets the read timestamp from the oracle.
110
    /// It returns the transaction.
111
206k
    pub fn begin(&self) -> Result<Transaction> {
112
206k
        let txn = Transaction::new(self.inner.as_ref().unwrap().core.clone(), Mode::ReadWrite)?;
113
206k
        Ok(txn)
114
206k
    }
115
116
    /// Begins a new transaction with the given mode.
117
    /// It creates a new transaction with the core and the given mode, and sets the read timestamp from the oracle.
118
    /// It returns the transaction.
119
0
    pub fn begin_with_mode(&self, mode: Mode) -> Result<Transaction> {
120
0
        let txn = Transaction::new(self.inner.as_ref().unwrap().core.clone(), mode)?;
121
0
        Ok(txn)
122
0
    }
123
124
    /// Executes a function in a read-only transaction.
125
    /// It begins a new read-only transaction and executes the function with the transaction.
126
    /// It returns the result of the function.
127
0
    pub fn view(&self, f: impl FnOnce(&mut Transaction) -> Result<()>) -> Result<()> {
128
0
        let mut txn = self.begin_with_mode(Mode::ReadOnly)?;
129
0
        f(&mut txn)?;
130
131
0
        Ok(())
132
0
    }
133
134
    /// Executes a function in a read-write transaction and commits the transaction.
135
    /// It begins a new read-write transaction, executes the function with the transaction, and commits the transaction.
136
    /// It returns the result of the function.
137
0
    pub async fn write(
138
0
        self: Arc<Self>,
139
0
        f: impl FnOnce(&mut Transaction) -> Result<()>,
140
0
    ) -> Result<()> {
141
0
        let mut txn = self.begin_with_mode(Mode::ReadWrite)?;
142
0
        f(&mut txn)?;
143
0
        txn.commit().await?;
144
145
0
        Ok(())
146
0
    }
147
148
    /// Closes the inner store
149
0
    pub async fn close(&self) -> Result<()> {
150
0
        if let Some(inner) = self.inner.as_ref() {
151
0
            inner.close().await?;
152
0
        }
153
154
0
        Ok(())
155
0
    }
156
157
    /// Compacts the store.
158
0
    pub async fn compact(&self) -> Result<()> {
159
0
        if let Some(inner) = self.inner.as_ref() {
160
0
            inner.compact().await?;
161
0
        }
162
163
0
        Ok(())
164
0
    }
165
}
166
167
impl Drop for Store {
168
21.3k
    fn drop(&mut self) {
169
21.3k
        if let Some(inner) = self.inner.take() {
170
            // Try to get existing runtime handle first
171
21.3k
            if let Ok(handle) = tokio::runtime::Handle::try_current() {
172
21.3k
                // We're in a runtime, spawn normally
173
21.3k
                handle.spawn(async move {
174
0
                    if let Err(err) = inner.close().await {
175
0
                        // TODO: use log/tracing instead of eprintln
176
0
                        eprintln!("Error closing store: {}", err);
177
0
                    }
178
21.3k
                });
179
21.3k
            } else {
180
0
                eprintln!("No runtime available for closing the store correctly");
181
0
            }
182
0
        }
183
21.3k
    }
184
}
185
186
/// Background worker that receives write tasks and applies them to the core
/// until it is told to stop.
pub(crate) struct TaskRunner {
187
    /// Core that each received task is written through.
    core: Arc<Core>,
188
    /// Incoming write tasks.
    writes_rx: Receiver<Task>,
189
    /// Stop signal; when it fires, the queued tasks are drained and the loop ends.
    stop_rx: Receiver<()>,
190
    // Done channel to signal completion
191
    done_tx: Arc<Sender<()>>,
192
}
193
194
impl TaskRunner {
195
21.4k
    fn new(
196
21.4k
        core: Arc<Core>,
197
21.4k
        writes_rx: Receiver<Task>,
198
21.4k
        stop_rx: Receiver<()>,
199
21.4k
    ) -> (Self, Receiver<()>) {
200
21.4k
        let (done_tx, done_rx) = bounded(1);
201
21.4k
        (
202
21.4k
            Self {
203
21.4k
                core,
204
21.4k
                writes_rx,
205
21.4k
                stop_rx,
206
21.4k
                done_tx: Arc::new(done_tx),
207
21.4k
            },
208
21.4k
            done_rx,
209
21.4k
        )
210
21.4k
    }
211
212
21.4k
    fn spawn(self) {
213
21.4k
        let done_tx = self.done_tx.clone();
214
21.4k
215
21.4k
        #[cfg(not(target_arch = "wasm32"))]
216
21.4k
        tokio::spawn(self.run(done_tx));
217
21.4k
218
21.4k
        #[cfg(target_arch = "wasm32")]
219
21.4k
        wasm_bindgen_futures::spawn_local(self.run(done_tx));
220
21.4k
    }
221
222
21.4k
    /// Main loop of the task runner.
    ///
    /// Handles write tasks as they arrive until either the write channel
    /// yields an error or the stop branch fires, then sends one completion
    /// signal on `done_tx`.
    async fn run(self, done_tx: Arc<Sender<()>>) {
223
        loop {
224
23.8k
            select! {
225
23.8k
                req = self.writes_rx.recv().fuse() => {
226
21.0k
                    match req {
227
21.0k
                        Ok(task) => self.handle_task(task).await,
228
0
                        // The write channel can no longer deliver tasks; stop the loop.
                        Err(_) => break,
229
                    }
230
                },
231
23.8k
                // The result is ignored, so this branch runs whether the receive
                // yields a value or an error.
                _ = self.stop_rx.recv().fuse() => {
232
                    // Consume all remaining items in writes_rx
233
0
                    while let Ok(task) = self.writes_rx.try_recv() {
234
0
                        self.handle_task(task).await;
235
                    }
236
0
                    break;
237
                },
238
            }
239
        }
240
241
        // Signal completion
        // A failed send is deliberately ignored.
242
0
        let _ = done_tx.send(()).await;
243
0
    }
244
245
21.0k
    /// Writes a single task through the core.
    ///
    /// A failure is reported on stderr and otherwise ignored, so one failed
    /// task does not stop the runner loop.
    async fn handle_task(&self, task: Task) {
        // Borrow the core directly; cloning the `Arc` for every task is unnecessary.
        if let Err(err) = self.core.write_request(task).await {
            eprintln!("failed to write: {:?}", err);
        }
    }
251
}
252
253
/// Core of the key-value store.
254
pub struct Core {
255
    /// Index for store.
256
    pub(crate) indexer: RwLock<Indexer>,
257
    /// Options for store.
258
    pub(crate) opts: Options,
259
    /// Commit log for store. `None` when the options do not ask for data to be persisted.
260
    pub(crate) clog: Option<Arc<RwLock<Aol>>>,
261
    /// Manifest for store to track Store state. `None` when the options do not ask for data to be persisted.
262
    pub(crate) manifest: Option<RwLock<Aol>>,
263
    /// Transaction ID Oracle for store.
264
    pub(crate) oracle: Arc<Oracle>,
265
    /// Value cache for store, keyed by `(segment_id, value_offset)`.
266
    /// The assumption for this cache is that it should be useful for
267
    /// storing offsets that are frequently accessed (especially in
268
    /// the case of range scans)
269
    pub(crate) value_cache: Cache<(u64, u64), Bytes>,
270
    /// Flag to indicate if the store is closed.
271
    is_closed: AtomicBool,
272
    /// Channel to send write requests to the writer
273
    writes_tx: Sender<Task>,
274
}
275
/// A Task contains the entries of one transaction that are to be written
/// (to the commit log and index, or to the in-memory index only).
276
#[derive(Clone)]
277
pub struct Task {
278
    /// Entries contained in this task
279
    entries: Vec<Entry>,
280
    /// Channel on which the result of the write is reported back, if anyone is waiting for it
281
    done: Option<Sender<Result<()>>>,
282
    /// Transaction ID
283
    tx_id: u64,
284
    /// Durability: decides whether the commit log is synced right after the append
285
    durability: Durability,
286
}
287
288
impl Core {
289
21.4k
    /// Creates the empty indexer that a new `Core` starts from.
    fn initialize_indexer() -> Indexer {
290
21.4k
        Indexer::new()
291
21.4k
    }
292
293
    // This function initializes the manifest log for the database to store all settings.
294
0
    pub(crate) fn initialize_manifest(dir: &Path) -> Result<Aol> {
295
0
        let manifest_subdir = dir.join("manifest");
296
0
        let mopts = LogOptions::default().with_file_extension("manifest".to_string());
297
0
        Aol::open(&manifest_subdir, &mopts).map_err(Error::from)
298
0
    }
299
300
    // This function initializes the commit log (clog) for the database.
301
0
    fn initialize_clog(opts: &Options) -> Result<Aol> {
302
0
        // It first constructs the path to the clog subdirectory within the database directory.
303
0
        let clog_subdir = opts.dir.join("clog");
304
0
305
0
        // Then it creates a LogOptions object to configure the clog.
306
0
        // The maximum file size for the clog is set to the max_segment_size option from the database options.
307
0
        // The file extension for the clog files is set to "clog".
308
0
        let copts = LogOptions::default()
309
0
            .with_max_file_size(opts.max_segment_size)
310
0
            .with_file_extension("clog".to_string());
311
0
312
0
        // It then attempts to restore any repair files in the clog subdirectory.
313
0
        // If this fails, the error is propagated up to the caller of the function.
314
0
        // This is required because the repair operation may have failed, and the
315
0
        // store should not be opened with existing repair files.
316
0
        //
317
0
        // Even though we are restoring the corrupted files, it will get repaired
318
0
        // during in the load_index function.
319
0
        restore_repair_files(clog_subdir.as_path().to_str().unwrap())?;
320
321
        // Finally, it attempts to open the clog with the specified options.
322
        // If this fails, the error is converted to a database error and then propagated up to the caller of the function.
323
0
        Aol::open(&clog_subdir, &copts).map_err(Error::from)
324
0
    }
325
326
    /// Creates a new Core with the given options.
327
    /// It initializes a new Indexer, opens or creates the manifest file,
328
    /// loads or creates metadata from the manifest file, updates the options with the loaded metadata,
329
    /// opens or creates the commit log file, loads the index from the commit log if it exists, creates
330
    /// and initializes an Oracle, creates and initializes a value cache, and constructs and returns
331
    /// the Core instance.
332
21.4k
    pub fn new(opts: Options, writes_tx: Sender<Task>) -> Result<Self> {
333
21.4k
        // Initialize a new Indexer with the provided options.
334
21.4k
        let mut indexer = Self::initialize_indexer();
335
21.4k
336
21.4k
        let mut manifest = None;
337
21.4k
        let mut clog = None;
338
21.4k
339
21.4k
        if opts.should_persist_data() {
340
            // Determine options for the manifest file and open or create it.
341
0
            manifest = Some(Self::initialize_manifest(&opts.dir)?);
342
343
            // Load options from the manifest file.
344
0
            let opts = Core::load_manifest(&opts, manifest.as_mut().unwrap())?;
345
346
            // Determine options for the commit log file and open or create it.
347
0
            clog = Some(Self::initialize_clog(&opts)?);
348
349
            // Restore the store from a compaction process if necessary.
350
0
            restore_from_compaction(&opts)?;
351
352
            // Load the index from the commit log if it exists.
353
0
            if clog.as_ref().unwrap().size()? > 0 {
354
0
                Core::load_index(&opts, clog.as_mut().unwrap(), &mut indexer)?;
355
0
            }
356
21.4k
        }
357
358
        // Create and initialize an Oracle.
359
21.4k
        let oracle = Oracle::new(&opts);
360
21.4k
        oracle.set_ts(indexer.version());
361
21.4k
362
21.4k
        // Create and initialize value cache.
363
21.4k
        let value_cache = Cache::new(opts.max_value_cache_size as usize);
364
21.4k
365
21.4k
        // Construct and return the Core instance.
366
21.4k
        Ok(Self {
367
21.4k
            indexer: RwLock::new(indexer),
368
21.4k
            opts,
369
21.4k
            manifest: manifest.map(RwLock::new),
370
21.4k
            clog: clog.map(|c| Arc::new(RwLock::new(c))),
371
21.4k
            oracle: Arc::new(oracle),
372
21.4k
            value_cache,
373
21.4k
            is_closed: AtomicBool::new(false),
374
21.4k
            writes_tx,
375
21.4k
        })
376
21.4k
    }
377
378
433k
    /// Returns the oracle's current read timestamp.
    pub(crate) fn read_ts(&self) -> u64 {
379
433k
        self.oracle.read_ts()
380
433k
    }
381
382
    // The load_index function is responsible for loading the index from the log.
383
0
    fn load_index(opts: &Options, clog: &mut Aol, indexer: &mut Indexer) -> Result<u64> {
384
0
        // The directory where the log segments are stored is determined.
385
0
        let clog_subdir = opts.dir.join("clog");
386
0
387
0
        // The segments are read from the directory.
388
0
        let sr = SegmentRef::read_segments_from_directory(clog_subdir.as_path())
389
0
            .expect("should read segments");
390
391
        // A MultiSegmentReader is created to read from multiple segments.
392
0
        let reader = MultiSegmentReader::new(sr)?;
393
394
        // A Reader is created from the MultiSegmentReader with the maximum segment size and block size.
395
0
        let reader = Reader::new_from(reader);
396
0
397
0
        // A RecordReader is created from the Reader to read transactions.
398
0
        let mut tx_reader = RecordReader::new(reader);
399
0
400
0
        // A Record is created to hold the transactions. The maximum number of entries per transaction is specified.
401
0
        let mut tx = Record::new();
402
0
403
0
        // An Option is created to hold the segment ID and offset in case of corruption.
404
0
        let mut corruption_info: Option<(u64, u64)> = None;
405
0
406
0
        let mut num_entries = 0;
407
        // A loop is started to read transactions.
408
        loop {
409
            // The Record is reset for each iteration.
410
0
            tx.reset();
411
0
412
0
            // The RecordReader attempts to read into the Record.
413
0
            match tx_reader.read_into(&mut tx) {
414
                // If the read is successful, the entries are processed.
415
0
                Ok((segment_id, value_offset)) => {
416
0
                    Core::process_entry(&tx, opts, segment_id, value_offset, indexer)?;
417
0
                    num_entries += 1;
418
                }
419
420
                // If the end of the file is reached, the loop is broken.
421
0
                Err(Error::LogError(LogError::Eof)) => break,
422
423
                // If a corruption error is encountered, the segment ID and offset are stored and the loop is broken.
424
0
                Err(Error::LogError(LogError::Corruption(err))) => {
425
0
                    eprintln!("Corruption error: {:?}", err);
426
0
                    corruption_info = Some((err.segment_id, err.offset));
427
0
                    break;
428
                }
429
430
                // If any other error is encountered, it is returned immediately.
431
0
                Err(err) => return Err(err),
432
            };
433
        }
434
435
        // If a corruption was encountered, the last segment is repaired using the stored segment ID and offset.
436
        // The reason why the last segment is repaired is because the last segment is the one that was being actively
437
        // written to and acts like the active WAL file. Any corruption in the previous immutable segments is pure
438
        // corruption of the data and should be handled by the user.
439
0
        if let Some((corrupted_segment_id, corrupted_offset)) = corruption_info {
440
0
            eprintln!(
441
0
                "Repairing corrupted segment with id: {} and offset: {}",
442
0
                corrupted_segment_id, corrupted_offset
443
0
            );
444
0
            repair_last_corrupted_segment(clog, corrupted_segment_id, corrupted_offset)?;
445
0
        }
446
447
0
        Ok(num_entries)
448
0
    }
449
450
0
    fn process_entry(
451
0
        entry: &Record,
452
0
        opts: &Options,
453
0
        segment_id: u64,
454
0
        value_offset: u64,
455
0
        indexer: &mut Indexer,
456
0
    ) -> Result<()> {
457
0
        if entry.metadata.as_ref().is_some_and(|metadata| {
458
0
            metadata.is_deleted() || metadata.is_tombstone() && !opts.enable_versions
459
0
        }) {
460
0
            indexer.delete(&mut entry.key[..].into());
461
0
        } else {
462
0
            let index_value = IndexValue::new_disk(
463
0
                segment_id,
464
0
                value_offset,
465
0
                entry.metadata.clone(),
466
0
                &entry.value,
467
0
                opts.max_value_threshold,
468
0
            );
469
0
470
0
            if opts.enable_versions {
471
0
                indexer.insert(
472
0
                    &mut entry.key[..].into(),
473
0
                    index_value,
474
0
                    entry.id,
475
0
                    entry.ts,
476
0
                    false,
477
0
                )?;
478
            } else {
479
0
                indexer.insert_or_replace(
480
0
                    &mut entry.key[..].into(),
481
0
                    index_value,
482
0
                    entry.id,
483
0
                    entry.ts,
484
0
                    false,
485
0
                )?;
486
            }
487
        }
488
489
0
        Ok(())
490
0
    }
491
492
0
    fn load_manifest(current_opts: &Options, manifest: &mut Aol) -> Result<Options> {
493
        // Load existing manifests if any, else create a new one
494
0
        let existing_manifest = if manifest.size()? > 0 {
495
0
            Core::read_manifest(&current_opts.dir)?
496
        } else {
497
0
            Manifest::new()
498
        };
499
500
        // Validate the current options against the existing manifest's options
501
0
        Core::validate_options(current_opts)?;
502
503
        // Check if the current options are already the last option in the manifest
504
0
        if existing_manifest.extract_last_option().as_ref() == Some(current_opts) {
505
0
            return Ok(current_opts.clone());
506
0
        }
507
0
508
0
        // If not, create a changeset with an update operation for the current options
509
0
        let changeset = Manifest::with_update_option_change(current_opts);
510
511
        // Serialize the changeset and append it to the manifest
512
0
        let buf = changeset.serialize()?;
513
0
        manifest.append(&buf)?;
514
515
0
        Ok(current_opts.clone())
516
0
    }
517
518
0
    fn validate_options(opts: &Options) -> Result<()> {
519
0
        if opts.max_compaction_segment_size < opts.max_segment_size {
520
0
            return Err(Error::CompactionSegmentSizeTooSmall);
521
0
        }
522
0
523
0
        Ok(())
524
0
    }
525
526
    /// Loads the latest options from the manifest log.
527
0
    pub(crate) fn read_manifest(dir: &Path) -> Result<Manifest> {
528
0
        let manifest_subdir = dir.join("manifest");
529
0
        let sr = SegmentRef::read_segments_from_directory(manifest_subdir.as_path())
530
0
            .expect("should read segments");
531
0
        let reader = MultiSegmentReader::new(sr)?;
532
0
        let mut reader = Reader::new_from(reader);
533
0
534
0
        let mut manifests = Manifest::new(); // Initialize with an empty Vec
535
536
        loop {
537
            // Read the next transaction record from the log.
538
0
            let mut len_buf = [0; 4];
539
0
            let res = reader.read(&mut len_buf); // Read 4 bytes for the length
540
0
            if let Err(e) = res {
541
0
                if let Error::LogError(LogError::Eof) = e {
542
0
                    break;
543
                } else {
544
0
                    return Err(e);
545
                }
546
0
            }
547
0
548
0
            let len = u32::from_be_bytes(len_buf) as usize; // Convert bytes to length
549
0
            let mut md_bytes = vec![0u8; len];
550
0
            reader.read(&mut md_bytes)?; // Read the actual metadata
551
0
            let manifest = Manifest::deserialize_revisioned(&mut md_bytes.as_slice())?;
552
0
            manifests.changes.extend(manifest.changes);
553
        }
554
555
0
        Ok(manifests)
556
0
    }
557
558
0
    /// Returns the current value of the closed flag.
    fn is_closed(&self) -> bool {
559
0
        self.is_closed.load(Ordering::Relaxed)
560
0
    }
561
562
0
    pub(crate) fn close(&self) -> Result<()> {
563
0
        if self.is_closed() {
564
0
            return Ok(());
565
0
        }
566
567
        // Close the commit log if it exists
568
0
        if let Some(clog) = &self.clog {
569
0
            clog.write().close()?;
570
0
        }
571
572
        // Close the manifest if it exists
573
0
        if let Some(manifest) = &self.manifest {
574
0
            manifest.write().close()?;
575
0
        }
576
0
        self.is_closed.store(true, Ordering::Relaxed);
577
0
578
0
        Ok(())
579
0
    }
580
581
21.0k
    /// Performs the write described by `req` and reports the outcome.
    ///
    /// When the task carries a notification channel, a clone of the outcome
    /// is sent on it before the outcome is returned.
    ///
    /// # Errors
    /// Returns the send error if the notification cannot be delivered;
    /// otherwise returns the outcome of the write itself.
    pub(crate) async fn write_request(&self, req: Task) -> Result<()> {
        // Grab the notifier first: `req` is moved into `write_entries` below.
        let notifier = req.done.clone();

        let outcome = self.write_entries(req);

        if let Some(notifier) = notifier {
            notifier.send(outcome.clone()).await?;
        }

        outcome
    }
592
593
21.0k
    fn write_entries(&self, req: Task) -> Result<()> {
594
21.0k
        if req.entries.is_empty() {
595
0
            return Ok(());
596
21.0k
        }
597
21.0k
598
21.0k
        if self.opts.should_persist_data() {
599
0
            self.write_entries_to_disk(req)
600
        } else {
601
21.0k
            self.write_index_in_memory(&req)
602
        }
603
21.0k
    }
604
605
0
    fn write_entries_to_disk(&self, req: Task) -> Result<()> {
606
0
        // TODO: This buf can be reused by defining on core level
607
0
        let mut buf = BytesMut::new();
608
0
        let mut values_offsets = HashMap::with_capacity(req.entries.len());
609
0
610
0
        encode_entries(&req.entries, req.tx_id, &mut buf, &mut values_offsets);
611
612
0
        let (segment_id, current_offset) = self.append_log(&buf, req.durability)?;
613
614
0
        values_offsets.iter_mut().for_each(|(_, val_off)| {
615
0
            *val_off += current_offset;
616
0
        });
617
0
618
0
        self.write_entries_to_index(&req, |entry| {
619
0
            let offset = *values_offsets.get(&entry.key).unwrap();
620
0
            IndexValue::new_disk(
621
0
                segment_id,
622
0
                offset,
623
0
                entry.metadata.clone(),
624
0
                &entry.value,
625
0
                self.opts.max_value_threshold,
626
0
            )
627
0
        })
628
0
    }
629
630
0
    fn append_log(&self, tx_record: &BytesMut, durability: Durability) -> Result<(u64, u64)> {
631
0
        let mut clog = self.clog.as_ref().unwrap().write();
632
633
0
        let (segment_id, offset) = match durability {
634
            Durability::Immediate => {
635
                // Immediate durability means that the transaction is made to
636
                // fsync the data to disk before returning.
637
0
                let (segment_id, offset, _) = clog.append(tx_record)?;
638
0
                clog.sync()?;
639
0
                (segment_id, offset)
640
            }
641
            Durability::Eventual => {
642
                // Eventual durability means that the transaction is made to
643
                // write to disk using the write_all method. But it does not
644
                // fsync the data to disk before returning.
645
0
                let (segment_id, offset, _) = clog.append(tx_record)?;
646
0
                (segment_id, offset)
647
            }
648
        };
649
650
0
        Ok((segment_id, offset))
651
0
    }
652
653
21.0k
    fn write_entries_to_index<F>(&self, task: &Task, encode_entry: F) -> Result<()>
654
21.0k
    where
655
21.0k
        F: Fn(&Entry) -> IndexValue,
656
21.0k
    {
657
21.0k
        let mut index = self.indexer.write();
658
659
455k
        for entry in &task.entries {
660
            // If the entry is marked as deleted or a tombstone
661
            // with the replace flag set, delete it.
662
434k
            if let Some(metadata) = entry.metadata.as_ref() {
663
32.7k
                if metadata.is_deleted() || metadata.is_tombstone() && entry.replace {
664
32.7k
                    index.delete(&mut entry.key[..].into());
665
32.7k
                    continue;
666
0
                }
667
402k
            }
668
669
402k
            let index_value = encode_entry(entry);
670
402k
671
402k
            if !entry.replace {
672
0
                index.insert(
673
0
                    &mut entry.key[..].into(),
674
0
                    index_value,
675
0
                    task.tx_id,
676
0
                    entry.ts,
677
0
                    true,
678
0
                )?;
679
            } else {
680
402k
                index.insert_or_replace(
681
402k
                    &mut entry.key[..].into(),
682
402k
                    index_value,
683
402k
                    task.tx_id,
684
402k
                    entry.ts,
685
402k
                    true,
686
402k
                )?;
687
            }
688
        }
689
690
21.0k
        Ok(())
691
21.0k
    }
Unexecuted instantiation: <surrealkv::store::Core>::write_entries_to_index::<<surrealkv::store::Core>::write_entries_to_disk::{closure#1}>
<surrealkv::store::Core>::write_entries_to_index::<<surrealkv::store::Core>::write_index_in_memory::{closure#0}>
Line
Count
Source
653
21.0k
    fn write_entries_to_index<F>(&self, task: &Task, encode_entry: F) -> Result<()>
654
21.0k
    where
655
21.0k
        F: Fn(&Entry) -> IndexValue,
656
21.0k
    {
657
21.0k
        let mut index = self.indexer.write();
658
659
455k
        for entry in &task.entries {
660
            // If the entry is marked as deleted or a tombstone
661
            // with the replace flag set, delete it.
662
434k
            if let Some(metadata) = entry.metadata.as_ref() {
663
32.7k
                if metadata.is_deleted() || metadata.is_tombstone() && entry.replace {
664
32.7k
                    index.delete(&mut entry.key[..].into());
665
32.7k
                    continue;
666
0
                }
667
402k
            }
668
669
402k
            let index_value = encode_entry(entry);
670
402k
671
402k
            if !entry.replace {
672
0
                index.insert(
673
0
                    &mut entry.key[..].into(),
674
0
                    index_value,
675
0
                    task.tx_id,
676
0
                    entry.ts,
677
0
                    true,
678
0
                )?;
679
            } else {
680
402k
                index.insert_or_replace(
681
402k
                    &mut entry.key[..].into(),
682
402k
                    index_value,
683
402k
                    task.tx_id,
684
402k
                    entry.ts,
685
402k
                    true,
686
402k
                )?;
687
            }
688
        }
689
690
21.0k
        Ok(())
691
21.0k
    }
692
693
21.0k
    fn write_index_in_memory(&self, task: &Task) -> Result<()> {
694
402k
        self.write_entries_to_index(task, |entry| {
695
402k
            IndexValue::new_mem(entry.metadata.clone(), entry.value.clone())
696
402k
        })
697
21.0k
    }
698
699
21.0k
    pub(crate) async fn send_to_write_channel(
700
21.0k
        &self,
701
21.0k
        entries: Vec<Entry>,
702
21.0k
        tx_id: u64,
703
21.0k
        durability: Durability,
704
21.0k
    ) -> Result<Receiver<Result<()>>> {
705
21.0k
        let (tx, rx) = bounded(1);
706
21.0k
        let req = Task {
707
21.0k
            entries,
708
21.0k
            done: Some(tx),
709
21.0k
            tx_id,
710
21.0k
            durability,
711
21.0k
        };
712
21.0k
        self.writes_tx.send(req).await?;
713
21.0k
        Ok(rx)
714
21.0k
    }
<surrealkv::store::Core>::send_to_write_channel::{closure#0}
Line
Count
Source
704
21.0k
    ) -> Result<Receiver<Result<()>>> {
705
21.0k
        let (tx, rx) = bounded(1);
706
21.0k
        let req = Task {
707
21.0k
            entries,
708
21.0k
            done: Some(tx),
709
21.0k
            tx_id,
710
21.0k
            durability,
711
21.0k
        };
712
21.0k
        self.writes_tx.send(req).await?;
713
21.0k
        Ok(rx)
714
21.0k
    }
Unexecuted instantiation: <surrealkv::store::Core>::send_to_write_channel::{closure#0}
715
716
    /// Resolves the value from the given offset in the commit log.
717
    /// If the offset exists in the value cache, it returns the cached value.
718
    /// Otherwise, it reads the value from the commit log, caches it, and returns it.
719
0
    pub(crate) fn resolve_from_offset(
720
0
        &self,
721
0
        segment_id: u64,
722
0
        value_offset: u64,
723
0
        value_len: usize,
724
0
    ) -> Result<Vec<u8>> {
725
0
        // Attempt to return the cached value if it exists
726
0
        let cache_key = (segment_id, value_offset);
727
728
0
        if let Some(value) = self.value_cache.get(&cache_key) {
729
0
            return Ok(value.to_vec());
730
0
        }
731
0
732
0
        // If the value is not in the cache, read it from the commit log
733
0
        let mut buf = vec![0; value_len];
734
0
        let clog = self.clog.as_ref().unwrap().read();
735
0
        clog.read_at(&mut buf, segment_id, value_offset)?;
736
737
        // Cache the newly read value for future use
738
0
        self.value_cache.insert(cache_key, Bytes::from(buf.clone()));
739
0
740
0
        Ok(buf)
741
0
    }
742
}
743
744
#[cfg(test)]
745
mod tests {
746
    use rand::prelude::SliceRandom;
747
    use rand::Rng;
748
749
    use std::sync::Arc;
750
751
    use crate::log::Error as LogError;
752
    use crate::option::Options;
753
    use crate::store::Core;
754
    use crate::store::{Store, Task, TaskRunner};
755
    use crate::transaction::Durability;
756
    use crate::{Error, IsolationLevel};
757
758
    use async_channel::bounded;
759
    use std::sync::atomic::{AtomicU64, Ordering};
760
761
    use bytes::Bytes;
762
    use tempdir::TempDir;
763
764
    fn create_temp_directory() -> TempDir {
765
        TempDir::new("test").unwrap()
766
    }
767
768
    #[tokio::test]
769
    async fn bulk_insert_and_reload() {
770
        // Create a temporary directory for testing
771
        let temp_dir = create_temp_directory();
772
773
        // Create store options with the test directory
774
        let mut opts = Options::new();
775
        opts.dir = temp_dir.path().to_path_buf();
776
777
        // Create a new store instance with VariableKey as the key type
778
        let store = Store::new(opts).expect("should create store");
779
780
        // Number of keys to generate
781
        let num_keys = 10000;
782
783
        // Create a vector to store the generated keys
784
        let mut keys: Vec<Bytes> = Vec::new();
785
786
        for (counter, _) in (1..=num_keys).enumerate() {
787
            // Convert the counter to Bytes
788
            let key_bytes = Bytes::from(counter.to_le_bytes().to_vec());
789
790
            // Add the key to the vector
791
            keys.push(key_bytes);
792
        }
793
794
        let default_value = Bytes::from("default_value".to_string());
795
796
        // Write the keys to the store
797
        for key in keys.iter() {
798
            // Start a new write transaction
799
            let mut txn = store.begin().unwrap();
800
            txn.set(key, &default_value).unwrap();
801
            txn.commit().await.unwrap();
802
        }
803
804
        // Read the keys to the store
805
        for key in keys.iter() {
806
            // Start a new read transaction
807
            let mut txn = store.begin().unwrap();
808
            let val = txn.get(key).unwrap().unwrap();
809
            // Assert that the value retrieved in txn matches default_value
810
            assert_eq!(val, default_value.as_ref());
811
        }
812
813
        // Drop the store to simulate closing it
814
        store.close().await.unwrap();
815
816
        // Create a new store instance but with values read from disk
817
        let mut opts = Options::new();
818
        opts.dir = temp_dir.path().to_path_buf();
819
        // This is to ensure values are read from disk
820
        opts.max_value_threshold = 0;
821
822
        let store = Store::new(opts).expect("should create store");
823
824
        // Read the keys to the store
825
        for key in keys.iter() {
826
            // Start a new read transaction
827
            let mut txn = store.begin().unwrap();
828
            let val = txn.get(key).unwrap().unwrap();
829
            // Assert that the value retrieved in txn matches default_value
830
            assert_eq!(val, default_value.as_ref());
831
        }
832
    }
833
834
    #[tokio::test]
835
    async fn store_open_and_update_options() {
836
        // // Create a temporary directory for testing
837
        let temp_dir = create_temp_directory();
838
839
        // Create store options with the test directory
840
        let mut opts = Options::new();
841
        opts.dir = temp_dir.path().to_path_buf();
842
843
        // Create a new store instance with VariableKey as the key type
844
        let store = Store::new(opts.clone()).expect("should create store");
845
        store.close().await.unwrap();
846
847
        // Update the options and use them to update the new store instance
848
        let mut opts = opts.clone();
849
        opts.max_value_cache_size = 5;
850
851
        let store = Store::new(opts.clone()).expect("should create store");
852
        let store_opts = store.inner.as_ref().unwrap().core.opts.clone();
853
        assert_eq!(store_opts, opts);
854
    }
855
856
    #[tokio::test]
857
    async fn insert_close_reopen() {
858
        // Create a temporary directory for testing
859
        let temp_dir = create_temp_directory();
860
861
        // Create store options with the test directory
862
        let mut opts = Options::new();
863
        opts.dir = temp_dir.path().to_path_buf();
864
865
        let num_ops = 10;
866
867
        for i in 1..=10 {
868
            // (Re)open the store
869
            let store = Store::new(opts.clone()).expect("should create store");
870
871
            // Append num_ops items to the store
872
            for j in 0..num_ops {
873
                let id = (i - 1) * num_ops + j;
874
                let key = format!("key{}", id);
875
                let value = format!("value{}", id);
876
                let mut txn = store.begin().unwrap();
877
                txn.set(key.as_bytes(), value.as_bytes()).unwrap();
878
                txn.commit().await.unwrap();
879
            }
880
881
            // Test that the items are still in the store
882
            for j in 0..(num_ops * i) {
883
                let key = format!("key{}", j);
884
                let value = format!("value{}", j);
885
                let value = value.into_bytes();
886
                let mut txn = store.begin().unwrap();
887
                let val = txn.get(key.as_bytes()).unwrap().unwrap();
888
889
                assert_eq!(val, value);
890
            }
891
892
            // Close the store again
893
            store.close().await.unwrap();
894
        }
895
    }
896
897
    #[tokio::test]
898
    async fn clone_store() {
899
        // Create a temporary directory for testing
900
        let temp_dir = create_temp_directory();
901
902
        // Create store options with the test directory
903
        let mut opts = Options::new();
904
        opts.dir = temp_dir.path().to_path_buf();
905
906
        // Create a new store instance with VariableKey as the key type
907
        let store = Arc::new(Store::new(opts).expect("should create store"));
908
909
        // Number of keys to generate
910
        let num_keys = 100;
911
912
        // Create a vector to store the generated keys
913
        let mut keys: Vec<Bytes> = Vec::new();
914
915
        for (counter, _) in (1..=num_keys).enumerate() {
916
            // Convert the counter to Bytes
917
            let key_bytes = Bytes::from(counter.to_le_bytes().to_vec());
918
919
            // Add the key to the vector
920
            keys.push(key_bytes);
921
        }
922
923
        let default_value = Bytes::from("default_value".to_string());
924
        let store1 = store.clone();
925
926
        // Write the keys to the store
927
        for key in keys.iter() {
928
            // Start a new write transaction
929
            let mut txn = store1.begin().unwrap();
930
            txn.set(key, &default_value).unwrap();
931
            txn.commit().await.unwrap();
932
        }
933
    }
934
935
    async fn concurrent_task(store: Arc<Store>, key: &[u8], value: &[u8]) {
936
        let mut txn = store.begin().unwrap();
937
        txn.set(key, value).unwrap();
938
        txn.commit().await.unwrap();
939
    }
940
941
    #[tokio::test(flavor = "multi_thread")]
942
    async fn concurrent_test() {
943
        let mut opts = Options::new();
944
        opts.dir = create_temp_directory().path().to_path_buf();
945
        let db = Arc::new(Store::new(opts).expect("should create store"));
946
947
        let key1 = b"key1";
948
        let value1 = b"value1";
949
        let key2 = b"key2";
950
        let value2 = b"value2";
951
952
        let task1 = tokio::spawn(concurrent_task(db.clone(), key1, value1));
953
        let task2 = tokio::spawn(concurrent_task(db.clone(), key2, value2));
954
955
        let _ = tokio::try_join!(task1, task2).expect("Tasks failed");
956
    }
957
958
    #[tokio::test]
959
    async fn insert_then_read_then_delete_then_read() {
960
        // Create a temporary directory for testing
961
        let temp_dir = create_temp_directory();
962
963
        // Create store options with the test directory
964
        let mut opts = Options::new();
965
        opts.dir = temp_dir.path().to_path_buf();
966
        opts.max_value_threshold = 0;
967
        opts.max_value_cache_size = 0;
968
969
        // Create a new store instance with VariableKey as the key type
970
        let store = Store::new(opts).expect("should create store");
971
972
        // Number of keys to generate
973
        let num_keys = 5000;
974
975
        // Create a vector to store the generated keys
976
        let mut keys: Vec<Bytes> = Vec::new();
977
978
        for (counter, _) in (1..=num_keys).enumerate() {
979
            // Convert the counter to Bytes
980
            let key_bytes = Bytes::from(counter.to_le_bytes().to_vec());
981
982
            // Add the key to the vector
983
            keys.push(key_bytes);
984
        }
985
986
        let default_value = Bytes::from("default_value".to_string());
987
988
        // Write the keys to the store
989
        for key in keys.iter() {
990
            let mut txn = store.begin().unwrap();
991
            txn.set(key, &default_value).unwrap();
992
            txn.commit().await.unwrap();
993
        }
994
995
        // Read the keys from the store
996
        for key in keys.iter() {
997
            let mut txn = store.begin().unwrap();
998
            let val = txn.get(key).unwrap().unwrap();
999
            assert_eq!(val, default_value.as_ref());
1000
        }
1001
1002
        // Delete the keys from the store
1003
        for key in keys.iter() {
1004
            let mut txn = store.begin().unwrap();
1005
            txn.delete(key).unwrap();
1006
            txn.commit().await.unwrap();
1007
        }
1008
1009
        // ReWrite the keys to the store
1010
        for key in keys.iter() {
1011
            let mut txn = store.begin().unwrap();
1012
            txn.set(key, &default_value).unwrap();
1013
            txn.commit().await.unwrap();
1014
        }
1015
1016
        // Read the keys from the store
1017
        for key in keys.iter() {
1018
            let mut txn = store.begin().unwrap();
1019
            let val = txn.get(key).unwrap().unwrap();
1020
            assert_eq!(val, default_value.as_ref());
1021
        }
1022
    }
1023
1024
    #[tokio::test]
1025
    async fn records_not_lost_when_store_is_closed() {
1026
        // Create a temporary directory for testing
1027
        let temp_dir = create_temp_directory();
1028
1029
        // Create store options with the test directory
1030
        let mut opts = Options::new();
1031
        opts.dir = temp_dir.path().to_path_buf();
1032
1033
        let key = "key".as_bytes();
1034
        let value = "value".as_bytes();
1035
1036
        {
1037
            // Create a new store instance
1038
            let store = Store::new(opts.clone()).expect("should create store");
1039
1040
            // Insert an item into the store
1041
            let mut txn = store.begin().unwrap();
1042
            txn.set(key, value).unwrap();
1043
            txn.commit().await.unwrap();
1044
1045
            drop(txn);
1046
            store.close().await.unwrap();
1047
        }
1048
        {
1049
            // Reopen the store
1050
            let store = Store::new(opts.clone()).expect("should create store");
1051
1052
            // Test that the item is still in the store
1053
            let mut txn = store.begin().unwrap();
1054
            let val = txn.get(key).unwrap();
1055
1056
            assert_eq!(val.unwrap(), value);
1057
        }
1058
    }
1059
1060
    async fn test_records_when_store_is_dropped(
1061
        durability: Durability,
1062
        wait: bool,
1063
        should_exist: bool,
1064
    ) {
1065
        // Create a temporary directory for testing
1066
        let temp_dir = create_temp_directory();
1067
1068
        // Create store options with the test directory
1069
        let mut opts = Options::new();
1070
        opts.dir = temp_dir.path().to_path_buf();
1071
1072
        let key = "key".as_bytes();
1073
        let value = "value".as_bytes();
1074
1075
        {
1076
            // Create a new store instance
1077
            let store = Store::new(opts.clone()).expect("should create store");
1078
1079
            // Insert an item into the store
1080
            let mut txn = store.begin().unwrap();
1081
            txn.set_durability(durability);
1082
            txn.set(key, value).unwrap();
1083
            txn.commit().await.unwrap();
1084
1085
            drop(txn);
1086
            drop(store);
1087
        }
1088
1089
        if wait {
1090
            // Give some room for the store to close asynchronously
1091
            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1092
        }
1093
1094
        {
1095
            // Reopen the store
1096
            let store = Store::new(opts.clone()).expect("should create store");
1097
1098
            // Test that the item is still in the store
1099
            let mut txn = store.begin().unwrap();
1100
            let val = txn.get(key).unwrap();
1101
1102
            if should_exist {
1103
                assert_eq!(val.unwrap(), value);
1104
            } else {
1105
                assert!(val.is_none());
1106
            }
1107
        }
1108
    }
1109
1110
    #[tokio::test]
1111
    async fn eventual_durability_records_persist_after_drop() {
1112
        test_records_when_store_is_dropped(Durability::Eventual, true, true).await;
1113
    }
1114
1115
    #[tokio::test]
1116
    async fn eventual_durability_records_persist_without_wait() {
1117
        test_records_when_store_is_dropped(Durability::Eventual, false, true).await;
1118
    }
1119
1120
    #[tokio::test]
1121
    async fn strong_durability_records_persist() {
1122
        test_records_when_store_is_dropped(Durability::Immediate, true, true).await;
1123
        test_records_when_store_is_dropped(Durability::Immediate, false, true).await;
1124
    }
1125
1126
    #[tokio::test]
1127
    async fn store_closed_twice_without_error() {
1128
        // Create a temporary directory for testing
1129
        let temp_dir = create_temp_directory();
1130
1131
        // Create store options with the test directory
1132
        let mut opts = Options::new();
1133
        opts.dir = temp_dir.path().to_path_buf();
1134
1135
        // Create a new store instance
1136
        let store = Store::new(opts.clone()).expect("should create store");
1137
1138
        // Close the store once
1139
        assert!(
1140
            store.close().await.is_ok(),
1141
            "should close store without error"
1142
        );
1143
1144
        // Close the store a second time
1145
        assert!(
1146
            store.close().await.is_ok(),
1147
            "should close store without error"
1148
        );
1149
    }
1150
1151
    /// Returns pairs of key, value
1152
    fn gen_data(count: usize, key_size: usize, value_size: usize) -> Vec<(Vec<u8>, Vec<u8>)> {
1153
        let mut pairs = vec![];
1154
1155
        for _ in 0..count {
1156
            let key: Vec<u8> = (0..key_size).map(|_| rand::thread_rng().gen()).collect();
1157
            let value: Vec<u8> = (0..value_size).map(|_| rand::thread_rng().gen()).collect();
1158
            pairs.push((key, value));
1159
        }
1160
1161
        pairs
1162
    }
1163
1164
    async fn test_durability(durability: Durability, wait_enabled: bool) {
1165
        // Create a temporary directory for testing
1166
        let temp_dir = create_temp_directory();
1167
1168
        // Create store options with the test directory
1169
        let mut opts = Options::new();
1170
        opts.dir = temp_dir.path().to_path_buf();
1171
1172
        let num_elements = 100;
1173
        let pairs = gen_data(num_elements, 16, 20);
1174
1175
        {
1176
            // Create a new store instance
1177
            let store = Store::new(opts.clone()).expect("should create store");
1178
1179
            let mut txn = store.begin().unwrap();
1180
            txn.set_durability(durability);
1181
1182
            {
1183
                for i in 0..num_elements {
1184
                    let (key, value) = &pairs[i % pairs.len()];
1185
                    txn.set(key.as_slice(), value.as_slice()).unwrap();
1186
                }
1187
            }
1188
            txn.commit().await.unwrap();
1189
1190
            drop(store);
1191
        }
1192
1193
        // Wait for a while to let close be called on drop as it is executed asynchronously
1194
        if wait_enabled {
1195
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1196
        }
1197
1198
        let store = Store::new(opts.clone()).expect("should create store");
1199
        let mut txn = store.begin().unwrap();
1200
1201
        let mut key_order: Vec<usize> = (0..num_elements).collect();
1202
        key_order.shuffle(&mut rand::thread_rng());
1203
1204
        {
1205
            for i in &key_order {
1206
                let (key, value) = &pairs[*i % pairs.len()];
1207
                let val = txn.get(key.as_slice()).unwrap().unwrap();
1208
                assert_eq!(&val, value);
1209
            }
1210
        }
1211
    }
1212
1213
    #[tokio::test]
1214
    async fn eventual_durability() {
1215
        test_durability(Durability::Eventual, false).await;
1216
    }
1217
1218
    #[tokio::test]
1219
    async fn immediate_durability() {
1220
        test_durability(Durability::Immediate, false).await;
1221
    }
1222
1223
    #[tokio::test]
1224
    async fn store_without_persistance() {
1225
        // Create a temporary directory for testing
1226
        let temp_dir = create_temp_directory();
1227
1228
        // Create store options with the test directory
1229
        let mut opts = Options::new();
1230
        opts.dir = temp_dir.path().to_path_buf();
1231
        opts.disk_persistence = false;
1232
1233
        // Create a new store instance with VariableKey as the key type
1234
        let store = Store::new(opts.clone()).expect("should create store");
1235
1236
        // Number of keys to generate
1237
        let num_keys = 10000;
1238
1239
        // Create a vector to store the generated keys
1240
        let mut keys: Vec<Bytes> = Vec::new();
1241
1242
        for (counter, _) in (1..=num_keys).enumerate() {
1243
            // Convert the counter to Bytes
1244
            let key_bytes = Bytes::from(counter.to_le_bytes().to_vec());
1245
1246
            // Add the key to the vector
1247
            keys.push(key_bytes);
1248
        }
1249
1250
        let default_value = Bytes::from("default_value".to_string());
1251
1252
        // Write the keys to the store
1253
        for key in keys.iter() {
1254
            // Start a new write transaction
1255
            let mut txn = store.begin().unwrap();
1256
            txn.set(key, &default_value).unwrap();
1257
            txn.commit().await.unwrap();
1258
        }
1259
1260
        // Read the keys to the store
1261
        for key in keys.iter() {
1262
            // Start a new read transaction
1263
            let mut txn = store.begin().unwrap();
1264
            let val = txn.get(key).unwrap().unwrap();
1265
            // Assert that the value retrieved in txn matches default_value
1266
            assert_eq!(val, default_value.as_ref());
1267
        }
1268
1269
        // Drop the store to simulate closing it
1270
        store.close().await.unwrap();
1271
1272
        let store = Store::new(opts).expect("should create store");
1273
1274
        // No keys should be found in the store
1275
        for key in keys.iter() {
1276
            // Start a new read transaction
1277
            let mut txn = store.begin().unwrap();
1278
            assert!(txn.get(key).unwrap().is_none());
1279
        }
1280
    }
1281
1282
    #[tokio::test]
1283
    async fn basic_compaction1() {
1284
        // Create a temporary directory for testing
1285
        let temp_dir = create_temp_directory();
1286
1287
        // Create store options with the test directory
1288
        let mut opts = Options::new();
1289
        opts.dir = temp_dir.path().to_path_buf();
1290
        opts.max_value_threshold = 0;
1291
        opts.max_value_cache_size = 0;
1292
1293
        // Create a new store instance with VariableKey as the key type
1294
        let store = Store::new(opts.clone()).expect("should create store");
1295
1296
        // Number of keys to generate and write
1297
        let num_keys_to_write = 1;
1298
1299
        // Create a vector to store the generated keys
1300
        let mut keys: Vec<Bytes> = Vec::new();
1301
1302
        for counter in 1usize..=num_keys_to_write {
1303
            // Convert the counter to Bytes
1304
            let key_bytes = Bytes::from(counter.to_le_bytes().to_vec());
1305
1306
            // Add the key to the vector
1307
            keys.push(key_bytes);
1308
        }
1309
1310
        let default_value = Bytes::from("default_value".to_string());
1311
        let default_value2 = Bytes::from("default_value2".to_string());
1312
1313
        // Write the keys to the store
1314
        for key in keys.iter() {
1315
            let mut txn = store.begin().unwrap();
1316
            txn.set(key, &default_value).unwrap();
1317
            txn.commit().await.unwrap();
1318
        }
1319
1320
        for key in keys.iter() {
1321
            let mut txn = store.begin().unwrap();
1322
            txn.set(key, &default_value2).unwrap();
1323
            txn.commit().await.unwrap();
1324
        }
1325
1326
        let key_bytes = Bytes::from(2usize.to_le_bytes().to_vec());
1327
        let mut txn = store.begin().unwrap();
1328
        txn.set(&key_bytes, &default_value2).unwrap();
1329
        txn.commit().await.unwrap();
1330
1331
        // Delete the first 5 keys from the store
1332
        for key in keys.iter() {
1333
            let mut txn = store.begin().unwrap();
1334
            txn.delete(key).unwrap();
1335
            txn.commit().await.unwrap();
1336
        }
1337
1338
        store.inner.as_ref().unwrap().compact().await.unwrap();
1339
        store.close().await.unwrap();
1340
1341
        let reopened_store = Store::new(opts).expect("should reopen store");
1342
        for key in keys.iter() {
1343
            let mut txn = reopened_store.begin().unwrap();
1344
            assert!(txn.get(key).unwrap().is_none());
1345
        }
1346
    }
1347
1348
    #[tokio::test]
1349
    async fn test_store_with_varying_segment_sizes() {
1350
        let temp_dir = create_temp_directory();
1351
        let mut opts = Options::new();
1352
        opts.dir = temp_dir.path().to_path_buf();
1353
        opts.max_segment_size = 84; // Initial max segment size
1354
1355
        let k1 = Bytes::from("k1");
1356
        let k2 = Bytes::from("k2");
1357
        let k3 = Bytes::from("k3");
1358
        let k4 = Bytes::from("k4");
1359
        let val = Bytes::from("val");
1360
1361
        // Step 1: Open store with initial max segment size and commit a record
1362
        let store = Store::new(opts.clone()).expect("should create store");
1363
        {
1364
            let mut txn = store.begin().unwrap();
1365
            txn.set(&k1.clone(), &val.clone()).unwrap();
1366
            txn.commit().await.unwrap();
1367
        }
1368
        store.close().await.expect("should close store");
1369
1370
        // Step 2: Reopen store with a smaller max segment size and append a record
1371
        opts.max_segment_size = 37; // Smaller max segment size
1372
        let store = Store::new(opts.clone()).expect("should create store");
1373
        {
1374
            let mut txn = store.begin().unwrap();
1375
            txn.set(&k2.clone(), &val).unwrap();
1376
            txn.commit().await.unwrap();
1377
1378
            // Verify the first record
1379
            let mut txn = store.begin().unwrap();
1380
            let val = txn.get(&k1).unwrap().unwrap();
1381
            assert_eq!(val, val);
1382
1383
            // Verify the second record
1384
            let val2 = txn.get(&k2).unwrap().unwrap();
1385
            assert_eq!(val2, val);
1386
        }
1387
        store.close().await.expect("should close store");
1388
1389
        // Step 3: Reopen store with a larger max segment size and append a record
1390
        opts.max_segment_size = 121; // Larger max segment size
1391
        let store = Store::new(opts.clone()).expect("should create store");
1392
        {
1393
            let mut txn = store.begin().unwrap();
1394
            txn.set(&k3.clone(), &val).unwrap();
1395
            txn.commit().await.unwrap();
1396
1397
            // Verify the first record
1398
            let mut txn = store.begin().unwrap();
1399
            let val = txn.get(&k1).unwrap().unwrap();
1400
            assert_eq!(val, val);
1401
1402
            // Verify the second record
1403
            let val2 = txn.get(&k2).unwrap().unwrap();
1404
            assert_eq!(val2, val);
1405
1406
            // Verify the third record
1407
            let val3 = txn.get(&k3).unwrap().unwrap();
1408
            assert_eq!(val3, val);
1409
        }
1410
        store.close().await.expect("should close store");
1411
1412
        // Step 4: Reopen store with a max segment size smaller than the record
1413
        opts.max_segment_size = 36; // Smallest max segment size
1414
        let store = Store::new(opts.clone()).expect("should create store");
1415
        {
1416
            let mut txn = store.begin().unwrap();
1417
            txn.set(&k4.clone(), &val).unwrap();
1418
            let err = txn.commit().await.err().unwrap();
1419
            match err {
1420
                Error::LogError(LogError::RecordTooLarge) => (),
1421
                _ => panic!("expected RecordTooLarge error"),
1422
            };
1423
        }
1424
        store.close().await.expect("should close store");
1425
    }
1426
1427
    #[tokio::test]
1428
    async fn load_index_with_disabled_versions() {
1429
        let opts = Options {
1430
            dir: create_temp_directory().path().to_path_buf(),
1431
            enable_versions: false,
1432
            ..Default::default()
1433
        };
1434
1435
        let key = b"key";
1436
        let value1 = b"value1";
1437
        let value2 = b"value2";
1438
1439
        let store = Store::new(opts.clone()).unwrap();
1440
        let mut txn1 = store.begin().unwrap();
1441
        txn1.set(key, value1).unwrap();
1442
        txn1.commit().await.unwrap();
1443
1444
        let mut txn2 = store.begin().unwrap();
1445
        txn2.set(key, value2).unwrap();
1446
        txn2.commit().await.unwrap();
1447
        store.close().await.unwrap();
1448
1449
        let store = Store::new(opts.clone()).unwrap();
1450
        let txn = store.begin_with_mode(crate::Mode::ReadOnly).unwrap();
1451
        let history = txn.get_all_versions(key).unwrap();
1452
        assert_eq!(history.len(), 1);
1453
        assert_eq!(history[0].0, value2);
1454
        store.close().await.unwrap();
1455
    }
1456
1457
    #[tokio::test]
1458
    async fn insert_with_disabled_versions() {
1459
        let opts = Options {
1460
            dir: create_temp_directory().path().to_path_buf(),
1461
            enable_versions: false,
1462
            ..Default::default()
1463
        };
1464
1465
        let key = b"key";
1466
        let value1 = b"value1";
1467
        let value2 = b"value2";
1468
1469
        let store = Store::new(opts.clone()).unwrap();
1470
1471
        let mut txn1 = store.begin().unwrap();
1472
        txn1.set(key, value1).unwrap();
1473
        txn1.commit().await.unwrap();
1474
1475
        let mut txn2 = store.begin().unwrap();
1476
        txn2.set(key, value2).unwrap();
1477
        txn2.commit().await.unwrap();
1478
1479
        let txn = store.begin_with_mode(crate::Mode::ReadOnly).unwrap();
1480
        let history = txn.get_all_versions(key).unwrap();
1481
        assert_eq!(history.len(), 1);
1482
        assert_eq!(history[0].0, value2);
1483
1484
        store.close().await.unwrap();
1485
    }
1486
1487
    #[tokio::test]
1488
    async fn insert_with_disabled_versions_in_memory() {
1489
        let opts = Options {
1490
            dir: create_temp_directory().path().to_path_buf(),
1491
            enable_versions: false,
1492
            disk_persistence: false,
1493
            ..Default::default()
1494
        };
1495
1496
        let key = b"key";
1497
        let value1 = b"value1";
1498
        let value2 = b"value2";
1499
1500
        let store = Store::new(opts.clone()).unwrap();
1501
1502
        let mut txn1 = store.begin().unwrap();
1503
        txn1.set(key, value1).unwrap();
1504
        txn1.commit().await.unwrap();
1505
1506
        let mut txn2 = store.begin().unwrap();
1507
        txn2.set(key, value2).unwrap();
1508
        txn2.commit().await.unwrap();
1509
1510
        let txn = store.begin_with_mode(crate::Mode::ReadOnly).unwrap();
1511
        let history = txn.get_all_versions(key).unwrap();
1512
        assert_eq!(history.len(), 1);
1513
        assert_eq!(history[0].0, value2);
1514
1515
        store.close().await.unwrap();
1516
    }
1517
1518
    #[tokio::test]
1519
    async fn insert_with_replace() {
1520
        let opts = Options {
1521
            dir: create_temp_directory().path().to_path_buf(),
1522
            enable_versions: true,
1523
            ..Default::default()
1524
        };
1525
1526
        let key = b"key";
1527
        let value1 = b"value1";
1528
        let value2 = b"value2";
1529
1530
        let store = Store::new(opts.clone()).unwrap();
1531
1532
        let mut txn1 = store.begin().unwrap();
1533
        txn1.set(key, value1).unwrap();
1534
        txn1.commit().await.unwrap();
1535
1536
        let mut txn2 = store.begin().unwrap();
1537
        txn2.insert_or_replace(key, value2).unwrap();
1538
        txn2.commit().await.unwrap();
1539
1540
        let txn = store.begin_with_mode(crate::Mode::ReadOnly).unwrap();
1541
        let history = txn.get_all_versions(key).unwrap();
1542
        assert_eq!(history.len(), 1);
1543
        assert_eq!(history[0].0, value2);
1544
1545
        store.close().await.unwrap();
1546
    }
1547
1548
    #[tokio::test]
1549
    async fn insert_with_replace_in_memory() {
1550
        let opts = Options {
1551
            dir: create_temp_directory().path().to_path_buf(),
1552
            enable_versions: true,
1553
            disk_persistence: false,
1554
            ..Default::default()
1555
        };
1556
1557
        let key = b"key";
1558
        let value1 = b"value1";
1559
        let value2 = b"value2";
1560
1561
        let store = Store::new(opts.clone()).unwrap();
1562
1563
        let mut txn1 = store.begin().unwrap();
1564
        txn1.set(key, value1).unwrap();
1565
        txn1.commit().await.unwrap();
1566
1567
        let mut txn2 = store.begin().unwrap();
1568
        txn2.insert_or_replace(key, value2).unwrap();
1569
        txn2.commit().await.unwrap();
1570
1571
        let txn = store.begin_with_mode(crate::Mode::ReadOnly).unwrap();
1572
        let history = txn.get_all_versions(key).unwrap();
1573
        assert_eq!(history.len(), 1);
1574
        assert_eq!(history[0].0, value2);
1575
1576
        store.close().await.unwrap();
1577
    }
1578
1579
    #[tokio::test]
1580
    async fn delete_with_versions_disabled() {
1581
        let opts = Options {
1582
            dir: create_temp_directory().path().to_path_buf(),
1583
            enable_versions: false,
1584
            ..Default::default()
1585
        };
1586
1587
        let key = Bytes::from("key");
1588
        let value1 = b"value1";
1589
1590
        let store = Store::new(opts.clone()).unwrap();
1591
1592
        let mut txn1 = store.begin().unwrap();
1593
        txn1.set_at_ts(&key, value1, 1).unwrap();
1594
        txn1.commit().await.unwrap();
1595
1596
        let mut txn2 = store.begin().unwrap();
1597
        txn2.soft_delete(&key).unwrap();
1598
        txn2.commit().await.unwrap();
1599
1600
        let txn3 = store.begin().unwrap();
1601
        let versions: Vec<_> = txn3
1602
            .scan_all_versions(key.as_ref()..=key.as_ref(), None)
1603
            .collect();
1604
        assert!(versions.is_empty());
1605
1606
        store.close().await.unwrap();
1607
    }
1608
1609
    #[tokio::test]
1610
    async fn delete_with_versions_disabled_in_memory() {
1611
        let opts = Options {
1612
            dir: create_temp_directory().path().to_path_buf(),
1613
            enable_versions: false,
1614
            disk_persistence: false,
1615
            ..Default::default()
1616
        };
1617
1618
        let key = Bytes::from("key");
1619
        let value1 = b"value1";
1620
1621
        let store = Store::new(opts.clone()).unwrap();
1622
1623
        let mut txn1 = store.begin().unwrap();
1624
        txn1.set_at_ts(&key, value1, 1).unwrap();
1625
        txn1.commit().await.unwrap();
1626
1627
        let mut txn2 = store.begin().unwrap();
1628
        txn2.soft_delete(&key).unwrap();
1629
        txn2.commit().await.unwrap();
1630
1631
        let txn3 = store.begin().unwrap();
1632
        let versions: Vec<_> = txn3
1633
            .scan_all_versions(key.as_ref()..=key.as_ref(), None)
1634
            .collect();
1635
        assert!(versions.is_empty());
1636
1637
        store.close().await.unwrap();
1638
    }
1639
1640
    // Common setup logic for creating a store
1641
    fn create_store(dir: Option<TempDir>, is_ssi: bool) -> (Store, TempDir) {
1642
        let temp_dir = dir.unwrap_or_else(create_temp_directory);
1643
        let mut opts = Options::new();
1644
        opts.dir = temp_dir.path().to_path_buf();
1645
        if is_ssi {
1646
            opts.isolation_level = IsolationLevel::SerializableSnapshotIsolation;
1647
        }
1648
        (
1649
            Store::new(opts.clone()).expect("should create store"),
1650
            temp_dir,
1651
        )
1652
    }
1653
1654
    #[tokio::test]
1655
    async fn test_store_version_after_reopen() {
1656
        let (store, temp_dir) = create_store(None, false);
1657
1658
        // Define the number of keys, key size, and value size
1659
        let num_keys = 10000u32;
1660
        let key_size = 32;
1661
        let value_size = 32;
1662
1663
        // Generate keys and values
1664
        let keys: Vec<Bytes> = (0..num_keys)
1665
            .map(|i| {
1666
                let mut key = vec![0; key_size];
1667
                key[..4].copy_from_slice(&i.to_be_bytes());
1668
                Bytes::from(key)
1669
            })
1670
            .collect();
1671
1672
        let value = Bytes::from(vec![0; value_size]);
1673
1674
        // Insert keys into the store
1675
        {
1676
            for key in &keys {
1677
                let mut txn = store.begin().unwrap();
1678
                txn.set(key, &value).unwrap();
1679
                txn.commit().await.unwrap();
1680
            }
1681
        }
1682
1683
        // Close the store
1684
        store.close().await.unwrap();
1685
1686
        // Reopen the store
1687
        let (store, _) = create_store(Some(temp_dir), false);
1688
1689
        // Verify if the indexer version is set correctly
1690
        assert_eq!(
1691
            store.inner.as_ref().unwrap().core.indexer.read().version(),
1692
            num_keys as u64
1693
        );
1694
1695
        // Verify that the keys are present in the store
1696
        let txn = store.begin_with_mode(crate::Mode::ReadOnly).unwrap();
1697
        for key in &keys {
1698
            let history = txn.get_all_versions(key).unwrap();
1699
            assert_eq!(history.len(), 1);
1700
            assert_eq!(history[0].0, value);
1701
        }
1702
1703
        store.close().await.unwrap();
1704
    }
1705
1706
    #[tokio::test]
1707
    async fn test_incremental_transaction_ids_post_store_open() {
1708
        let (store, temp_dir) = create_store(None, false);
1709
1710
        let total_records = 1000;
1711
        let multiple_keys_records = total_records / 2;
1712
1713
        // Define keys and values
1714
        let keys: Vec<Bytes> = (1..=total_records)
1715
            .map(|i| Bytes::from(format!("key{}", i)))
1716
            .collect();
1717
        let values: Vec<Bytes> = (1..=total_records)
1718
            .map(|i| Bytes::from(format!("value{}", i)))
1719
            .collect();
1720
1721
        // Insert multiple transactions with single keys
1722
        for (i, key) in keys.iter().enumerate().take(multiple_keys_records) {
1723
            let mut txn = store.begin().unwrap();
1724
            txn.set(key, &values[i]).unwrap();
1725
            txn.commit().await.unwrap();
1726
        }
1727
1728
        // Insert multiple transactions with multiple keys
1729
        for (i, key) in keys
1730
            .iter()
1731
            .enumerate()
1732
            .skip(multiple_keys_records)
1733
            .take(multiple_keys_records)
1734
        {
1735
            let mut txn = store.begin().unwrap();
1736
            txn.set(key, &values[i]).unwrap();
1737
            txn.set(&keys[(i + 1) % keys.len()], &values[(i + 1) % values.len()])
1738
                .unwrap();
1739
            txn.commit().await.unwrap();
1740
        }
1741
1742
        // Close the store
1743
        store.close().await.unwrap();
1744
1745
        // Add delay to ensure that the store is closed
1746
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1747
1748
        // Reopen the store
1749
        let (store, _) = create_store(Some(temp_dir), false);
1750
1751
        // Commit a new transaction with a single key
1752
        {
1753
            let mut txn = store.begin().unwrap();
1754
            txn.set(&keys[0], &values[0]).unwrap();
1755
            txn.commit().await.unwrap();
1756
1757
            let res = txn.get_versionstamp().unwrap();
1758
1759
            assert_eq!(res.0, total_records as u64 + 1);
1760
        }
1761
1762
        // Commit another new transaction with multiple keys
1763
        {
1764
            let mut txn = store.begin().unwrap();
1765
            txn.set(&keys[1], &values[1]).unwrap();
1766
            txn.set(&keys[2], &values[2]).unwrap();
1767
            txn.commit().await.unwrap();
1768
1769
            let res = txn.get_versionstamp().unwrap();
1770
1771
            assert_eq!(res.0, total_records as u64 + 2);
1772
        }
1773
1774
        store.close().await.unwrap();
1775
    }
1776
1777
    #[tokio::test]
1778
    async fn stop_task_runner_with_pending_tasks() {
1779
        // Create a temporary directory for testing
1780
        let temp_dir = create_temp_directory();
1781
1782
        // Create store options with the test directory
1783
        let mut opts = Options::new();
1784
        opts.dir = temp_dir.path().to_path_buf();
1785
1786
        let (writes_tx, writes_rx) = bounded(100);
1787
        let (stop_tx, stop_rx) = bounded(1);
1788
        let core = Arc::new(Core::new(opts, writes_tx.clone()).unwrap());
1789
1790
        let (runner, done_rx) = TaskRunner::new(core.clone(), writes_rx, stop_rx);
1791
        runner.spawn();
1792
1793
        // Create a task that will take some time to process
1794
        let (slow_done_tx, slow_done_rx) = bounded(1);
1795
        let slow_task = Task {
1796
            entries: vec![],
1797
            done: Some(slow_done_tx),
1798
            tx_id: 1,
1799
            durability: Durability::default(),
1800
        };
1801
1802
        // Send the slow task
1803
        writes_tx.send(slow_task).await.unwrap();
1804
1805
        // Send stop signal immediately
1806
        stop_tx.send(()).await.unwrap();
1807
1808
        // Wait for TaskRunner to finish
1809
        done_rx
1810
            .recv()
1811
            .await
1812
            .expect("TaskRunner should signal completion");
1813
1814
        // Verify the slow task was completed
1815
        assert!(slow_done_rx.recv().await.is_ok());
1816
    }
1817
1818
    #[tokio::test(flavor = "multi_thread")]
1819
    async fn stop_task_runner_concurrent_tasks() {
1820
        // Create a temporary directory for testing
1821
        let temp_dir = create_temp_directory();
1822
1823
        // Create store options with the test directory
1824
        let mut opts = Options::new();
1825
        opts.dir = temp_dir.path().to_path_buf();
1826
1827
        // Create a new store instance
1828
        let store = Store::new(opts).expect("should create store");
1829
1830
        let (writes_tx, writes_rx) = bounded(1000); // Increased buffer size
1831
        let (stop_tx, stop_rx) = bounded(1);
1832
        let core = &store.inner.as_ref().unwrap().core;
1833
1834
        let (runner, finish_rx) = TaskRunner::new(core.clone(), writes_rx, stop_rx);
1835
        runner.spawn();
1836
1837
        let task_counter = Arc::new(AtomicU64::new(0));
1838
        let total_tasks = 1000;
1839
1840
        // First, send all tasks before stopping
1841
        for i in 0..total_tasks {
1842
            let (done_tx, done_rx) = bounded(1);
1843
            writes_tx
1844
                .send(Task {
1845
                    entries: vec![],
1846
                    done: Some(done_tx),
1847
                    tx_id: i,
1848
                    durability: Durability::default(),
1849
                })
1850
                .await
1851
                .expect("should send task");
1852
1853
            let task_counter = task_counter.clone();
1854
            tokio::spawn(async move {
1855
                done_rx.recv().await.unwrap().unwrap();
1856
                task_counter.fetch_add(1, Ordering::SeqCst);
1857
            });
1858
        }
1859
1860
        // Give some time for tasks to be processed
1861
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1862
1863
        // Send stop signal
1864
        stop_tx.send(()).await.expect("should send stop signal");
1865
1866
        // Wait for TaskRunner to finish
1867
        finish_rx
1868
            .recv()
1869
            .await
1870
            .expect("TaskRunner should signal completion");
1871
1872
        // Give some time for all task counters to be updated
1873
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1874
1875
        // Check that all tasks that were sent were processed
1876
        let final_count = task_counter.load(Ordering::SeqCst);
1877
        assert_eq!(
1878
            final_count, total_tasks,
1879
            "Expected {} tasks to be processed, but got {}",
1880
            total_tasks, final_count
1881
        );
1882
    }
1883
1884
    #[tokio::test(flavor = "multi_thread")]
1885
    async fn test_incremental_transaction_ids_concurrent() {
1886
        for is_ssi in [false, true] {
1887
            let (store, temp_dir) = create_store(None, is_ssi);
1888
            let store = Arc::new(store);
1889
1890
            let total_records = 1000;
1891
            let multiple_keys_records = total_records / 2;
1892
1893
            // Define keys and values
1894
            let keys: Vec<Bytes> = (1..=total_records)
1895
                .map(|i| Bytes::from(format!("key{}", i)))
1896
                .collect();
1897
            let values: Vec<Bytes> = (1..=total_records)
1898
                .map(|i| Bytes::from(format!("value{}", i)))
1899
                .collect();
1900
1901
            // Insert multiple transactions with single keys concurrently
1902
            let mut tasks = Vec::new();
1903
            for (i, key) in keys.iter().enumerate().take(multiple_keys_records) {
1904
                let store = store.clone();
1905
                let key = key.clone();
1906
                let value = values[i].clone();
1907
                tasks.push(tokio::spawn(async move {
1908
                    let mut txn = store.begin().unwrap();
1909
                    txn.set(&key, &value).unwrap();
1910
                    txn.commit().await.unwrap();
1911
                }));
1912
            }
1913
1914
            // Wait for all tasks to complete
1915
            for task in tasks {
1916
                task.await.unwrap();
1917
            }
1918
1919
            // Insert multiple transactions with multiple keys concurrently
1920
            let mut tasks = Vec::new();
1921
            for (i, key) in keys
1922
                .iter()
1923
                .enumerate()
1924
                .skip(multiple_keys_records)
1925
                .take(multiple_keys_records)
1926
            {
1927
                let store = store.clone();
1928
                let key = key.clone();
1929
                let value = values[i].clone();
1930
                let next_key = keys[(i + multiple_keys_records) % keys.len()].clone();
1931
                let next_value = values[(i + multiple_keys_records) % values.len()].clone();
1932
                tasks.push(tokio::spawn(async move {
1933
                    let mut txn = store.begin().unwrap();
1934
                    txn.set(&key, &value).unwrap();
1935
                    txn.set(&next_key, &next_value).unwrap();
1936
                    txn.commit().await.unwrap();
1937
                }));
1938
            }
1939
1940
            // Wait for all tasks to complete
1941
            for task in tasks {
1942
                task.await.unwrap();
1943
            }
1944
            // Close the store
1945
            store.close().await.unwrap();
1946
1947
            // Add delay to ensure that the store is closed
1948
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1949
1950
            // Reopen the store
1951
            let (store, _) = create_store(Some(temp_dir), is_ssi);
1952
1953
            // Commit a new transaction with a single key
1954
            {
1955
                let mut txn = store.begin().unwrap();
1956
                txn.set(&keys[0], &values[0]).unwrap();
1957
                txn.commit().await.unwrap();
1958
1959
                let res = txn.get_versionstamp().unwrap();
1960
1961
                assert_eq!(res.0, total_records as u64 + 1);
1962
            }
1963
1964
            // Commit another new transaction with multiple keys
1965
            {
1966
                let mut txn = store.begin().unwrap();
1967
                txn.set(&keys[1], &values[1]).unwrap();
1968
                txn.set(&keys[2], &values[2]).unwrap();
1969
                txn.commit().await.unwrap();
1970
1971
                let res = txn.get_versionstamp().unwrap();
1972
1973
                assert_eq!(res.0, total_records as u64 + 2);
1974
            }
1975
1976
            store.close().await.unwrap();
1977
        }
1978
    }
1979
}