DataFileCache.java

/* Copyright (c) 2001-2024, The HSQL Development Group
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * Redistributions of source code must retain the above copyright notice, this
 * list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * Neither the name of the HSQL Development Group nor the names of its
 * contributors may be used to endorse or promote products derived from this
 * software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL HSQL DEVELOPMENT GROUP, HSQLDB.ORG,
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */


package org.hsqldb.persist;

import java.io.IOException;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.hsqldb.Database;
import org.hsqldb.HsqlException;
import org.hsqldb.Session;
import org.hsqldb.error.Error;
import org.hsqldb.error.ErrorCode;
import org.hsqldb.lib.FileAccess;
import org.hsqldb.lib.FileArchiver;
import org.hsqldb.lib.IntIndex;
import org.hsqldb.map.BitMap;
import org.hsqldb.rowio.RowInputBinaryDecode;
import org.hsqldb.rowio.RowInputInterface;
import org.hsqldb.rowio.RowOutputBinaryEncode;
import org.hsqldb.rowio.RowOutputInterface;

/**
 * Acts as a manager for CACHED table persistence.<p>
 *
 * This contains the top level functionality. Provides file management services
 * and access.<p>
 *
 * @author Fred Toussi (fredt@users dot sourceforge.net)
 * @version 2.5.1
 * @since 1.7.2
 */
public class DataFileCache {

    interface Flags {

        int FLAG_ISSHADOWED = 1;
        int FLAG_ISSAVED    = 2;
        int FLAG_ROWINFO    = 3;
        int FLAG_200        = 4;
        int FLAG_HX         = 5;
        int FLAG_251        = 6;
    }

    /**
     * file format fields
     */
    interface Positions {

        int LONG_EMPTY_SIZE      = 4;                                          // empty space size
        int LONG_FREE_POS        = 12;                                         // where iFreePos is saved
        int INT_SPACE_PROPS      = 20;                                         // (space size << 16) + scale
        int INT_SPACE_LIST_POS   = 24;                                         // space list
        int INT_FLAGS            = 28;
        int LONG_TIMESTAMP       = 32;                                         // db open tx timestamp
        int MIN_INITIAL_FREE_POS = 64;                                         // not used up to this
        int MAX_INITIAL_FREE_POS = DataSpaceManager.fixedDiskBlockSize * 2;    // not used up to this
    }

    protected FileAccess fa;

    //
    public DataSpaceManager spaceManager;
    static final int        initIOBufferSize = 4096;

    //
    protected String   dataFileName;
    protected String   backupFileName;
    protected Database database;
    protected boolean  logEvents = true;

    /**
     * this flag is used externally to determine if a backup is required
     */
    protected boolean fileModified;
    protected boolean cacheModified;
    protected int     dataFileScale;
    protected int     dataFileSpace;

    // post opening constant fields
    protected boolean cacheReadonly;

    //
    protected int cachedRowPadding;

    //
    protected long lostSpaceSize;
    protected long spaceManagerPosition;
    protected long fileStartFreePosition;
    protected int  storeCount;

    // reusable input / output streams
    protected RowInputInterface rowIn;
    public RowOutputInterface   rowOut;

    //
    public long maxDataFileSize;

    //
    boolean is251;

    //
    protected RandomAccessInterface dataFile;
    protected volatile long         fileFreePosition;
    protected int                   maxCacheRows;     // number of Rows
    protected long                  maxCacheBytes;    // number of bytes
    protected Cache                 cache;

    //
    private RAShadowFile shadowFile;

    //
    ReadWriteLock lock      = new ReentrantReadWriteLock();
    Lock          readLock  = lock.readLock();
    Lock          writeLock = lock.writeLock();

    public DataFileCache(Database db, String baseFileName) {
        initParams(db, baseFileName, false);

        cache = new Cache(this);
    }

