// PipesWorker.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.core.server;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.extractor.EmbeddedDocumentExtractorFactory;
import org.apache.tika.extractor.UnpackHandler;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.writefilter.MetadataWriteLimiterFactory;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.emitter.Emitter;
import org.apache.tika.pipes.api.emitter.StreamEmitter;
import org.apache.tika.pipes.core.PipesResults;
import org.apache.tika.pipes.core.emitter.EmitterManager;
import org.apache.tika.pipes.core.extractor.EmittingUnpackHandler;
import org.apache.tika.pipes.core.extractor.FrictionlessUnpackHandler;
import org.apache.tika.pipes.core.extractor.TempFileUnpackHandler;
import org.apache.tika.pipes.core.extractor.UnpackConfig;
import org.apache.tika.pipes.core.extractor.UnpackExtractorFactory;
import org.apache.tika.pipes.core.extractor.frictionless.DataPackage;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.StringUtils;
/**
 * Worker that runs a single fetch -> parse -> emit cycle for one
 * {@link FetchEmitTuple} and reports the outcome as a {@link PipesResult}.
 * Fetching, parsing and emitting are delegated to the injected handlers;
 * this class orchestrates them and performs the UNPACK-mode packaging of
 * embedded files (plain zip or Frictionless Data Package).
 */
