// AWSRequestAnalyzer.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.audit;
import java.util.List;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateSessionRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.MULTIPART_UPLOAD_ABORTED;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.MULTIPART_UPLOAD_COMPLETED;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.MULTIPART_UPLOAD_LIST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.MULTIPART_UPLOAD_PART_PUT;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.MULTIPART_UPLOAD_STARTED;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_BULK_DELETE_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_DELETE_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
/**
 * Extract information from an AWS SDK request.
 * Intended for reporting and error logs, so the analysis is
 * best-effort: unset optional fields are reported as empty/zero
 * rather than raising exceptions.
 */
public class AWSRequestAnalyzer {

  /**
   * Prefix of the value of an HTTP byte-range header.
   */
  private static final String BYTES_PREFIX = "bytes=";

  /**
   * Given an AWS request, try to analyze it to operation,
   * read/write and path.
   * Requests with no explicit support are reported as mutating
   * operations whose verb is the request classname.
   * @param request request.
   * @return information about the request.
   */
  public RequestInfo analyze(SdkRequest request) {

    // this is where Scala's case statement would massively
    // simplify life.
    // Please Keep in Alphabetical Order.
    if (request instanceof AbortMultipartUploadRequest) {
      return writing(MULTIPART_UPLOAD_ABORTED,
          ((AbortMultipartUploadRequest) request).key(),
          0);
    } else if (request instanceof CompleteMultipartUploadRequest) {
      CompleteMultipartUploadRequest r
          = (CompleteMultipartUploadRequest) request;
      // the list of completed parts is an optional field of the request;
      // treat a missing one as "no parts" rather than failing.
      return writing(MULTIPART_UPLOAD_COMPLETED,
          r.key(),
          r.multipartUpload() == null
              ? 0
              : r.multipartUpload().parts().size());
    } else if (request instanceof CreateMultipartUploadRequest) {
      return writing(MULTIPART_UPLOAD_STARTED,
          ((CreateMultipartUploadRequest) request).key(),
          0);
    } else if (request instanceof DeleteObjectRequest) {
      // DeleteObject: single object
      return writing(OBJECT_DELETE_REQUEST,
          ((DeleteObjectRequest) request).key(),
          1);
    } else if (request instanceof DeleteObjectsRequest) {
      // DeleteObjects: bulk delete
      // use first key as the path
      DeleteObjectsRequest r = (DeleteObjectsRequest) request;
      // the delete specification is an optional field of the request;
      // a missing or empty one is reported as zero objects and no key.
      List<ObjectIdentifier> objectIdentifiers
          = r.delete() != null ? r.delete().objects() : null;
      boolean noObjects = objectIdentifiers == null
          || objectIdentifiers.isEmpty();
      return writing(OBJECT_BULK_DELETE_REQUEST,
          noObjects ? null : objectIdentifiers.get(0).key(),
          noObjects ? 0 : objectIdentifiers.size());
    } else if (request instanceof GetBucketLocationRequest) {
      GetBucketLocationRequest r = (GetBucketLocationRequest) request;
      return reading(STORE_EXISTS_PROBE,
          r.bucket(),
          0);
    } else if (request instanceof GetObjectRequest) {
      GetObjectRequest r = (GetObjectRequest) request;
      return reading(ACTION_HTTP_GET_REQUEST,
          r.key(),
          sizeFromRangeHeader(r.range()));
    } else if (request instanceof HeadObjectRequest) {
      return reading(ACTION_HTTP_HEAD_REQUEST,
          ((HeadObjectRequest) request).key(), 0);
    } else if (request instanceof ListMultipartUploadsRequest) {
      ListMultipartUploadsRequest r
          = (ListMultipartUploadsRequest) request;
      return reading(MULTIPART_UPLOAD_LIST,
          r.prefix(),
          r.maxUploads());
    } else if (request instanceof ListObjectsRequest) {
      ListObjectsRequest r = (ListObjectsRequest) request;
      return reading(OBJECT_LIST_REQUEST,
          r.prefix(),
          r.maxKeys());
    } else if (request instanceof ListObjectsV2Request) {
      ListObjectsV2Request r = (ListObjectsV2Request) request;
      return reading(OBJECT_LIST_REQUEST,
          r.prefix(),
          r.maxKeys());
    } else if (request instanceof PutObjectRequest) {
      PutObjectRequest r = (PutObjectRequest) request;
      return writing(OBJECT_PUT_REQUEST,
          r.key(),
          0);
    } else if (request instanceof UploadPartRequest) {
      UploadPartRequest r = (UploadPartRequest) request;
      return writing(MULTIPART_UPLOAD_PART_PUT,
          r.key(),
          r.contentLength());
    }
    // no explicit support, return classname
    return writing(request.getClass().getName(), null, 0);
  }