    /**
     * used for defrag
     */
    public DataFileCache(Database db, String baseFileName, boolean defrag) {

        initParams(db, baseFileName, true);

        cache = new Cache(this);

        try {
            dataFile = new RAFileSimple(database.logger, dataFileName, "rw");
        } catch (Throwable t) {
            throw Error.error(ErrorCode.FILE_IO_ERROR, t);
        }

        initNewFile();
        initBuffers();

        if (dataFileSpace > 0) {
            spaceManager = new DataSpaceManagerBlocks(this);
        } else {
            spaceManager = new DataSpaceManagerSimple(this, false);
        }
    }

    /**
     * initial external parameters are set here.
     */
    protected void initParams(
            Database database,
            String baseFileName,
            boolean defrag) {

        this.database    = database;
        dataFileName     = baseFileName + Logger.dataFileExtension;
        backupFileName   = baseFileName + Logger.backupFileExtension;
        fa               = database.logger.getFileAccess();
        dataFileScale    = database.logger.getDataFileScale();
        dataFileSpace    = database.logger.getDataFileSpace();
        cachedRowPadding = dataFileScale;

        if (dataFileScale < 8) {
            cachedRowPadding = 8;
        }

        cacheReadonly = database.isFilesReadOnly();
        maxCacheRows  = database.logger.getCacheMaxRows();
        maxCacheBytes = database.logger.getCacheSize();
        maxDataFileSize = (long) Integer.MAX_VALUE * dataFileScale
                          * database.logger.getDataFileFactor();

        if (defrag) {
            dataFileName   = dataFileName + Logger.newFileExtension;
            backupFileName = backupFileName + Logger.newFileExtension;
            maxCacheRows   = 1024;
            maxCacheBytes  = 1024 * 4096;
        }
    }