class PipesWorker implements Callable<PipesResult> {
private static final Logger LOG = LoggerFactory.getLogger(PipesWorker.class);
// the unit of work: what to fetch, how to parse, and where to emit
private final FetchEmitTuple fetchEmitTuple;
// NOTE(review): setupParseContext() mutates this same instance and returns it,
// so state set during parseFromTuple() is visible to call() afterwards
private final ParseContext parseContext;
// NOTE(review): not referenced anywhere in this file's visible code -- confirm
// whether it is still needed
private final AutoDetectParser autoDetectParser;
// resolves emitter names to Emitter instances for UNPACK output
private final EmitterManager emitterManager;
// collaborators for the three pipeline stages
private final FetchHandler fetchHandler;
private final ParseHandler parseHandler;
private final EmitHandler emitHandler;
// fallback write limiter used when the tuple's context does not carry one
private final MetadataWriteLimiterFactory defaultMetadataWriteLimiterFactory;
/**
 * @param fetchEmitTuple the tuple to process
 * @param parseContext shared context; mutated by this worker during setup
 * @param autoDetectParser parser instance (see NOTE on the field)
 * @param emitterManager source of emitters for UNPACK output
 * @param fetchHandler performs the fetch stage
 * @param parseHandler performs the parse stage
 * @param emitHandler performs the final emit stage
 * @param defaultMetadataWriteLimiterFactory default write limiter, may be null
 */
public PipesWorker(FetchEmitTuple fetchEmitTuple, ParseContext parseContext, AutoDetectParser autoDetectParser,
EmitterManager emitterManager, FetchHandler fetchHandler, ParseHandler parseHandler,
EmitHandler emitHandler, MetadataWriteLimiterFactory defaultMetadataWriteLimiterFactory) {
this.fetchEmitTuple = fetchEmitTuple;
this.parseContext = parseContext;
this.autoDetectParser = autoDetectParser;
this.emitterManager = emitterManager;
this.fetchHandler = fetchHandler;
this.parseHandler = parseHandler;
this.emitHandler = emitHandler;
this.defaultMetadataWriteLimiterFactory = defaultMetadataWriteLimiterFactory;
}
/**
 * Runs the fetch -> parse -> (optional packaging) -> emit cycle for this
 * worker's tuple.
 *
 * @return a terminal {@link PipesResult}: the first failure encountered, or
 *         the result of emitting the parse data on success
 * @throws Exception if parsing throws something not mapped to a PipesResult
 *         (e.g. SecurityException rethrown by parseFromTuple)
 */
@Override
public PipesResult call() throws Exception {
    MetadataListAndEmbeddedBytes parseData = null;
    try {
        //this can be null if there is a fetch exception
        ParseDataOrPipesResult parseDataResult = parseFromTuple();
        if (parseDataResult.pipesResult != null) {
            return parseDataResult.pipesResult;
        }
        parseData = parseDataResult.parseDataResult;
        if (parseData == null || metadataIsEmpty(parseData.getMetadataList())) {
            return PipesResults.EMPTY_OUTPUT;
        }
        // Check if we need to zip and emit embedded files
        UnpackHandler handler = parseContext.get(UnpackHandler.class);
        if (handler instanceof FrictionlessUnpackHandler frictionlessHandler) {
            PipesResult frictionlessResult = emitFrictionlessOutput(frictionlessHandler, parseData);
            if (frictionlessResult != null) {
                // Frictionless emit failed - return the error
                return frictionlessResult;
            }
        } else if (handler instanceof TempFileUnpackHandler tempHandler) {
            PipesResult zipResult = zipAndEmitEmbeddedFiles(tempHandler);
            if (zipResult != null) {
                // Zipping/emitting failed - return the error
                return zipResult;
            }
        }
        return emitHandler.emitParseData(fetchEmitTuple, parseData, parseContext);
    } finally {
        // BUGFIX: the handler was previously closed only via locals that were
        // assigned after a successful parse. If parseFromTuple() returned an
        // early error AFTER setupParseContext() had already registered a
        // temp-file-backed handler, that handler was never closed and its temp
        // files leaked. Always close the handler registered in the parse
        // context, plus the one captured in the parse data if different.
        UnpackHandler contextHandler = parseContext.get(UnpackHandler.class);
        closeQuietly(contextHandler);
        if (parseData != null && parseData.hasUnpackHandler()
                && parseData.getUnpackHandler() != contextHandler) {
            closeQuietly(parseData.getUnpackHandler());
        }
    }
}

/**
 * Closes {@code maybeCloseable} if it implements {@link Closeable}; failures
 * are logged and swallowed so cleanup never masks the primary result.
 */
private static void closeQuietly(Object maybeCloseable) {
    if (maybeCloseable instanceof Closeable closeable) {
        try {
            closeable.close();
        } catch (IOException e) {
            LOG.warn("problem closing unpack handler", e);
        }
    }
}
/**
 * Returns {@code true} when there is nothing to emit: the metadata list is
 * absent or contains no entries.
 */
static boolean metadataIsEmpty(List<Metadata> metadataList) {
    if (metadataList == null) {
        return true;
    }
    return metadataList.isEmpty();
}
/**
 * Packages every embedded file collected by {@code tempHandler} into a single
 * zip and streams it to the emitter configured in {@link UnpackConfig}, under
 * the key "&lt;containerEmitKey&gt;-embedded.zip".
 *
 * @param tempHandler the handler holding embedded files in its temp directory
 * @return {@code null} on success, otherwise a {@link PipesResult} describing
 *         the failure (missing emitter, wrong emitter type, zip/emit IO error)
 */
private PipesResult zipAndEmitEmbeddedFiles(TempFileUnpackHandler tempHandler) {
    if (!tempHandler.hasEmbeddedFiles()) {
        // Nothing was extracted, so there is nothing to package.
        LOG.debug("No embedded files to zip");
        return null;
    }
    UnpackConfig config = parseContext.get(UnpackConfig.class);
    String targetEmitterName = config.getEmitter();
    Emitter targetEmitter;
    try {
        targetEmitter = emitterManager.getEmitter(targetEmitterName);
    } catch (Exception e) {
        LOG.warn("Failed to get emitter for zip: {}", targetEmitterName, e);
        return new PipesResult(PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND,
                "Emitter not found for zipping: " + targetEmitterName);
    }
    if (!(targetEmitter instanceof StreamEmitter)) {
        LOG.warn("Emitter {} is not a StreamEmitter, cannot emit zip", targetEmitterName);
        return new PipesResult(PipesResult.RESULT_STATUS.EMIT_EXCEPTION,
                "Emitter must be a StreamEmitter to emit zipped embedded files. Found: " +
                targetEmitter.getClass().getName());
    }
    StreamEmitter zipTarget = (StreamEmitter) targetEmitter;
    // Build the archive inside the handler's temp dir so it is cleaned up
    // together with the extracted files.
    Path zipPath = tempHandler.getTempDirectory().resolve("embedded-files.zip");
    try {
        createZipFile(zipPath, tempHandler, config);
    } catch (IOException e) {
        LOG.warn("Failed to create zip file", e);
        return new PipesResult(PipesResult.RESULT_STATUS.EMIT_EXCEPTION,
                "Failed to create zip file: " + ExceptionUtils.getStackTrace(e));
    }
    String zipEmitKey = fetchEmitTuple.getEmitKey().getEmitKey() + "-embedded.zip";
    try (InputStream zipIn = Files.newInputStream(zipPath)) {
        zipTarget.emit(zipEmitKey, zipIn, new Metadata(), parseContext);
    } catch (IOException e) {
        LOG.warn("Failed to emit zip file", e);
        return new PipesResult(PipesResult.RESULT_STATUS.EMIT_EXCEPTION,
                "Failed to emit zip file: " + ExceptionUtils.getStackTrace(e));
    }
    LOG.debug("Successfully zipped and emitted {} embedded files to {}",
            tempHandler.getEmbeddedFiles().size(), zipEmitKey);
    return null;
}
/**
 * Emits Frictionless Data Package output (zipped or directory mode).
 *
 * @param frictionlessHandler the handler containing embedded files with SHA256 hashes
 * @param parseData the parsed metadata list for optional inclusion
 * @return PipesResult if there was an error, null if successful
 */
private PipesResult emitFrictionlessOutput(FrictionlessUnpackHandler frictionlessHandler,
                                           MetadataListAndEmbeddedBytes parseData) {
    UnpackConfig unpackConfig = frictionlessHandler.getUnpackConfig();
    // Skip if no embedded files (unless includeOriginal is set)
    if (!frictionlessHandler.hasEmbeddedFiles() && !unpackConfig.isIncludeOriginal()) {
        LOG.debug("No embedded files for Frictionless output");
        return null;
    }
    String emitterName = unpackConfig.getEmitter();
    Emitter emitter;
    try {
        emitter = emitterManager.getEmitter(emitterName);
    } catch (Exception e) {
        LOG.warn("Failed to get emitter for Frictionless output: {}", emitterName, e);
        return new PipesResult(PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND,
                "Emitter not found for Frictionless output: " + emitterName);
    }
    if (!(emitter instanceof StreamEmitter)) {
        LOG.warn("Emitter {} is not a StreamEmitter, cannot emit Frictionless output", emitterName);
        return new PipesResult(PipesResult.RESULT_STATUS.EMIT_EXCEPTION,
                "Emitter must be a StreamEmitter for Frictionless output. Found: " +
                emitter.getClass().getName());
    }
    StreamEmitter streamEmitter = (StreamEmitter) emitter;
    EmitKey containerEmitKey = fetchEmitTuple.getEmitKey();
    // CONSISTENCY: reuse the shared helper rather than duplicating the
    // last-path-segment extraction that used to be inlined here (the helper
    // implements exactly the same '/'-or-'\\' logic).
    String containerName = getFileNameFromFetchKey();
    if (unpackConfig.getOutputMode() == UnpackConfig.OUTPUT_MODE.ZIPPED) {
        return emitFrictionlessZipped(frictionlessHandler, streamEmitter, containerEmitKey,
                containerName, parseData, unpackConfig);
    }
    return emitFrictionlessDirectory(frictionlessHandler, streamEmitter, containerEmitKey,
            containerName, parseData, unpackConfig);
}
/**
 * Emits the Frictionless Data Package as a single zip archive streamed to the
 * emitter under "&lt;containerEmitKey&gt;-frictionless.zip".
 *
 * @return {@code null} on success, otherwise a {@link PipesResult} describing
 *         the zip-creation or emit failure
 */
private PipesResult emitFrictionlessZipped(FrictionlessUnpackHandler frictionlessHandler,
                                           StreamEmitter streamEmitter,
                                           EmitKey containerEmitKey,
                                           String containerName,
                                           MetadataListAndEmbeddedBytes parseData,
                                           UnpackConfig unpackConfig) {
    // Build the manifest first: it drives what goes into the archive.
    DataPackage dataPackage = frictionlessHandler.buildDataPackage(containerName);
    // Stage the archive in the handler's temp dir so it is cleaned up with it.
    Path packageZip = frictionlessHandler.getTempDirectory().resolve("frictionless-package.zip");
    try {
        createFrictionlessZipFile(packageZip, frictionlessHandler, dataPackage, parseData, unpackConfig);
    } catch (IOException e) {
        LOG.warn("Failed to create Frictionless zip file", e);
        return new PipesResult(PipesResult.RESULT_STATUS.EMIT_EXCEPTION,
                "Failed to create Frictionless zip file: " + ExceptionUtils.getStackTrace(e));
    }
    String zipEmitKey = containerEmitKey.getEmitKey() + "-frictionless.zip";
    try (InputStream packaged = Files.newInputStream(packageZip)) {
        streamEmitter.emit(zipEmitKey, packaged, new Metadata(), parseContext);
    } catch (IOException e) {
        LOG.warn("Failed to emit Frictionless zip file", e);
        return new PipesResult(PipesResult.RESULT_STATUS.EMIT_EXCEPTION,
                "Failed to emit Frictionless zip file: " + ExceptionUtils.getStackTrace(e));
    }
    LOG.debug("Successfully emitted Frictionless package with {} resources to {}",
            dataPackage.resourceCount(), zipEmitKey);
    return null;
}
/**
 * Emits the Frictionless Data Package piecewise (directory mode): the optional
 * original document, each embedded file under "unpacked/", the
 * datapackage.json manifest, and optionally a metadata.json with the full
 * parse output -- all keyed under the container's emit key.
 *
 * @return {@code null} on success, otherwise a {@link PipesResult} describing
 *         the first emit failure
 */
private PipesResult emitFrictionlessDirectory(FrictionlessUnpackHandler frictionlessHandler,
                                              StreamEmitter streamEmitter,
                                              EmitKey containerEmitKey,
                                              String containerName,
                                              MetadataListAndEmbeddedBytes parseData,
                                              UnpackConfig unpackConfig) {
    String baseEmitKey = containerEmitKey.getEmitKey();
    DataPackage dataPackage = frictionlessHandler.buildDataPackage(containerName);
    try {
        // 1) the original container document, if configured and available
        if (unpackConfig.isIncludeOriginal() && frictionlessHandler.hasOriginalDocument()) {
            String originalKey = baseEmitKey + "/" + frictionlessHandler.getOriginalDocumentName();
            try (InputStream original = Files.newInputStream(frictionlessHandler.getOriginalDocumentPath())) {
                streamEmitter.emit(originalKey, original, new Metadata(), parseContext);
            }
        }
        // 2) every extracted file, namespaced under unpacked/
        for (FrictionlessUnpackHandler.FrictionlessFileInfo fileInfo : frictionlessHandler.getEmbeddedFiles()) {
            String embeddedKey = baseEmitKey + "/unpacked/" + fileInfo.fileName();
            try (InputStream embedded = Files.newInputStream(fileInfo.filePath())) {
                streamEmitter.emit(embeddedKey, embedded, fileInfo.metadata(), parseContext);
            }
        }
        // 3) the datapackage.json manifest
        byte[] manifestBytes = dataPackage.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        try (InputStream manifest = new java.io.ByteArrayInputStream(manifestBytes)) {
            streamEmitter.emit(baseEmitKey + "/datapackage.json", manifest, new Metadata(), parseContext);
        }
        // 4) optionally the full parse metadata as JSON
        if (unpackConfig.isIncludeFullMetadata() && parseData != null
                && parseData.getMetadataList() != null) {
            byte[] metadataBytes = writeMetadataListAsJson(parseData.getMetadataList());
            try (InputStream metadataJson = new java.io.ByteArrayInputStream(metadataBytes)) {
                streamEmitter.emit(baseEmitKey + "/metadata.json", metadataJson, new Metadata(), parseContext);
            }
        }
    } catch (IOException e) {
        LOG.warn("Failed to emit Frictionless directory output", e);
        return new PipesResult(PipesResult.RESULT_STATUS.EMIT_EXCEPTION,
                "Failed to emit Frictionless directory output: " + ExceptionUtils.getStackTrace(e));
    }
    LOG.debug("Successfully emitted Frictionless package with {} resources (directory mode) to {}",
            dataPackage.resourceCount(), baseEmitKey);
    return null;
}
/**
 * Writes the Frictionless Data Package archive to {@code zipFile}:
 * datapackage.json (and optionally metadata.json) at the root, the optional
 * original document at the root, and every extracted file under "unpacked/".
 *
 * @throws IOException if writing any entry fails
 */
private void createFrictionlessZipFile(Path zipFile, FrictionlessUnpackHandler frictionlessHandler,
                                       DataPackage dataPackage, MetadataListAndEmbeddedBytes parseData,
                                       UnpackConfig unpackConfig) throws IOException {
    try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(zipFile))) {
        // Manifest goes first, at the archive root.
        zip.putNextEntry(new ZipEntry("datapackage.json"));
        dataPackage.writeTo(zip);
        zip.closeEntry();
        boolean wantFullMetadata = unpackConfig.isIncludeFullMetadata()
                && parseData != null && parseData.getMetadataList() != null;
        if (wantFullMetadata) {
            zip.putNextEntry(new ZipEntry("metadata.json"));
            writeMetadataListAsJson(zip, parseData.getMetadataList());
            zip.closeEntry();
        }
        if (unpackConfig.isIncludeOriginal() && frictionlessHandler.hasOriginalDocument()) {
            zip.putNextEntry(new ZipEntry(frictionlessHandler.getOriginalDocumentName()));
            Files.copy(frictionlessHandler.getOriginalDocumentPath(), zip);
            zip.closeEntry();
        }
        // NOTE(review): ZipOutputStream throws on duplicate entry names; this
        // assumes the handler de-duplicates embedded file names -- confirm.
        for (FrictionlessUnpackHandler.FrictionlessFileInfo fileInfo : frictionlessHandler.getEmbeddedFiles()) {
            zip.putNextEntry(new ZipEntry("unpacked/" + fileInfo.fileName()));
            Files.copy(fileInfo.filePath(), zip);
            zip.closeEntry();
        }
    }
}
/**
 * Serializes the metadata list as an indented JSON array of objects to
 * {@code os} without closing the stream (the caller may be mid-zip-entry).
 * Single-valued keys are written as scalars, multi-valued keys as arrays;
 * key order within each object follows {@code Metadata.names()}.
 *
 * @throws IOException if serialization fails
 */
