FileListPipesIterator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.async.cli;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
import org.apache.tika.plugins.ExtensionConfig;

/**
 * PipesIterator that reads file paths from a text file (one path per line).
 * <p>
 * If a {@code basePath} is provided, lines are treated as relative paths
 * under that directory.  The fetch key uses the relative path so that the
 * file-system fetcher (whose basePath is the input directory) can resolve it.
 * <p>
 * Blank lines and lines starting with {@code #} are skipped.
 */
class FileListPipesIterator implements PipesIterator {

    private final Path fileListPath;
    private final Path basePath;

    FileListPipesIterator(Path fileListPath, Path basePath) {
        this.fileListPath = fileListPath;
        this.basePath = basePath;
    }

    @Override
    public Iterator<FetchEmitTuple> iterator() {
        BufferedReader reader;
        try {
            reader = Files.newBufferedReader(fileListPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to open file list: " + fileListPath, e);
        }

        AtomicInteger id = new AtomicInteger();
        return new Iterator<>() {
            private FetchEmitTuple next;
            private boolean done;

            @Override
            public boolean hasNext() {
                if (next != null) {
                    return true;
                }
                if (done) {
                    return false;
                }
                try {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        line = line.trim();
                        if (!line.isEmpty() && !line.startsWith("#")) {
                            next = new FetchEmitTuple(
                                    String.valueOf(id.getAndIncrement()),
                                    new FetchKey(TikaConfigAsyncWriter.FETCHER_NAME, line),
                                    new EmitKey(TikaConfigAsyncWriter.EMITTER_NAME, line));
                            return true;
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Failed reading file list", e);
                }
                done = true;
                try {
                    reader.close();
                } catch (IOException e) {
                    // ignore
                }
                return false;
            }

            @Override
            public FetchEmitTuple next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                FetchEmitTuple t = next;
                next = null;
                return t;
            }
        };
    }

    @Override
    public Integer call() throws Exception {
        return (int) Files.lines(fileListPath)
                .map(String::trim)
                .filter(line -> !line.isEmpty() && !line.startsWith("#"))
                .count();
    }

    @Override
    public ExtensionConfig getExtensionConfig() {
        return null;
    }
}