    /**
     * Opens the *.data file for this cache, setting the variables that
     * allow access to the particular database version of the *.data file.
     */
    public void open(boolean readonly) {

        logInfoEvent("dataFileCache open start");

        try {
            boolean isNio = database.logger.propNioDataFile;
            int     fileType;

            if (database.isFilesInJar()) {
                fileType = RAFile.DATA_FILE_JAR;
            } else if (isNio) {
                fileType = RAFile.DATA_FILE_NIO;
            } else {
                fileType = RAFile.DATA_FILE_RAF;
            }

            if (readonly || database.isFilesInJar()) {
                dataFile = RAFile.newScaledRAFile(
                    database,
                    dataFileName,
                    true,
                    fileType);

                int flags = getFlags();

                if (BitMap.isSet(flags, Flags.FLAG_HX)) {
                    throw Error.error(ErrorCode.WRONG_DATABASE_FILE_VERSION);
                }

                dataFile.seek(Positions.LONG_FREE_POS);

                fileFreePosition = dataFile.readLong();

                dataFile.seek(Positions.INT_SPACE_LIST_POS);

                spaceManagerPosition = (long) dataFile.readInt()
                                       * DataSpaceManager.fixedDiskBlockSize;

                initBuffers();

                spaceManager = new DataSpaceManagerSimple(this, true);

                return;
            }

            boolean preexists     = fa.isStreamElement(dataFileName);
            boolean isIncremental = true;
            boolean isSaved       = false;
            boolean doRestore     = false;

            if (preexists) {
                dataFile = new RAFileSimple(database.logger, dataFileName, "r");

                long    length       = dataFile.length();
                boolean wrongVersion = false;

                if (length > Positions.LONG_TIMESTAMP) {
                    int flags = getFlags();

                    isSaved       = BitMap.isSet(flags, Flags.FLAG_ISSAVED);
                    isIncremental = BitMap.isSet(flags, Flags.FLAG_ISSHADOWED);
                    is251         = BitMap.isSet(flags, Flags.FLAG_251);

                    if (BitMap.isSet(flags, Flags.FLAG_HX)) {
                        wrongVersion = true;
                    }
                } else {
                    preexists = false;
                }

                if (isSaved && is251) {
                    dataFile.seek(Positions.LONG_TIMESTAMP);

                    long timestamp = dataFile.readLong();

                    if (timestamp > database.logger.getFilesTimestamp()) {
                        doRestore = true;
                    }
                }

                dataFile.close();

                if (wrongVersion) {
                    throw Error.error(ErrorCode.WRONG_DATABASE_FILE_VERSION);
                }

                if (!database.logger.propLargeData) {
                    if (length > (maxDataFileSize / 8) * 7) {
                        database.logger.propLargeData = true;
                        maxDataFileSize = (long) Integer.MAX_VALUE
                                          * dataFileScale
                                          * database.logger.getDataFileFactor();
                    }
                }

                if (length > maxDataFileSize) {
                    throw Error.error(
                        ErrorCode.DATA_FILE_IS_FULL,
                        String.valueOf(maxDataFileSize));
                }
            }

            if (preexists) {
                if (isSaved) {
                    boolean existsBackup = fa.isStreamElement(backupFileName);

                    if (existsBackup) {
                        logInfoEvent(
                            "data file was not modified but inc backup exists");

                        if (doRestore) {
                            restoreBackupIncremental();
                        }
                    }

                    deleteBackupFile();
                } else {
                    boolean restored;

                    if (isIncremental) {
                        restored = restoreBackupIncremental();
                    } else {
                        restored = restoreBackup();
                    }

                    if (!restored) {
                        database.logger.logSevereEvent(
                            "DataFileCache data file modified but no backup exists",
                            null);

                        throw Error.error(ErrorCode.DATA_FILE_BACKUP_MISMATCH);
                    }
                }
            }

            dataFile = RAFile.newScaledRAFile(
                database,
                dataFileName,
                readonly,
                fileType);

            if (preexists) {
                dataFile.seek(Positions.LONG_EMPTY_SIZE);

                lostSpaceSize = dataFile.readLong();

                dataFile.seek(Positions.LONG_FREE_POS);

                fileFreePosition      = dataFile.readLong();
                fileStartFreePosition = fileFreePosition;

                dataFile.seek(Positions.INT_SPACE_PROPS);

                int spaceProps = dataFile.readInt();

                setSpaceProps(spaceProps);
                dataFile.seek(Positions.INT_SPACE_LIST_POS);

                spaceManagerPosition = (long) dataFile.readInt()
                                       * DataSpaceManager.fixedDiskBlockSize;

                int flags = getFlags();

                flags = BitMap.set(flags, Flags.FLAG_ISSHADOWED);
                flags = BitMap.set(flags, Flags.FLAG_ISSAVED);

                setFlags(flags);
            } else {
                initNewFile();
            }

            initBuffers();

            fileModified  = false;
            cacheModified = false;

            if (dataFileSpace > 0) {
                spaceManager = new DataSpaceManagerBlocks(this);
            } else {
                spaceManager = new DataSpaceManagerSimple(this, false);
            }

            if (!preexists) {
                reset();
            }

            openShadowFile();
            logInfoEvent("dataFileCache open end");
        } catch (HsqlException e) {
            throw e;
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.open", t);
            release();

            throw Error.error(
                t,
                ErrorCode.FILE_IO_ERROR,
                ErrorCode.M_DataFileCache_open,
                new String[]{ t.toString(), dataFileName });
        }
    }

    void setSpaceProps(int spaceProps) {

        if (spaceProps == 0) {
            spaceProps = dataFileScale | (dataFileSpace << 16);

            try {
                dataFile.seek(Positions.INT_SPACE_PROPS);
                dataFile.writeInt(spaceProps);
                dataFile.synch();
            } catch (Throwable t) {
                throw Error.error(ErrorCode.FILE_IO_ERROR, t);
            }

            return;
        }

        dataFileScale = spaceProps & 0xffff;
        dataFileSpace = spaceProps >>> 16;

        database.logger.setDataFileScaleNoCheck(dataFileScale);
        database.logger.setDataFileSpace(dataFileSpace);
    }