private void writeMetadataListAsJson(OutputStream os, List<Metadata> metadataList) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    // Keep the underlying (possibly zip) stream open after writing.
    mapper.configure(com.fasterxml.jackson.core.JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
    mapper.enable(com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT);
    List<java.util.Map<String, Object>> rows = new java.util.ArrayList<>(metadataList.size());
    for (Metadata metadata : metadataList) {
        java.util.Map<String, Object> row = new java.util.LinkedHashMap<>();
        for (String name : metadata.names()) {
            String[] values = metadata.getValues(name);
            row.put(name, values.length == 1 ? values[0] : values);
        }
        rows.add(row);
    }
    mapper.writeValue(os, rows);
}
/**
 * Serializes the metadata list as a JSON array and returns the UTF-8 bytes.
 * Delegates to the streaming overload.
 *
 * @throws IOException if serialization fails
 */
private byte[] writeMetadataListAsJson(List<Metadata> metadataList) throws IOException {
    java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
    writeMetadataListAsJson(buffer, metadataList);
    return buffer.toByteArray();
}
/**
 * Writes a flat zip archive to {@code zipFile} containing the optional
 * original document, every embedded file, and -- when configured -- a
 * "&lt;name&gt;.metadata.json" sidecar entry per embedded file.
 *
 * @throws IOException if writing any entry fails
 */
