TarArchiveDataSource.java

/*
 * Copyright (c) 2024, RTE (http://www.rte-france.com)
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 * SPDX-License-Identifier: MPL-2.0
 */
package com.powsybl.commons.datasource;

import com.google.common.io.ByteStreams;
import com.powsybl.commons.io.ForwardingInputStream;
import com.powsybl.commons.io.ForwardingOutputStream;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * @author Nicolas Rol {@literal <nicolas.rol at rte-france.com>}
 */
public class TarArchiveDataSource extends AbstractArchiveDataSource {

    public TarArchiveDataSource(Path directory, String tarFileName, String baseName, String dataExtension, CompressionFormat compressionFormat, DataSourceObserver observer) {
        super(directory, tarFileName, baseName, dataExtension, compressionFormat, ArchiveFormat.TAR, observer);
    }

    public TarArchiveDataSource(Path directory, String tarFileName, String baseName, String dataExtension, CompressionFormat compressionFormat) {
        this(directory, tarFileName, baseName, dataExtension, compressionFormat, null);
    }

    public TarArchiveDataSource(Path directory, String baseName, String dataExtension, CompressionFormat compressionFormat, DataSourceObserver observer) {
        this(directory,
            baseName + (dataExtension == null || dataExtension.isEmpty() ? "" : "." + dataExtension) + ".tar" + (compressionFormat == null ? "" : "." + compressionFormat.getExtension()),
            baseName, dataExtension, compressionFormat, observer);
    }

    public TarArchiveDataSource(Path directory, String baseName, String dataExtension, CompressionFormat compressionFormat) {
        this(directory,
            baseName + (dataExtension == null || dataExtension.isEmpty() ? "" : "." + dataExtension) + ".tar" + (compressionFormat == null ? "" : "." + compressionFormat.getExtension()),
            baseName, dataExtension, compressionFormat, null);
    }

    public TarArchiveDataSource(Path directory, String baseName, CompressionFormat compressionFormat, DataSourceObserver observer) {
        this(directory,
            baseName + ".tar" + (compressionFormat == null ? "" : "." + compressionFormat.getExtension()),
            baseName, null, compressionFormat, observer);
    }

    public TarArchiveDataSource(Path directory, String baseName, CompressionFormat compressionFormat) {
        this(directory,
            baseName + ".tar" + (compressionFormat == null ? "" : "." + compressionFormat.getExtension()),
            baseName, null, compressionFormat, null);
    }

    public TarArchiveDataSource(Path directory, String baseName) {
        this(directory,
            baseName + ".tar",
            baseName, null, null, null);
    }

    public TarArchiveDataSource(Path tarFile) {
        this(tarFile.getParent(), new FileInformation(tarFile.getFileName().toString(), false));
    }

    public TarArchiveDataSource(Path tarFile, DataSourceObserver observer) {
        this(tarFile.getParent(), new FileInformation(tarFile.getFileName().toString(), false), observer);
    }

    private TarArchiveDataSource(Path directory, FileInformation fileInformation) {
        this(directory, fileInformation.getBaseName(), fileInformation.getDataExtension(), fileInformation.getCompressionFormat());
    }