  /**
   * A request.
   * @param verb verb
   * @param mutating does this update the store
   * @param key object/prefix, etc.
   * @param size nullable size
   * @return request info
   */
  private RequestInfo request(final String verb,
      final boolean mutating,
      final String key,
      final Number size) {
    return new RequestInfo(verb, mutating, key, size);
  }

  /**
   * A read request.
   * @param verb verb
   * @param key object/prefix, etc.
   * @param size nullable size
   * @return request info
   */
  private RequestInfo reading(final String verb,
      final String key, final Number size) {
    return request(verb, false, key, size);
  }

  /**
   * A write request of some form.
   * @param verb verb
   * @param key object/prefix, etc.
   * @param size nullable size
   * @return request info
   */
  private RequestInfo writing(final String verb,
      final String key, final Number size) {
    return request(verb, true, key, size);
  }

  /**
   * Predicate which returns true if the request is of a kind which
   * could be outside a span because of how the AWS SDK generates them.
   * @param request request
   * @return true if the request is of a type which may be issued
   * outside a span.
   */
  public static boolean
      isRequestNotAlwaysInSpan(final Object request) {
    return request instanceof UploadPartCopyRequest
        || request instanceof CompleteMultipartUploadRequest
        || request instanceof GetBucketLocationRequest
        || request instanceof CreateSessionRequest;
  }

  /**
   * Predicate which returns true if the request is part of the
   * multipart upload API -and which therefore must be rejected
   * if multipart upload is disabled.
   * @param request request
   * @return true if the request is a multipart upload API request.
   */
  public static boolean isRequestMultipartIO(final Object request) {
    return request instanceof UploadPartCopyRequest
        || request instanceof CompleteMultipartUploadRequest
        || request instanceof CreateMultipartUploadRequest
        || request instanceof UploadPartRequest;
  }

  /**
   * Info about a request.
   * Instances are immutable.
   */
  public static final class RequestInfo {

    /**
     * Verb.
     */
    private final String verb;

    /**
     * Is this a mutating call?
     */
    private final boolean mutating;

    /**
     * Key if there is one; maybe first key in a list.
     */
    private final String key;

    /**
     * Size, where the meaning of size depends on the request.
     */
    private final long size;

    /**
     * Construct.
     * @param verb operation/classname, etc.
     * @param mutating does this update S3 State.
     * @param key key/path/bucket operated on.
     * @param size size of request (bytes, elements, limit...). Nullable;
     * a null size is stored as 0.
     */
    private RequestInfo(final String verb,
        final boolean mutating,
        final String key,
        final Number size) {
      this.verb = verb;
      this.mutating = mutating;
      this.key = key;
      this.size = toSafeLong(size);
    }

    /**
     * Get the verb.
     * @return the operation name or request classname.
     */
    public String getVerb() {
      return verb;
    }

    /**
     * Is this a mutating call?
     * @return true if the request was classified as a write.
     */
    public boolean isMutating() {
      return mutating;
    }

    /**
     * Get the key.
     * @return the key/prefix/bucket; may be null.
     */
    public String getKey() {
      return key;
    }

    /**
     * Get the size.
     * @return the size, whose meaning depends on the request.
     */
    public long getSize() {
      return size;
    }

    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder(
          "{");
      sb.append(verb);
      if (key != null) {
        sb.append(" '").append(key).append('\'');
      }
      sb.append(" size=").append(size);
      sb.append(", mutating=").append(mutating);
      sb.append('}');
      return sb.toString();
    }
  }

  /**
   * Convert a nullable number to a long.
   * @param size nullable size
   * @return the long value of the number, or 0 if it is null.
   */
  private static long toSafeLong(final Number size) {
    return size != null ? size.longValue() : 0;
  }

  /**
   * Given a range header, determine the size of the request.
   * Only a single closed range of the form {@code bytes=start-end}
   * is understood; both positions are inclusive, so the size
   * is {@code end - start + 1}.
   * Open-ended, suffix, multi-range, inverted and unparseable
   * headers are all reported as unknown.
   * @param rangeHeader header string; may be null
   * @return parsed size or -1 for problems
   */
  private static Number sizeFromRangeHeader(String rangeHeader) {
    if (rangeHeader != null && rangeHeader.startsWith(BYTES_PREFIX)) {
      String[] values = rangeHeader
          .substring(BYTES_PREFIX.length())
          .split("-");
      if (values.length == 2) {
        try {
          // signed parsing: positions beyond Long.MAX_VALUE are rejected
          // rather than wrapping round to negative values.
          long start = Long.parseLong(values[0]);
          long end = Long.parseLong(values[1]);
          if (end >= start) {
            // byte ranges are inclusive at both ends.
            return Math.addExact(end - start, 1L);
          }
        } catch (NumberFormatException | ArithmeticException ignored) {
          // malformed or out-of-range header: this is only used for
          // reporting, so fall through and declare the size unknown.
        }
      }
    }
    return -1;
  }
}