private void createZipFile(Path zipFile, TempFileUnpackHandler tempHandler,
                           UnpackConfig unpackConfig) throws IOException {
    try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(zipFile))) {
        if (unpackConfig.isIncludeOriginal() && tempHandler.hasOriginalDocument()) {
            zip.putNextEntry(new ZipEntry(tempHandler.getOriginalDocumentName()));
            Files.copy(tempHandler.getOriginalDocumentPath(), zip);
            zip.closeEntry();
        }
        for (TempFileUnpackHandler.EmbeddedFileInfo fileInfo : tempHandler.getEmbeddedFiles()) {
            // The embedded file itself...
            zip.putNextEntry(new ZipEntry(fileInfo.fileName()));
            Files.copy(fileInfo.filePath(), zip);
            zip.closeEntry();
            // ...followed by its metadata sidecar, if requested.
            if (unpackConfig.isIncludeMetadataInZip()) {
                zip.putNextEntry(new ZipEntry(fileInfo.fileName() + ".metadata.json"));
                writeMetadataAsJson(zip, fileInfo.metadata());
                zip.closeEntry();
            }
        }
    }
}
/**
 * Serializes a single Metadata object as a JSON object to {@code os} without
 * closing the stream. Single-valued keys become scalars, multi-valued keys
 * become arrays; key order follows {@code Metadata.names()}.
 * NOTE(review): unlike writeMetadataListAsJson this does not enable
 * INDENT_OUTPUT -- confirm whether compact output here is intentional.
 *
 * @throws IOException if serialization fails
 */