    void initNewFile() {

        try {
            int initialFreePos = Positions.MAX_INITIAL_FREE_POS;

            if (dataFileSpace == 0) {
                initialFreePos = Positions.MIN_INITIAL_FREE_POS;

                if (initialFreePos < dataFileScale) {
                    initialFreePos = dataFileScale;
                }
            }

            fileFreePosition      = initialFreePos;
            fileStartFreePosition = initialFreePos;

            dataFile.seek(Positions.LONG_FREE_POS);
            dataFile.writeLong(fileFreePosition);

            int spaceProps = dataFileScale | (dataFileSpace << 16);

            dataFile.seek(Positions.INT_SPACE_PROPS);
            dataFile.writeInt(spaceProps);
            dataFile.seek(Positions.LONG_TIMESTAMP);
            dataFile.writeLong(database.logger.getFilesTimestamp());

            // set shadowed flag;
            int flags = 0;

            flags = BitMap.set(flags, Flags.FLAG_ISSHADOWED);
            flags = BitMap.set(flags, Flags.FLAG_ISSAVED);
            flags = BitMap.set(flags, Flags.FLAG_200);
            flags = BitMap.set(flags, Flags.FLAG_251);

            setFlags(flags);

            is251 = true;
        } catch (Throwable t) {
            throw Error.error(ErrorCode.FILE_IO_ERROR, t);
        }
    }

    private void openShadowFile() {

        shadowFile = new RAShadowFile(
            database.logger,
            dataFile,
            backupFileName,
            fileFreePosition,
            1 << 14);
    }

    /**
     * Restores a compressed backup or the .data file.
     */
    private boolean restoreBackup() {

        try {
            FileAccess fileAccess = database.logger.getFileAccess();

            deleteBackupFile();

            if (fileAccess.isStreamElement(backupFileName)) {
                FileArchiver.unarchive(
                    backupFileName,
                    dataFileName,
                    fileAccess,
                    FileArchiver.COMPRESSION_ZIP);

                return true;
            }

            return false;
        } catch (Throwable t) {
            database.logger.logSevereEvent("DataFileCache.restoreBackup", t);

            throw Error.error(
                t,
                ErrorCode.FILE_IO_ERROR,
                ErrorCode.M_Message_Pair,
                new String[]{ t.toString(), backupFileName });
        }
    }

    /**
     * Restores in from an incremental backup
     */
    private boolean restoreBackupIncremental() {

        try {
            FileAccess fileAccess = database.logger.getFileAccess();

            if (fileAccess.isStreamElement(backupFileName)) {
                RAShadowFile.restoreFile(
                    database,
                    backupFileName,
                    dataFileName);
                deleteBackupFile();

                return true;
            }

            // this is an anomaly where no backup exists but .data file
            // modified flag has been set
            return false;
        } catch (Throwable e) {
            database.logger.logSevereEvent(
                "DataFileCache.restoreBackupIncremental",
                e);

            throw Error.error(ErrorCode.FILE_IO_ERROR, e);
        }
    }