    private TarArchiveDataSource(Path directory, FileInformation fileInformation, DataSourceObserver observer) {
        this(directory, fileInformation.getBaseName(), fileInformation.getDataExtension(), fileInformation.getCompressionFormat(), observer);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Files are here located in the archive.</p>
     */
    @Override
    public Set<String> listNames(String regex) throws IOException {
        // Initialize variables
        Pattern p = Pattern.compile(regex);
        Set<String> names = new HashSet<>();
        Path tarFilePath = getArchiveFilePath();

        // Explore the archive
        try (BufferedInputStream inputStream = new BufferedInputStream(Files.newInputStream(tarFilePath));
             InputStream cis = getCompressedInputStream(inputStream, compressionFormat);
             TarArchiveInputStream tar = new TarArchiveInputStream(cis)) {
            ArchiveEntry entry;
            while ((entry = tar.getNextEntry()) != null) {
                if (!entry.isDirectory()
                    && p.matcher(entry.getName()).matches()) {
                    names.add(entry.getName());
                }
            }
        }
        return names;
    }

    protected boolean entryExists(Path tarFilePath, String fileName) {
        if (Files.exists(tarFilePath)) {
            try (InputStream fis = Files.newInputStream(tarFilePath);
                 BufferedInputStream bis = new BufferedInputStream(fis);
                 InputStream is = getCompressedInputStream(bis, compressionFormat);
                 TarArchiveInputStream tais = new TarArchiveInputStream(is)) {

                TarArchiveEntry entry;
                while ((entry = tais.getNextEntry()) != null) {
                    if (entry.getName().equals(fileName)) {
                        return true;
                    }
                }
                return false;
            } catch (IOException | UnsupportedOperationException e) {
                return false;
            }
        }
        return false;
    }

    @Override
    public OutputStream newOutputStream(String suffix, String ext, boolean append) throws IOException {
        return newOutputStream(DataSourceUtil.getFileName(baseName, suffix, ext), append);
    }

    @Override
    public OutputStream newOutputStream(String fileName, boolean append) throws IOException {
        Objects.requireNonNull(fileName);
        if (append) {
            throw new UnsupportedOperationException("append not supported in tar file data source");
        }
        Path tarFilePath = getArchiveFilePath();
        OutputStream os = new TarEntryOutputStream(tarFilePath, fileName, compressionFormat);
        return observer != null ? new ObservableOutputStream(os, tarFilePath + ":" + fileName, observer) : os;
    }

    @Override
    public InputStream newInputStream(String suffix, String ext) throws IOException {
        return newInputStream(DataSourceUtil.getFileName(baseName, suffix, ext));
    }

    @Override
    public InputStream newInputStream(String fileName) throws IOException {
        Objects.requireNonNull(fileName);
        Path tarFilePath = getArchiveFilePath();

        // If the file is in the archive, we can open it
        if (entryExists(tarFilePath, fileName)) {
            InputStream is = new TarEntryInputStream(tarFilePath, fileName, compressionFormat);
            return observer != null ? new ObservableInputStream(is, tarFilePath + ":" + fileName, observer) : is;
        }
        return null;
    }

    private static final class TarEntryInputStream extends ForwardingInputStream<InputStream> {

        private TarEntryInputStream(Path tarFilePath, String fileName, CompressionFormat compressionFormat) throws IOException {
            super(setStreamToFile(getTmpStream(tarFilePath, compressionFormat), fileName));
        }

        private static TarArchiveInputStream getTmpStream(Path tarFilePath, CompressionFormat compressionFormat) throws IOException {
            return new TarArchiveInputStream(getCompressedInputStream(new BufferedInputStream(Files.newInputStream(tarFilePath)), compressionFormat));
        }

        private static InputStream setStreamToFile(TarArchiveInputStream tais, String fileName) throws IOException {
            TarArchiveEntry entry;
            while ((entry = tais.getNextEntry()) != null) {
                if (entry.getName().equals(fileName)) {
                    return tais;
                }
            }
            return null;
        }
    }

    private static final class TarEntryOutputStream extends ForwardingOutputStream<OutputStream> {

        private final Path tarFilePath;
        private final String fileName;
        private final CompressionFormat compressionFormat;
        private boolean closed;

        private TarEntryOutputStream(Path tarFilePath, String fileName, CompressionFormat compressionFormat) throws IOException {
            super(getTmpStream(getTmpStreamFilePath(tarFilePath)));
            this.tarFilePath = tarFilePath;
            this.fileName = fileName;
            this.compressionFormat = compressionFormat;
            this.closed = false;
        }

        private static OutputStream getTmpStream(Path tarFilePath) throws IOException {
            return new BufferedOutputStream(Files.newOutputStream(tarFilePath));
        }

        private static Path getTmpStreamFilePath(Path tarFilePath) {
            return tarFilePath.getParent().resolve("tmp_stream_" + tarFilePath.getFileName() + ".stream");
        }

        private static TarArchiveOutputStream getTarStream(Path tmpTarFilePath) throws IOException {
            return new TarArchiveOutputStream(new BufferedOutputStream(Files.newOutputStream(tmpTarFilePath)));
        }

        private static Path getTmpTarFilePath(Path tarFilePath) {
            return tarFilePath.getParent().resolve("tmp_" + tarFilePath.getFileName());
        }

        private static Path getTmpCompressedTarFilePath(Path tarFilePath) {
            return tarFilePath.getParent().resolve("tmp_comp_" + tarFilePath.getFileName());
        }

        private void compressTarFile() throws IOException {
            try (InputStream fis = Files.newInputStream(getTmpTarFilePath(tarFilePath));
                 OutputStream fos = Files.newOutputStream(getTmpCompressedTarFilePath(tarFilePath), StandardOpenOption.CREATE);
                 OutputStream compressedOS = getCompressedOutputStream(fos, this.compressionFormat)) {
                byte[] buffer = new byte[8192];
                int len;
                while ((len = fis.read(buffer)) != -1) {
                    compressedOS.write(buffer, 0, len);
                }
            }
        }

        private static OutputStream getCompressedOutputStream(OutputStream os, CompressionFormat compressionFormat) throws IOException {
            return compressionFormat == null ? os : switch (compressionFormat) {
                case GZIP -> new GzipCompressorOutputStream(os);
                case BZIP2 -> new BZip2CompressorOutputStream(os);
                case XZ -> new XZCompressorOutputStream(os);
                case ZSTD -> new ZstdCompressorOutputStream(os);
                default -> os;
            };
        }

        @Override
        public void close() throws IOException {
            if (!closed) {

                // Close temporary stream file
                super.close();

                // Open a new temporary archive
                try (TarArchiveOutputStream taos = getTarStream(getTmpTarFilePath(tarFilePath))) {

                    // Useful parameter
                    taos.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
                    taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);

                    // Temporary stream file path
                    Path tmpStreamFilePath = getTmpStreamFilePath(tarFilePath);

                    // Copy content of temporary stream file into an entry of the temporary archive
                    try (InputStream is = Files.newInputStream(tmpStreamFilePath)) {

                        // New tar entry
                        TarArchiveEntry entry = new TarArchiveEntry(fileName);
                        entry.setSize(Files.size(tmpStreamFilePath));

                        // New file to add
                        taos.putArchiveEntry(entry);

                        // Write the data in the entry
                        ByteStreams.copy(is, taos);

                        // close new entry
                        taos.closeArchiveEntry();
                    }

                    // Copy existing entries into the temporary archive
                    if (Files.exists(tarFilePath)) {
                        try (InputStream fis = Files.newInputStream(tarFilePath);
                             BufferedInputStream bis = new BufferedInputStream(fis);
                             InputStream cis = getCompressedInputStream(bis, compressionFormat);
                             TarArchiveInputStream tarInput = new TarArchiveInputStream(cis)) {
                            TarArchiveEntry oldEntry;
                            while ((oldEntry = tarInput.getNextEntry()) != null) {
                                if (!oldEntry.getName().equals(fileName)) {
                                    taos.putArchiveEntry(oldEntry);
                                    byte[] buffer = new byte[8192];
                                    int len;
                                    while ((len = tarInput.read(buffer)) != -1) {
                                        taos.write(buffer, 0, len);
                                    }
                                    taos.closeArchiveEntry();
                                }
                            }
                        }
                    }

                    // Finishes the TAR archive without closing the underlying OutputStream
                    taos.finish();
                }

                // Compress the archive if needed
                compressTarFile();

                // swap with tmp tar
                Path tmpTarFilePath = getTmpCompressedTarFilePath(tarFilePath);
                Files.move(tmpTarFilePath, tarFilePath, StandardCopyOption.REPLACE_EXISTING);

                closed = true;
            }
        }
    }

    private static InputStream getCompressedInputStream(InputStream is, CompressionFormat compressionFormat) throws IOException {
        if (compressionFormat == null) {
            return is;
        }
        return switch (compressionFormat) {
            case GZIP -> new GzipCompressorInputStream(is);
            case BZIP2 -> new BZip2CompressorInputStream(is);
            case XZ -> new XZCompressorInputStream(is);
            case ZSTD -> new ZstdCompressorInputStream(is);
            default -> is;
        };
    }
}