private void writeMetadataAsJson(OutputStream os, Metadata metadata) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    // Leave the (zip) target stream open after writing.
    mapper.configure(com.fasterxml.jackson.core.JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
    java.util.Map<String, Object> jsonShape = new java.util.LinkedHashMap<>();
    for (String name : metadata.names()) {
        String[] values = metadata.getValues(name);
        jsonShape.put(name, values.length == 1 ? values[0] : values);
    }
    mapper.writeValue(os, jsonShape);
}
/**
 * Stores the original document into {@code tempHandler} for later inclusion in
 * the zip, without consuming the stream the parser still needs. Prefers
 * TikaInputStream's spooled temp file; falls back to mark/read/reset.
 *
 * @throws IOException if copying the content fails
 */
private void storeOriginalDocument(TikaInputStream tis, TempFileUnpackHandler tempHandler)
        throws IOException {
    String fileName = getFileNameFromFetchKey();
    Path spooled = tis.getPath();
    if (spooled == null || !Files.exists(spooled)) {
        // Not spooled to disk yet: read through the stream, then rewind it so
        // the subsequent parse still sees the full content.
        tis.mark(Integer.MAX_VALUE);
        try {
            tempHandler.storeOriginalDocument(tis, fileName);
        } finally {
            tis.reset();
        }
        return;
    }
    // TikaInputStream already spooled the content to a temp file; copy from it.
    try (InputStream in = Files.newInputStream(spooled)) {
        tempHandler.storeOriginalDocument(in, fileName);
    }
}
/**
 * Stores the original document into {@code frictionlessHandler} for inclusion
 * in the Frictionless output, without consuming the stream the parser still
 * needs. Prefers TikaInputStream's spooled temp file; falls back to
 * mark/read/reset.
 *
 * @throws IOException if copying the content fails
 */
