AbstractComponentManager.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import org.pf4j.PluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.config.loader.TikaObjectMapperFactory;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.core.config.ConfigStore;
import org.apache.tika.pipes.core.config.InMemoryConfigStore;
import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.plugins.TikaExtension;
import org.apache.tika.plugins.TikaExtensionFactory;

/**
 * Abstract base class for managing Tika components (Fetchers, Emitters, etc.).
 * Provides lazy instantiation, early validation, and optional runtime modifications.
 *
 * @param <T> the component type (e.g., Fetcher, Emitter)
 * @param <F> the factory type for creating components
 */
public abstract class AbstractComponentManager<T extends TikaExtension,
                                                F extends TikaExtensionFactory<T>> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractComponentManager.class);

    protected final PluginManager pluginManager;
    private final ConfigStore configStore;
    private final Map<String, T> componentCache = new ConcurrentHashMap<>();
    private final boolean allowRuntimeModifications;

    protected AbstractComponentManager(PluginManager pluginManager,
                                      Map<String, ExtensionConfig> componentConfigs,
                                      boolean allowRuntimeModifications) {
        this(pluginManager, componentConfigs, allowRuntimeModifications, new InMemoryConfigStore());
    }

    protected AbstractComponentManager(PluginManager pluginManager,
                                      Map<String, ExtensionConfig> componentConfigs,
                                      boolean allowRuntimeModifications,
                                      ConfigStore configStore) {
        this.pluginManager = pluginManager;
        this.configStore = configStore;
        try {
            configStore.init();
        } catch (Exception e) {
            throw new RuntimeException("Failed to initialize ConfigStore", e);
        }
        componentConfigs.forEach(configStore::put);
        this.allowRuntimeModifications = allowRuntimeModifications;
    }

    /**
     * Returns the JSON configuration key for this component type (e.g., "fetchers", "emitters").
     */
    protected abstract String getConfigKey();

    /**
     * Returns the config store used by this manager.
     * Useful for subclasses that need direct access to the store.
     */
    protected ConfigStore getConfigStore() {
        return configStore;
    }

    /**
     * Returns the factory class for this component type.
     */
    protected abstract Class<F> getFactoryClass();

    /**
     * Returns the component name for error messages (e.g., "fetcher", "emitter").
     */
    protected abstract String getComponentName();

    /**
     * Creates a not-found exception for this component type.
     */
    protected abstract TikaException createNotFoundException(String message);

    /**
     * Validates the configuration and collects component configs without instantiating.
     */
    protected Map<String, ExtensionConfig> validateAndCollectConfigs(
            PluginManager pluginManager, JsonNode configNode) throws TikaConfigException, IOException {

        Map<String, F> factories = getFactories(pluginManager);
        Map<String, ExtensionConfig> configs = new HashMap<>();

        if (configNode != null && !configNode.isNull()) {
            // Outer loop: iterate over instance IDs
            Iterator<Map.Entry<String, JsonNode>> instanceFields = configNode.fields();
            while (instanceFields.hasNext()) {
                Map.Entry<String, JsonNode> instanceEntry = instanceFields.next();
                String instanceId = instanceEntry.getKey();
                JsonNode typeNode = instanceEntry.getValue();

                // Check for duplicate IDs (should not happen due to JSON parsing, but validate)
                if (configs.containsKey(instanceId)) {
                    throw new TikaConfigException("Duplicate " + getComponentName() +
                            " id: " + instanceId);
                }

                // Inner loop: extract the type name
                // The structure should be: { "type-name": { config } }
                // We expect exactly ONE type per instance
                Iterator<Map.Entry<String, JsonNode>> typeFields = typeNode.fields();

                if (!typeFields.hasNext()) {
                    throw new TikaConfigException(
                            "Invalid " + getComponentName() + " configuration for id '" + instanceId +
                            "': missing type specification. Expected format: {\"" + instanceId +
                            "\": {\"type-name\": {...}}}");
                }

                Map.Entry<String, JsonNode> typeEntry = typeFields.next();
                String typeName = typeEntry.getKey();
                JsonNode config = typeEntry.getValue();

                // Validate that there's only one type per instance
                if (typeFields.hasNext()) {
                    Map.Entry<String, JsonNode> extraTypeEntry = typeFields.next();
                    throw new TikaConfigException(
                            "Invalid " + getComponentName() + " configuration for id '" + instanceId +
                            "': multiple types specified ('" + typeName + "', '" +
                            extraTypeEntry.getKey() + "'). Each instance can only have one type.");
                }

                // Validate that factory exists for this type
                F factory = factories.get(typeName);
                if (factory == null) {
                    throw new TikaConfigException(
                            "Unknown " + getComponentName() + " type: '" + typeName +
                            "' for instance id '" + instanceId + "'. Available types: " +
                            factories.keySet());
                }

                configs.put(instanceId, new ExtensionConfig(instanceId, typeName,
                        toJsonString(config)));
            }
        }

        return configs;
    }

    protected Map<String, F> getFactories(PluginManager pluginManager) throws TikaConfigException {
        if (pluginManager.getStartedPlugins().isEmpty()) {
            pluginManager.loadPlugins();
            pluginManager.startPlugins();
        }

        Map<String, F> factories = new HashMap<>();
        for (F factory : pluginManager.getExtensions(getFactoryClass())) {
            String name = factory.getName();
            ClassLoader cl = factory.getClass().getClassLoader();
            boolean isFromPlugin = cl instanceof org.pf4j.PluginClassLoader;

            F existing = factories.get(name);
            if (existing != null) {
                boolean existingIsFromPlugin = existing.getClass().getClassLoader()
                        instanceof org.pf4j.PluginClassLoader;
                if (isFromPlugin && !existingIsFromPlugin) {
                    // Replace classpath version with plugin version
                    factories.put(name, factory);
                }
                // Otherwise skip duplicate (keep existing)
                continue;
            }
            factories.put(name, factory);
        }
        return factories;
    }

    private static String toJsonString(final JsonNode node) throws TikaConfigException {
        try {
            return TikaObjectMapperFactory.getMapper().writeValueAsString(node);
        } catch (JsonProcessingException e) {
            throw new TikaConfigException("Failed to serialize config to JSON string", e);
        }
    }

    /**
     * Gets a component by ID, lazily instantiating it if needed.
     */
    public T getComponent(String id) throws IOException, TikaException {
        // Check cache first (fast path, no synchronization)
        T component = componentCache.get(id);
        if (component != null) {
            return component;
        }

        // Check if config exists
        ExtensionConfig config = configStore.get(id);
        if (config == null) {
            throw createNotFoundException(
                    "Can't find " + getComponentName() + " for id=" + id +
                    ". Available: " + configStore.keySet());
        }

        // Synchronized block to ensure only one thread builds the component
        synchronized (this) {
            // Double-check in case another thread built it while we were waiting
            component = componentCache.get(id);
            if (component != null) {
                return component;
            }

            // Build the component
            try {
                component = buildComponent(config);
                componentCache.put(id, component);
                LOG.debug("Lazily instantiated {}: {}", getComponentName(), id);
                return component;
            } catch (TikaConfigException e) {
                throw new IOException("Failed to build " + getComponentName() + ": " + id, e);
            }
        }
    }

    /**
     * Builds a component instance from its configuration.
     */
    private T buildComponent(ExtensionConfig config) throws TikaConfigException, IOException {
        Map<String, F> factories = getFactories(pluginManager);
        F factory = factories.get(config.name());

        if (factory == null) {
            // This shouldn't happen since we validated in load(), but check anyway
            throw new TikaConfigException(
                    "Unknown " + getComponentName() + " type: " + config.name() +
                    ". Available: " + factories.keySet());
        }

        return factory.buildExtension(config);
    }

    /**
     * Dynamically adds or updates a component configuration at runtime.
     * The component will not be instantiated until it is first requested via {@link #getComponent(String)}.
     * If a component with the same ID already exists, it will be replaced and the cached instance cleared.
     * <p>
     * This method is only available if the manager was loaded with allowRuntimeModifications=true.
     * <p>
     * Only authorized/authenticated users should be allowed to modify components. BE CAREFUL.
     *
     * @param config the extension configuration for the component
     * @throws TikaConfigException if the component type is unknown or if runtime modifications are not allowed
     * @throws IOException if there is an error accessing the plugin manager
     */
    public synchronized void saveComponent(ExtensionConfig config) throws TikaConfigException, IOException {
        if (!allowRuntimeModifications) {
            throw new TikaConfigException(
                    "Runtime modifications are not allowed. " + getClass().getSimpleName() +
                    " must be loaded with allowRuntimeModifications=true to use save" +
                    getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "()");
        }

        if (config == null) {
            throw new IllegalArgumentException("ExtensionConfig cannot be null");
        }

        String componentId = config.id();
        String typeName = config.name();

        // Validate that factory exists for this type
        Map<String, F> factories = getFactories(pluginManager);
        if (!factories.containsKey(typeName)) {
            throw new TikaConfigException(
                    "Unknown " + getComponentName() + " type: " + typeName +
                    ". Available: " + factories.keySet());
        }

        // If updating existing component, clear the cache so it gets re-instantiated
        if (configStore.containsKey(componentId)) {
            componentCache.remove(componentId);
            LOG.debug("Updating existing {} config: id={}, type={}", getComponentName(), componentId, typeName);
        } else {
            LOG.debug("Creating new {} config: id={}, type={}", getComponentName(), componentId, typeName);
        }

        // Store config without instantiating
        configStore.put(componentId, config);
    }

    /**
     * Deletes a component configuration by ID.
     * Clears the cached instance and removes the configuration.
     *
     * @param componentId the component ID to delete
     * @throws TikaConfigException if runtime modifications are not allowed or component not found
     */
    public synchronized void deleteComponent(String componentId) throws TikaConfigException {
        if (!allowRuntimeModifications) {
            throw new TikaConfigException(
                    "Runtime modifications are not allowed. " + getClass().getSimpleName() +
                    " must be loaded with allowRuntimeModifications=true to use delete" +
                    getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + getComponentName().substring(1) + "()");
        }

        if (componentId == null) {
            throw new IllegalArgumentException("Component ID cannot be null");
        }

        if (!configStore.containsKey(componentId)) {
            throw new TikaConfigException(
                    getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) + 
                    getComponentName().substring(1) + " with ID '" + componentId + "' not found");
        }

        // Clear cache and remove config
        componentCache.remove(componentId);
        configStore.remove(componentId);
        LOG.debug("Deleted {} config: id={}", getComponentName(), componentId);
    }

    /**
     * Gets the configuration for a specific component by ID.
     *
     * @param componentId the component ID
     * @return the component configuration, or null if not found
     */
    public ExtensionConfig getComponentConfig(String componentId) {
        return configStore.get(componentId);
    }

    /**
     * Returns the set of supported component IDs.
     */
    public Set<String> getSupported() {
        return configStore.keySet();
    }

    /**
     * Convenience method that returns a component if only one component
     * is configured. If 0 or > 1 components are configured, this throws an IllegalArgumentException.
     *
     * @return the single configured component
     */
    public T getComponent() throws IOException, TikaException {
        if (configStore.size() != 1) {
            throw new IllegalArgumentException(
                    "No-arg get" + getComponentName().substring(0, 1).toUpperCase(Locale.ROOT) +
                    getComponentName().substring(1) + "() requires exactly 1 configured " +
                    getComponentName() + ". Found: " + configStore.size() +
                    " (" + configStore.keySet() + ")");
        }
        // Get the single component id and use getComponent(id) for lazy loading
        String componentId = configStore.keySet().iterator().next();
        return getComponent(componentId);
    }
}