    /**
     *  Abandons changed rows and closes the .data file.
     */
    public void release() {

        writeLock.lock();

        try {
            if (dataFile == null) {
                return;
            }

            if (shadowFile != null) {
                shadowFile.close();

                shadowFile = null;
            }

            dataFile.close();
            logDetailEvent("dataFileCache file closed");

            dataFile = null;
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.release", t);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     *  Writes out all cached rows that have been modified and the
     *  free position pointer for the *.data file and then closes the file.
     */
    public void close() {

        writeLock.lock();

        try {
            if (dataFile == null) {
                return;
            }

            reset();
            dataFile.close();
            logDetailEvent("dataFileCache file close end");

            dataFile = null;
        } catch (HsqlException e) {
            throw e;
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.close", t);

            throw Error.error(
                t,
                ErrorCode.FILE_IO_ERROR,
                ErrorCode.M_DataFileCache_close,
                new String[]{ t.toString(), dataFileName });
        } finally {
            writeLock.unlock();
        }
    }

    protected void clear() {

        writeLock.lock();

        try {
            cache.clear();
        } finally {
            writeLock.unlock();
        }
    }

    public void adjustStoreCount(int adjust) {

        writeLock.lock();

        try {
            storeCount += adjust;
        } finally {
            writeLock.unlock();
        }
    }

    public void reopen() {

        writeLock.lock();

        try {
            openShadowFile();
            spaceManager.initialiseSpaces();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Commits all the changes to the file
     */
    public void reset() {

        writeLock.lock();

        try {
            if (cacheReadonly) {
                return;
            }

            logInfoEvent("dataFileCache commit start");
            spaceManager.reset();
            cache.saveAll();

            // set empty
            long lostSize = spaceManager.getLostBlocksSize();

            dataFile.seek(Positions.LONG_EMPTY_SIZE);
            dataFile.writeLong(lostSize);

            // set end
            dataFile.seek(Positions.LONG_FREE_POS);
            dataFile.writeLong(fileFreePosition);

            // set space props
            int spaceProps = dataFileScale | (dataFileSpace << 16);

            dataFile.seek(Positions.INT_SPACE_PROPS);
            dataFile.writeInt(spaceProps);

            // set space list
            int pos = (int) (spaceManagerPosition
                             / DataSpaceManager.fixedDiskBlockSize);

            dataFile.seek(Positions.INT_SPACE_LIST_POS);
            dataFile.writeInt(pos);

            if (is251) {
                dataFile.seek(Positions.LONG_TIMESTAMP);
                dataFile.writeLong(database.logger.getFilesTimestamp());
            }

            // set saved flag and sync file
            setFlag(Flags.FLAG_ISSAVED, true);
            logDetailEvent("file sync end");

            fileModified          = false;
            cacheModified         = false;
            fileStartFreePosition = fileFreePosition;

            if (shadowFile != null) {
                shadowFile.close();

                shadowFile = null;
            }

            logInfoEvent("dataFileCache commit end");
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.reset commit", t);

            throw Error.error(
                t,
                ErrorCode.FILE_IO_ERROR,
                ErrorCode.M_DataFileCache_close,
                new String[]{ t.toString(), dataFileName });
        } finally {
            writeLock.unlock();
        }
    }

    protected void initBuffers() {

        if (rowOut == null) {
            rowOut = new RowOutputBinaryEncode(
                database.logger.getCrypto(),
                initIOBufferSize,
                cachedRowPadding);
        }

        if (rowIn == null) {
            rowIn = new RowInputBinaryDecode(
                database.logger.getCrypto(),
                new byte[initIOBufferSize]);
        }
    }

    DataFileDefrag defrag(Session session) {

        writeLock.lock();

        try {
            cache.saveAll();

            DataFileDefrag dfd = new DataFileDefrag(database, this);

            dfd.process(session);

            return dfd;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Used when a row is deleted as a result of some DML or DDL statement.
     * Removes the row from the cache data structures.
     */
    public void remove(CachedObject object) {
        release(object.getPos());
    }

    public void removePersistence(CachedObject object) {}

    public void add(CachedObject object, boolean keep) {

        writeLock.lock();

        try {
            cacheModified = true;

            cache.put(object);

            if (keep) {
                object.keepInMemory(true);
            }

            if (object.getStorageSize() > initIOBufferSize) {
                rowOut.reset(object.getStorageSize());
            }
        } finally {
            writeLock.unlock();
        }
    }

    public CachedObject get(
            CachedObject object,
            PersistentStore store,
            boolean keep) {

        readLock.lock();

        long pos;

        try {
            if (object.isInMemory()) {
                if (keep) {
                    object.keepInMemory(true);
                }

                return object;
            }

            pos = object.getPos();

            if (pos < 0) {
                return null;
            }

            object = cache.get(pos);

            if (object != null) {
                if (keep) {
                    object.keepInMemory(true);
                }

                return object;
            }
        } finally {
            readLock.unlock();
        }

        return getFromFile(pos, store, keep);
    }

    public CachedObject get(
            long pos,
            int size,
            PersistentStore store,
            boolean keep) {

        CachedObject object;

        if (pos < 0) {
            return null;
        }

        readLock.lock();

        try {
            object = cache.get(pos);

            if (object != null) {
                if (keep) {
                    object.keepInMemory(true);
                }

                return object;
            }
        } finally {
            readLock.unlock();
        }

        return getFromFile(pos, size, store, keep);
    }

    public CachedObject get(long pos, PersistentStore store, boolean keep) {

        CachedObject object;

        if (pos < 0) {
            return null;
        }

        readLock.lock();

        try {
            object = cache.get(pos);

            if (object != null) {
                if (keep) {
                    object.keepInMemory(true);
                }

                return object;
            }
        } finally {
            readLock.unlock();
        }

        return getFromFile(pos, store, keep);
    }

    private CachedObject getFromFile(
            long pos,
            PersistentStore store,
            boolean keep) {

        CachedObject object = null;

        writeLock.lock();

        try {
            object = cache.get(pos);

            if (object != null) {
                if (keep) {
                    object.keepInMemory(true);
                }

                return object;
            }

            for (int j = 0; j < 2; j++) {
                try {
                    readObject(pos);

                    object = store.get(rowIn);

                    if (object == null) {
                        throw Error.error(
                            ErrorCode.GENERAL_IO_ERROR,
                            "position " + pos);
                    }

                    break;
                } catch (Throwable t) {
                    if (t instanceof OutOfMemoryError) {
                        cache.clearUnchanged();

                        if (j > 0) {
                            logInfoEvent(
                                dataFileName + " getFromFile out of mem, pos: "
                                + pos);

                            HsqlException ex = Error.error(
                                ErrorCode.OUT_OF_MEMORY,
                                t);

                            ex.info = rowIn;

                            throw ex;
                        }
                    } else if (t instanceof HsqlException) {
                        ((HsqlException) t).info = rowIn;

                        throw(HsqlException) t;
                    } else {
                        HsqlException ex = Error.error(
                            ErrorCode.GENERAL_IO_ERROR,
                            t);

                        ex.info = rowIn;

                        throw ex;
                    }
                }
            }

            if (object == null) {
                throw Error.error(ErrorCode.DATA_FILE_ERROR);
            }

            // for text tables with empty rows at the beginning,
            // pos may move forward in readObject
            cache.put(object);

            if (keep) {
                object.keepInMemory(true);
            }

            return object;
        } catch (HsqlException e) {
            logSevereEvent(dataFileName + " getFromFile failed " + pos, e);

            throw e;
        } finally {
            writeLock.unlock();
        }
    }

    private CachedObject getFromFile(
            long pos,
            int size,
            PersistentStore store,
            boolean keep) {

        CachedObject object = null;

        writeLock.lock();

        try {
            object = cache.get(pos);

            if (object != null) {
                if (keep) {
                    object.keepInMemory(true);
                }

                return object;
            }

            for (int j = 0; j < 2; j++) {
                try {
                    readObject(pos, size);

                    object = store.get(rowIn);
                    break;
                } catch (OutOfMemoryError err) {
                    cache.clearUnchanged();
                    System.gc();

                    if (j > 0) {
                        logSevereEvent(
                            dataFileName + " getFromFile out of mem " + pos,
                            err);

                        throw err;
                    }
                }
            }

            if (object == null) {
                throw Error.error(ErrorCode.DATA_FILE_ERROR);
            }

            cache.putUsingReserve(object);

            if (keep) {
                object.keepInMemory(true);
            }

            return object;
        } catch (HsqlException e) {
            logSevereEvent(dataFileName + " getFromFile failed " + pos, e);

            throw e;
        } finally {
            writeLock.unlock();
        }
    }

    RowInputInterface getRaw(long pos) {

        writeLock.lock();

        try {
            readObject(pos);

            return rowIn;
        } finally {
            writeLock.unlock();
        }
    }

    private void readObject(long pos) {

        try {
            dataFile.seek(pos * dataFileScale);

            int size = dataFile.readInt();

            rowIn.resetRow(pos, size);
            dataFile.read(rowIn.getBuffer(), 4, size - 4);
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.readObject", t, pos);

            HsqlException ex = Error.error(ErrorCode.DATA_FILE_ERROR, t);

            if (rowIn.getFilePosition() != pos) {
                rowIn.resetRow(pos, 0);
            }

            ex.info = rowIn;

            throw ex;
        }
    }

    protected void readObject(long pos, int size) {

        try {
            rowIn.resetBlock(pos, size);
            dataFile.seek(pos * dataFileScale);
            dataFile.read(rowIn.getBuffer(), 0, size);
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.readObject", t, pos);

            HsqlException ex = Error.error(ErrorCode.DATA_FILE_ERROR, t);

            ex.info = rowIn;

            throw ex;
        }
    }

    public void releaseRange(long startPos, long limitPos) {

        writeLock.lock();

        try {
            cacheModified = true;

            cache.releaseRange(startPos, limitPos);
        } finally {
            writeLock.unlock();
        }
    }

    public void releaseRange(IntIndex list, int fileBlockItemCount) {

        writeLock.lock();

        try {
            cacheModified = true;

            cache.releaseRange(list, fileBlockItemCount);
        } finally {
            writeLock.unlock();
        }
    }

    public CachedObject release(long pos) {

        writeLock.lock();

        try {
            cacheModified = true;

            return cache.release(pos);
        } finally {
            writeLock.unlock();
        }
    }

    protected void saveRows(CachedObject[] rows, int offset, int count) {

        if (count == 0) {
            return;
        }

        int  pageCount   = copyShadow(rows, offset, count);
        long startTime   = cache.saveAllTimer.elapsedTime();
        long storageSize = 0;

        cache.saveAllTimer.start();

        if (pageCount > 0) {
            setFileModified();
        }

        for (int i = offset; i < offset + count; i++) {
            CachedObject r = rows[i];

            saveRowNoLock(r);

            rows[i]     = null;
            storageSize += r.getStorageSize();
        }

        cache.saveAllTimer.stop();
        cache.logSaveRowsEvent(count, storageSize, startTime);
    }

    /**
     * Writes out the specified Row. Will write only the Nodes or both Nodes
     * and table row data depending on what is not already persisted to disk.
     */
    public void saveRow(CachedObject row) {

        writeLock.lock();

        try {
            copyShadow(row);
            setFileModified();
            saveRowNoLock(row);
        } finally {
            writeLock.unlock();
        }
    }

    public void saveRowOutput(long pos) {

        try {
            dataFile.seek(pos * dataFileScale);
            dataFile.write(
                rowOut.getOutputStream().getBuffer(),
                0,
                rowOut.getOutputStream().size());
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.saveRowOutput", t, pos);

            throw Error.error(ErrorCode.DATA_FILE_ERROR, t);
        }
    }

    protected void saveRowNoLock(CachedObject row) {

        try {
            rowOut.reset();
            row.write(rowOut);
            dataFile.seek(row.getPos() * dataFileScale);
            dataFile.write(
                rowOut.getOutputStream().getBuffer(),
                0,
                rowOut.getOutputStream().size());
            row.setChanged(false);
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.saveRowNoLock", t, row.getPos());

            throw Error.error(ErrorCode.DATA_FILE_ERROR, t);
        }
    }

    protected int copyShadow(CachedObject[] rows, int offset, int count) {

        int pageCount = 0;

        if (shadowFile != null) {
            long time    = cache.shadowTimer.elapsedTime();
            long seekpos = 0;

            cache.shadowTimer.start();

            try {
                for (int i = offset; i < offset + count; i++) {
                    CachedObject row = rows[i];

                    seekpos   = row.getPos() * dataFileScale;
                    pageCount += shadowFile.copy(seekpos, row.getStorageSize());
                }

                if (pageCount > 0) {
                    shadowFile.synch();
                }
            } catch (Throwable t) {
                logSevereEvent("DataFileCache.copyShadow", t, seekpos);

                throw Error.error(ErrorCode.DATA_FILE_ERROR, t);
            }

            cache.shadowTimer.stop();

            if (pageCount > 0) {
                time = cache.shadowTimer.elapsedTime() - time;

                logDetailEvent(
                    "copyShadow [size, time] " + shadowFile.getSavedLength()
                    + " " + time);
            }
        }

        return pageCount;
    }

    protected int copyShadow(CachedObject row) {

        if (shadowFile != null) {
            long seekpos = row.getPos() * dataFileScale;

            try {
                int pageCount = shadowFile.copy(seekpos, row.getStorageSize());

                shadowFile.synch();

                return pageCount;
            } catch (Throwable t) {
                logSevereEvent("DataFileCache.copyShadow", t, row.getPos());

                throw Error.error(ErrorCode.DATA_FILE_ERROR, t);
            }
        }

        return 0;
    }

    void deleteDataFile() {
        Log.deleteFile(fa, dataFileName);
    }

    private void deleteBackupFile() {
        Log.deleteFile(fa, backupFileName);
    }

    /**
     * Delta must always result in block multiples
     */
    public long enlargeFileSpace(long newLength) {

        writeLock.lock();

        try {
            long position = fileFreePosition;

            if (newLength > maxDataFileSize) {
                logSevereEvent(
                    "data file reached maximum allowed size: " + dataFileName
                    + " " + maxDataFileSize,
                    null);

                throw Error.error(ErrorCode.DATA_FILE_IS_FULL);
            }

            boolean result = dataFile.ensureLength(newLength);

            if (!result) {
                logSevereEvent(
                    "data file cannot be enlarged - disk space: "
                    + dataFileName + " " + newLength,
                    null);

                throw Error.error(ErrorCode.DATA_FILE_IS_FULL);
            }

            fileFreePosition = newLength;

            return position;
        } finally {
            writeLock.unlock();
        }
    }

    public int capacity() {
        return maxCacheRows;
    }

    public long bytesCapacity() {
        return maxCacheBytes;
    }

    public long getTotalCachedBlockSize() {
        return cache.getTotalCachedBlockSize();
    }

    public long getLostBlockSize() {
        return spaceManager.getLostBlocksSize();
    }

    public long getFileFreePos() {
        return fileFreePosition;
    }

    public int getCachedObjectCount() {
        return cache.size();
    }

    public String getFileName() {
        return dataFileName;
    }

    public int getDataFileScale() {
        return dataFileScale;
    }

    public int getDataFileSpace() {
        return dataFileSpace;
    }

    public boolean isFileModified() {
        return fileModified;
    }

    public boolean isModified() {
        return cacheModified;
    }

    public boolean isFileOpen() {
        return dataFile != null;
    }

    protected void setFileModified() {

        try {
            if (!fileModified) {

                // unset saved flag;
                setFlag(Flags.FLAG_ISSAVED, false);
                logDetailEvent("setFileModified flag set ");

                fileModified = true;
            }
        } catch (Throwable t) {
            logSevereEvent("DataFileCache.setFileModified", t);

            throw Error.error(ErrorCode.DATA_FILE_ERROR, t);
        }
    }

    int getFlags() throws IOException {

        dataFile.seek(Positions.INT_FLAGS);

        int flags = dataFile.readInt();

        return flags;
    }

    void setFlags(int flags) throws IOException {
        dataFile.seek(Positions.INT_FLAGS);
        dataFile.writeInt(flags);
        dataFile.synch();
    }

    void setFlag(int singleFlag, boolean val) throws IOException {

        dataFile.seek(Positions.INT_FLAGS);

        int flags = dataFile.readInt();

        flags = val
                ? BitMap.set(flags, singleFlag)
                : BitMap.unset(flags, singleFlag);

        dataFile.seek(Positions.INT_FLAGS);
        dataFile.writeInt(flags);
        dataFile.synch();
    }

    public boolean isDataReadOnly() {
        return this.cacheReadonly;
    }

    public RAShadowFile getShadowFile() {
        return shadowFile;
    }

    public AtomicInteger getAccessCount() {
        return cache.getAccessCount();
    }

    private void logSevereEvent(String message, Throwable t, long position) {

        if (logEvents) {
            StringBuilder sb = new StringBuilder();

            sb.append(message).append(' ').append(position);
            database.logger.logSevereEvent(sb.toString(), t);
        }
    }

    public void logSevereEvent(String message, Throwable t) {
        if (logEvents) {
            database.logger.logSevereEvent(message, t);
        }
    }

    void logInfoEvent(String message) {
        if (logEvents) {
            database.logger.logInfoEvent(message);
        }
    }

    void logDetailEvent(String message) {
        if (logEvents) {
            database.logger.logDetailEvent(message);
        }
    }
}