private void storeOriginalDocumentForFrictionless(TikaInputStream tis,
                                                  FrictionlessUnpackHandler frictionlessHandler)
        throws IOException {
    String fileName = getFileNameFromFetchKey();
    Path spooled = tis.getPath();
    if (spooled == null || !Files.exists(spooled)) {
        // Not spooled to disk yet: read through the stream, then rewind it so
        // the subsequent parse still sees the full content.
        tis.mark(Integer.MAX_VALUE);
        try {
            frictionlessHandler.storeOriginalDocument(tis, fileName);
        } finally {
            tis.reset();
        }
        return;
    }
    // TikaInputStream already spooled the content to a temp file; copy from it.
    try (InputStream in = Files.newInputStream(spooled)) {
        frictionlessHandler.storeOriginalDocument(in, fileName);
    }
}
/**
 * Returns the last path segment of the fetch key, treating both '/' and '\'
 * as separators. The whole key is returned when it contains no separator or
 * ends with one.
 */
private String getFileNameFromFetchKey() {
    String fetchKey = fetchEmitTuple.getFetchKey().getFetchKey();
    int sep = Math.max(fetchKey.lastIndexOf('/'), fetchKey.lastIndexOf('\\'));
    boolean hasTrailingName = sep >= 0 && sep + 1 < fetchKey.length();
    return hasTrailingName ? fetchKey.substring(sep + 1) : fetchKey;
}
/**
 * Sets up the parse context, fetches the tuple's input stream, and parses it.
 * Returns either parse data (success) or a terminal PipesResult:
 * FETCHER_INITIALIZATION_EXCEPTION when context setup fails, the fetch
 * handler's own result when the fetch fails, or UNSPECIFIED_CRASH when
 * parsing throws TikaException/IOException. SecurityException is rethrown.
 *
 * @return parse data or a terminal PipesResult, never null
 * @throws TikaException if setupParseContext() throws one (e.g. config error)
 * @throws InterruptedException declared for callers; not thrown directly here
 */
