FileStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.cxf.systest.jaxrs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import jakarta.activation.DataHandler;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.ResponseBuilder;
import jakarta.ws.rs.core.Response.Status;
import jakarta.ws.rs.core.StreamingOutput;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriInfo;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.jaxrs.ext.multipart.Attachment;
import org.apache.cxf.jaxrs.ext.multipart.MultipartBody;

@Path("/file-store")
public class FileStore {
    private final ConcurrentMap<String, byte[]> store = new ConcurrentHashMap<>();
    @Context private HttpHeaders headers;

    @POST
    @Path("/stream")
    @Consumes("*/*")
    public Response addFile(@QueryParam("chunked") boolean chunked, InputStream in) throws IOException {
        String transferEncoding = headers.getHeaderString("Transfer-Encoding");

        if (chunked != Objects.equals("chunked", transferEncoding)) {
            throw new WebApplicationException(Status.EXPECTATION_FAILED);
        }

        try (in) {
            if (chunked) {
                return Response.ok(new StreamingOutput() {
                    @Override
                    public void write(OutputStream out) throws IOException, WebApplicationException {
                        in.transferTo(out);
                    }
                }).build();
            } else {
                // Make sure we have small amount of data for chunking to not kick in
                final byte[] content = in.readAllBytes(); 
                return Response.ok(Arrays.copyOf(content, content.length / 10)).build();
            }
        }
    }

    @POST
    @Consumes("multipart/form-data")
    public void addFile(@QueryParam("chunked") boolean chunked, 
            @Suspended final AsyncResponse response, @Context final UriInfo uri, final MultipartBody body)  {

        String transferEncoding = headers.getHeaderString("Transfer-Encoding");
        if (chunked != Objects.equals("chunked", transferEncoding)) {
            response.resume(Response.status(Status.EXPECTATION_FAILED).build());
            return;
        }

        for (final Attachment attachment: body.getAllAttachments()) {
            final DataHandler handler = attachment.getDataHandler();

            if (handler != null) {
                final String source = handler.getName();
                if (StringUtils.isEmpty(source)) {
                    response.resume(Response.status(Status.BAD_REQUEST).build());
                    return;
                }

                try {
                    if (store.containsKey(source)) {
                        response.resume(Response.status(Status.CONFLICT).build());
                        return;
                    }

                    final byte[] content = IOUtils.readBytesFromStream(handler.getInputStream());
                    if (store.putIfAbsent(source, content) != null) {
                        response.resume(Response.status(Status.CONFLICT).build());
                        return;
                    }
                    
                    if (response.isSuspended()) {
                        final StreamingOutput stream = new StreamingOutput() {
                            @Override
                            public void write(OutputStream os) throws IOException, WebApplicationException {
                                if (chunked) {
                                    // Make sure we have enough data for chunking to kick in
                                    for (int i = 0; i < 10; ++i) {
                                        os.write(content);
                                    }
                                } else {
                                    os.write(content);
                                }
                            }
                        };
                        response.resume(Response.created(uri.getRequestUriBuilder()
                                .path(source).build()).entity(stream)
                                .build());
                    }

                } catch (final Exception ex) {
                    response.resume(Response.serverError().build());
                }

                if (response.isSuspended()) {
                    response.resume(Response.created(uri.getRequestUriBuilder().path(source).build())
                            .build());
                }
            }
        }

        if (response.isSuspended()) {
            response.resume(Response.status(Status.BAD_REQUEST).build());
        }
    }

    @GET
    @Consumes("multipart/form-data")
    public void getFile(@QueryParam("chunked") boolean chunked, @QueryParam("filename") String source,
            @Suspended final AsyncResponse response) {

        if (StringUtils.isEmpty(source)) {
            response.resume(Response.status(Status.BAD_REQUEST).build());
            return;
        }

        try {
            if (!store.containsKey(source)) {
                response.resume(Response.status(Status.NOT_FOUND).build());
                return;
            }

            final byte[] content = store.get(source);
            if (response.isSuspended()) {
                final StreamingOutput stream = new StreamingOutput() {
                    @Override
                    public void write(OutputStream os) throws IOException, WebApplicationException {
                        if (chunked) {
                            // Make sure we have enough data for chunking to kick in
                            for (int i = 0; i < 10; ++i) {
                                os.write(content);
                            }
                        } else {
                            os.write(content);
                        }
                    }
                };
                response.resume(Response.ok().entity(stream).build());
            }

        } catch (final Exception ex) {
            response.resume(Response.serverError().build());
        }
    }

    @GET
    @Path("/redirect")
    public Response redirectFile(@Context UriInfo uriInfo) {
        final UriBuilder builder = uriInfo.getBaseUriBuilder().path(getClass());
        uriInfo.getQueryParameters(true).forEach((p, v) -> builder.queryParam(p, v.get(0)));

        final ResponseBuilder response = Response.status(303).header("Location", builder.build());
        return response.build();
    }
}