protected ParseDataOrPipesResult parseFromTuple() throws TikaException, InterruptedException {
//start a new metadata object to gather info from the fetch process
//we want to isolate and not touch the metadata sent into the fetchEmitTuple
//so that we can inject it after the filter at the very end
ParseContext localContext = null;
try {
localContext = setupParseContext();
} catch (IOException e) {
// Context/fetcher setup failed before any fetch happened.
LOG.warn("fetcher initialization exception id={}", fetchEmitTuple.getId(), e);
return new ParseDataOrPipesResult(null,
new PipesResult(PipesResult.RESULT_STATUS.FETCHER_INITIALIZATION_EXCEPTION, ExceptionUtils.getStackTrace(e)));
}
// Use newMetadata() to apply any configured write limits
Metadata metadata = localContext.newMetadata();
FetchHandler.TisOrResult tisOrResult = fetchHandler.fetch(fetchEmitTuple, metadata, localContext);
// A non-null pipesResult means the fetch itself failed; propagate it as-is.
if (tisOrResult.pipesResult() != null) {
return new ParseDataOrPipesResult(null, tisOrResult.pipesResult());
}
// try-with-resources guarantees the fetched stream is closed on every path.
try (TikaInputStream tis = tisOrResult.tis()) {
// Store original document for zipping/frictionless if requested
UnpackHandler handler = localContext.get(UnpackHandler.class);
UnpackConfig uc = localContext.get(UnpackConfig.class);
if (uc != null && uc.isIncludeOriginal()) {
if (handler instanceof FrictionlessUnpackHandler frictionlessHandler) {
storeOriginalDocumentForFrictionless(tis, frictionlessHandler);
} else if (handler instanceof TempFileUnpackHandler tempHandler) {
storeOriginalDocument(tis, tempHandler);
}
}
return parseHandler.parseWithStream(fetchEmitTuple, tis, metadata, localContext);
} catch (SecurityException e) {
// Deliberately rethrown: security failures abort the worker instead of
// being converted into a PipesResult.
LOG.error("security exception id={}", fetchEmitTuple.getId(), e);
throw e;
} catch (TikaException | IOException e) {
// NOTE(review): this catch also covers parse-time failures, not just
// fetch -- the "fetch exception" log wording may understate the scope.
LOG.warn("fetch exception id={}", fetchEmitTuple.getId(), e);
return new ParseDataOrPipesResult(null,
new PipesResult(PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH, ExceptionUtils.getStackTrace(e)));
}
}
/**
 * Prepares the parse context for this tuple and returns it. NOTE: this method
 * mutates and returns the worker's shared {@code parseContext} field, so state
 * set here (write limiter, UnpackConfig, UnpackHandler, extractor factory) is
 * visible to call() afterwards. For ParseMode.UNPACK it resolves the target
 * emitter and installs the appropriate UnpackHandler implementation based on
 * the configured output format and zipping mode.
 *
 * @return the (mutated) shared parse context
 * @throws TikaException if UNPACK mode is requested but no emitter can be
 *         resolved from UnpackConfig or the tuple's emit key
 * @throws IOException declared for handler construction; see implementations
 */
private ParseContext setupParseContext() throws TikaException, IOException {
// ContentHandlerFactory and ParseMode are retrieved from ParseContext in ParseHandler.
// They are set in ParseContext from PipesConfig loaded via TikaLoader at startup.
// If the parseContext from the FetchEmitTuple doesn't have a MetadataWriteLimiterFactory,
// use the default one loaded from config in PipesServer
MetadataWriteLimiterFactory existingFactory = parseContext.get(MetadataWriteLimiterFactory.class);
if (existingFactory == null && defaultMetadataWriteLimiterFactory != null) {
parseContext.set(MetadataWriteLimiterFactory.class, defaultMetadataWriteLimiterFactory);
}
ParseMode parseMode = parseContext.get(ParseMode.class);
UnpackConfig unpackConfig = parseContext.get(UnpackConfig.class);
// For UNPACK mode, automatically set up byte extraction
if (parseMode == ParseMode.UNPACK) {
// Ensure a config object exists so the handlers below can read it.
if (unpackConfig == null) {
unpackConfig = new UnpackConfig();
parseContext.set(UnpackConfig.class, unpackConfig);
}
// Determine emitter: prefer UnpackConfig, fall back to FetchEmitTuple
String emitterName = unpackConfig.getEmitter();
if (StringUtils.isBlank(emitterName)) {
emitterName = fetchEmitTuple.getEmitKey().getEmitterId();
if (StringUtils.isBlank(emitterName)) {
throw new TikaConfigException(
"UNPACK parse mode requires an emitter. Set emitter in UnpackConfig " +
"or specify an emitterId in FetchEmitTuple.emitKey.");
}
// Persist the resolved fallback so later readers see a concrete emitter.
unpackConfig.setEmitter(emitterName);
}
// Set up the extractor factory - the extractor will be created during parsing
// with the correct context (after RecursiveParserWrapper sets up EmbeddedParserDecorator)
parseContext.set(EmbeddedDocumentExtractorFactory.class, new UnpackExtractorFactory());
// Set up the bytes handler based on output format and mode
if (unpackConfig.getOutputFormat() == UnpackConfig.OUTPUT_FORMAT.FRICTIONLESS) {
// Frictionless Data Package format - always uses FrictionlessUnpackHandler
// which computes SHA256 hashes and stores files for datapackage.json generation
parseContext.set(UnpackHandler.class,
new FrictionlessUnpackHandler(fetchEmitTuple.getEmitKey(), unpackConfig));
} else if (unpackConfig.isZipEmbeddedFiles()) {
// Regular format with zipping - use TempFileUnpackHandler
parseContext.set(UnpackHandler.class,
new TempFileUnpackHandler(fetchEmitTuple.getEmitKey(), unpackConfig));
} else {
// Regular format, direct emission - use EmittingUnpackHandler
parseContext.set(UnpackHandler.class,
new EmittingUnpackHandler(fetchEmitTuple, emitterManager, parseContext));
}
return parseContext;
}
// For non-UNPACK modes, no byte extraction setup needed
// UnpackConfig may be present from config file but is only used when ParseMode.UNPACK is set
return parseContext;
}
//parse data result or a terminal pipesresult
/**
 * Union-style result of {@link #parseFromTuple()}: exactly one component is
 * expected to be non-null -- parse data on success, or a terminal
 * {@link PipesResult} on failure.
 */
record ParseDataOrPipesResult(MetadataListAndEmbeddedBytes parseDataResult, PipesResult pipesResult) {
}
}