TestPrometheusSplit.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.prometheus;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.HostAddress;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.DateTimeEncoding.packDateTimeWithZone;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.plugin.prometheus.MetadataUtil.METRIC_CODEC;
import static com.facebook.presto.plugin.prometheus.PrometheusClock.fixedClockAt;
import static com.facebook.presto.plugin.prometheus.PrometheusSplitManager.OFFSET_MILLIS;
import static com.facebook.presto.plugin.prometheus.PrometheusSplitManager.decimalSecondString;
import static com.facebook.presto.plugin.prometheus.TestPrometheusTable.TYPE_MANAGER;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static java.time.Instant.ofEpochMilli;
import static java.time.ZoneOffset.UTC;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
@Test(singleThreaded = true)
public class TestPrometheusSplit
{
//private static final Metadata METADATA = createTestMetadataManager();
//private static final TypeManager TYPE_MANAGER = new TypeRegistry();
private static final int NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS = 100;
/**
* Prometheus mock data
* The array below represents to a response to from real data:
* $ curl "http://127.0.0.1:9090/api/v1/query?query=up[120s]&time=1568638172"
* Just the time items from the "values" section of the response
*/
private static final ImmutableList<Double> promTimeValuesMock = new ImmutableList.Builder<Double>()
.add(1568638066.999)
.add(1568638081.996)
.add(1568638097.0)
.add(1568638112.0)
.add(1568638126.997)
.add(1568638142.0)
.add(1568638157.0)
.add(1568638171.999)
.build();
private final PrometheusSplit split = new PrometheusSplit(URI.create("http://127.0.0.1/test.file"));
private PrometheusHttpServer prometheusHttpServer;
/**
* mock Prometheus chunked query responses (time values only)
*
* @param splitTimes the end times that would be used for each Prometheus instant query
* @param queryChunkDuration the duration value that would be used for each query, `30s` for instance
* @return the values from the Prometheus data that would be return by all the chunked queries
*/
private static List<String> mockPrometheusResponseToChunkedQueries(io.airlift.units.Duration queryChunkDuration, List<String> splitTimes)
{
return Lists.reverse(splitTimes).stream()
.map(endTime -> mockPrometheusResponseToQuery(queryChunkDuration, endTime))
.flatMap(Collection::stream)
.sorted()
.map(TestPrometheusSplit::doubleToPlainString)
.collect(Collectors.toList());
}
/**
* mock Prometheus instant query
*/
private static List<Double> mockPrometheusResponseToQuery(io.airlift.units.Duration queryChunkDuration, String endTimeStr)
{
Double endTime = Double.valueOf(endTimeStr);
Double duration = queryChunkDuration.getValue(TimeUnit.SECONDS);
return promTimeValuesMock.stream()
.filter(promTimeValue -> ((endTime - duration) <= promTimeValue) && (promTimeValue <= endTime))
.collect(Collectors.toList());
}
/**
* Convert list of Double to list of String and avoid scientific notation
*/
private static List<String> convertMockTimesToStrings(List<Double> times)
{
return times.stream()
.map(TestPrometheusSplit::doubleToPlainString)
.collect(Collectors.toList());
}
/**
* Convert Double to String and avoid scientific notation
*/
private static String doubleToPlainString(Double aDouble)
{
return new BigDecimal(aDouble.toString()).stripTrailingZeros().toPlainString();
}
private static long longFromDecimalSecondString(String decimalString)
{
return new BigDecimal(decimalString).multiply(new BigDecimal(1000L)).longValueExact();
}
private static String getQueryChunkSizeDurationAsPrometheusCompatibleDurationString(PrometheusConnectorConfig config)
{
return config.getQueryChunkSizeDuration().roundTo(config.getQueryChunkSizeDuration().getUnit()) +
io.airlift.units.Duration.timeUnitToString(config.getQueryChunkSizeDuration().getUnit());
}
private static PrometheusConnectorConfig getCommonConfig(URI dataUri)
{
PrometheusConnectorConfig config = new PrometheusConnectorConfig();
config.setPrometheusURI(dataUri);
config.setMaxQueryRangeDuration(io.airlift.units.Duration.valueOf("21d"));
config.setQueryChunkSizeDuration(io.airlift.units.Duration.valueOf("1d"));
config.setCacheDuration(io.airlift.units.Duration.valueOf("30s"));
return config;
}
@BeforeClass
public void setUp()
{
prometheusHttpServer = new PrometheusHttpServer();
}
@Test
public void testAddresses()
{
// http split with default port
PrometheusSplit httpSplit = new PrometheusSplit(URI.create("http://prometheus.com/prometheus"));
assertEquals(httpSplit.getAddresses(), ImmutableList.of(HostAddress.fromString("prometheus.com")));
assertTrue(httpSplit.isRemotelyAccessible());
// http split with custom port
httpSplit = new PrometheusSplit(URI.create("http://prometheus.com:8080/prometheus"));
assertEquals(httpSplit.getAddresses(), ImmutableList.of(HostAddress.fromParts("prometheus.com", 8080)));
assertTrue(httpSplit.isRemotelyAccessible());
// http split with default port
PrometheusSplit httpsSplit = new PrometheusSplit(URI.create("https://prometheus.com/prometheus"));
assertEquals(httpsSplit.getAddresses(), ImmutableList.of(HostAddress.fromString("prometheus.com")));
assertTrue(httpsSplit.isRemotelyAccessible());
// http split with custom port
httpsSplit = new PrometheusSplit(URI.create("https://prometheus.com:8443/prometheus"));
assertEquals(httpsSplit.getAddresses(), ImmutableList.of(HostAddress.fromParts("prometheus.com", 8443)));
assertTrue(httpsSplit.isRemotelyAccessible());
}
@Test
public void testJsonRoundTrip()
{
JsonCodec<PrometheusSplit> codec = jsonCodec(PrometheusSplit.class);
String json = codec.toJson(split);
PrometheusSplit copy = codec.fromJson(json);
assertEquals(copy.getUri(), split.getUri());
assertEquals(copy.getAddresses(), ImmutableList.of(HostAddress.fromString("127.0.0.1")));
assertTrue(copy.isRemotelyAccessible());
}
@Test
public void testQueryWithTableNameNeedingURLEncodeInSplits()
throws URISyntaxException
{
Instant now = LocalDateTime.of(2019, 10, 2, 7, 26, 56, 0).toInstant(UTC);
PrometheusConnectorConfig config = getCommonConfig(prometheusHttpServer.resolve("/prometheus-data/prom-metrics-non-standard-name.json"));
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TYPE_MANAGER);
PrometheusTable table = client.getTable("default", "up+now");
PrometheusTableHandle tableHandle = new PrometheusTableHandle("default", table.getName());
TupleDomain<ColumnHandle> columnConstraints = TupleDomain.withColumnDomains(
ImmutableMap.of(
new PrometheusColumnHandle("value", BIGINT, 1), Domain.all(VARCHAR),
new PrometheusColumnHandle("text", createUnboundedVarcharType(), 0), Domain.all(VARCHAR)));
PrometheusTableLayoutHandle tableLayoutHandle = new PrometheusTableLayoutHandle(tableHandle, columnConstraints);
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, fixedClockAt(now), config);
ConnectorSplitSource splits = splitManager.getSplits(
null,
null,
tableLayoutHandle,
null);
PrometheusSplit split = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0);
String queryInSplit = split.getUri().getQuery();
String timeShouldBe = decimalSecondString(now.toEpochMilli() -
config.getMaxQueryRangeDuration().toMillis() +
config.getQueryChunkSizeDuration().toMillis() -
OFFSET_MILLIS * 20);
assertEquals(queryInSplit,
new URI("http://doesnotmatter.example:9090/api/v1/query?query=up+now[" + getQueryChunkSizeDurationAsPrometheusCompatibleDurationString(config) + "]" + "&time=" +
timeShouldBe).getQuery());
}
@Test
public void testQueryDividedIntoSplitsFirstSplitHasRightTime()
throws URISyntaxException
{
Instant now = LocalDateTime.of(2019, 10, 2, 7, 26, 56, 0).toInstant(UTC);
PrometheusConnectorConfig config = getCommonConfig(prometheusHttpServer.resolve("/prometheus-data/prometheus-metrics.json"));
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TYPE_MANAGER);
PrometheusTable table = client.getTable("default", "up");
PrometheusTableHandle tableHandle = new PrometheusTableHandle("default", table.getName());
TupleDomain<ColumnHandle> columnConstraints = TupleDomain.withColumnDomains(
ImmutableMap.of(
new PrometheusColumnHandle("value", BIGINT, 1), Domain.all(VARCHAR),
new PrometheusColumnHandle("text", createUnboundedVarcharType(), 0), Domain.all(VARCHAR)));
PrometheusTableLayoutHandle tableLayoutHandle = new PrometheusTableLayoutHandle(tableHandle, columnConstraints);
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, fixedClockAt(now), config);
ConnectorSplitSource splits = splitManager.getSplits(
null,
null,
tableLayoutHandle,
null);
PrometheusSplit split = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0);
String queryInSplit = split.getUri().getQuery();
String timeShouldBe = decimalSecondString(now.toEpochMilli() -
config.getMaxQueryRangeDuration().toMillis() +
config.getQueryChunkSizeDuration().toMillis() -
OFFSET_MILLIS * 20);
assertEquals(queryInSplit,
new URI("http://doesnotmatter.example:9090/api/v1/query?query=up[" + getQueryChunkSizeDurationAsPrometheusCompatibleDurationString(config) + "]" + "&time=" +
timeShouldBe).getQuery());
}
@Test
public void testQueryDividedIntoSplitsLastSplitHasRightTime()
throws URISyntaxException
{
Instant now = LocalDateTime.of(2019, 10, 2, 7, 26, 56, 0).toInstant(UTC);
PrometheusConnectorConfig config = getCommonConfig(prometheusHttpServer.resolve("/prometheus-data/prometheus-metrics.json"));
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TYPE_MANAGER);
PrometheusTable table = client.getTable("default", "up");
PrometheusTableHandle tableHandle = new PrometheusTableHandle("default", table.getName());
TupleDomain<ColumnHandle> columnConstraints = TupleDomain.withColumnDomains(
ImmutableMap.of(
new PrometheusColumnHandle("value", BIGINT, 1), Domain.all(VARCHAR),
new PrometheusColumnHandle("text", createUnboundedVarcharType(), 0), Domain.all(VARCHAR)));
PrometheusTableLayoutHandle tableLayoutHandle = new PrometheusTableLayoutHandle(tableHandle, columnConstraints);
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, fixedClockAt(now), config);
ConnectorSplitSource splitsMaybe = splitManager.getSplits(
null,
null,
tableLayoutHandle,
null);
List<ConnectorSplit> splits = splitsMaybe.getNextBatch(NOT_PARTITIONED, NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS).getNow(null).getSplits();
int lastSplitIndex = splits.size() - 1;
PrometheusSplit lastSplit = (PrometheusSplit) splits.get(lastSplitIndex);
String queryInSplit = lastSplit.getUri().getQuery();
String timeShouldBe = decimalSecondString(now.toEpochMilli());
URI uriAsFormed = new URI("http://doesnotmatter.example:9090/api/v1/query?query=up[" +
getQueryChunkSizeDurationAsPrometheusCompatibleDurationString(config) + "]" +
"&time=" + timeShouldBe);
assertEquals(queryInSplit, uriAsFormed.getQuery());
}
@Test
public void testQueryDividedIntoSplitsShouldHaveCorrectSpacingBetweenTimes()
{
Instant now = LocalDateTime.of(2019, 10, 2, 7, 26, 56, 0).toInstant(UTC);
PrometheusConnectorConfig config = getCommonConfig(prometheusHttpServer.resolve("/prometheus-data/prometheus-metrics.json"));
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TYPE_MANAGER);
PrometheusTable table = client.getTable("default", "up");
PrometheusTableHandle tableHandle = new PrometheusTableHandle("default", table.getName());
TupleDomain<ColumnHandle> columnConstraints = TupleDomain.withColumnDomains(
ImmutableMap.of(
new PrometheusColumnHandle("value", BIGINT, 1), Domain.all(VARCHAR),
new PrometheusColumnHandle("text", createUnboundedVarcharType(), 0), Domain.all(VARCHAR)));
PrometheusTableLayoutHandle tableLayoutHandle = new PrometheusTableLayoutHandle(tableHandle, columnConstraints);
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, fixedClockAt(now), config);
ConnectorSplitSource splits = splitManager.getSplits(
null,
null,
tableLayoutHandle,
null);
PrometheusSplit split1 = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0);
Map<String, String> paramsMap1 = new HashMap<>();
String[] splitKV1 = split1.getUri().getQuery().split("&");
paramsMap1.put("query", splitKV1[0].split("=")[1]);
paramsMap1.put("time", splitKV1[1].split("=")[1]);
assertEquals(paramsMap1.get("query"), "up[1d]");
PrometheusSplit split2 = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0);
Map<String, String> paramsMap2 = new HashMap<>();
String[] splitKV2 = split2.getUri().getQuery().split("&");
paramsMap2.put("query", splitKV2[0].split("=")[1]);
paramsMap2.put("time", splitKV2[1].split("=")[1]);
assertEquals(paramsMap2.get("query"), "up[1d]");
long diff = Double.valueOf(paramsMap2.get("time")).longValue() - Double.valueOf(paramsMap1.get("time")).longValue();
assertEquals(config.getQueryChunkSizeDuration().getValue(TimeUnit.SECONDS), diff, 0.0001);
}
@Test
public void testSplitTimesCorrect()
{
io.airlift.units.Duration maxQueryRangeDuration = new io.airlift.units.Duration(3, TimeUnit.DAYS);
io.airlift.units.Duration queryChunkSizeDuration = new io.airlift.units.Duration(1, TimeUnit.DAYS);
Instant now = ofEpochMilli(1000000000L);
PrometheusTableHandle prometheusTableHandle = new PrometheusTableHandle("schemaName", "tableName");
List<String> splitTimes = PrometheusSplitManager.generateTimesForSplits(
now,
maxQueryRangeDuration, queryChunkSizeDuration, prometheusTableHandle);
List<String> expectedSplitTimes = ImmutableList.of(
"827199.998", "913599.999", "1000000");
assertEquals(splitTimes, expectedSplitTimes);
}
@Test
public void testSplitTimesCorrectNonModuloZeroDurationToChunk()
{
io.airlift.units.Duration maxQueryRangeDuration = new io.airlift.units.Duration(3, TimeUnit.DAYS);
io.airlift.units.Duration queryChunkSizeDuration = new io.airlift.units.Duration(2, TimeUnit.DAYS);
Instant now = ofEpochMilli(1000000000L);
PrometheusTableHandle prometheusTableHandle = new PrometheusTableHandle("schemaName", "tableName");
List<String> splitTimes = PrometheusSplitManager.generateTimesForSplits(now, maxQueryRangeDuration, queryChunkSizeDuration, prometheusTableHandle);
List<String> expectedSplitTimes = ImmutableList.of(
"827199.999", "1000000");
assertEquals(splitTimes, expectedSplitTimes);
}
@Test
public void testSplitTimesCorrectVersusMock()
{
io.airlift.units.Duration maxQueryRangeDuration = new io.airlift.units.Duration(120, TimeUnit.SECONDS);
io.airlift.units.Duration queryChunkSizeDuration = new io.airlift.units.Duration(30, TimeUnit.SECONDS);
Instant now = ofEpochMilli(1568638172000L);
PrometheusTableHandle prometheusTableHandle = new PrometheusTableHandle("schemaName", "tableName");
List<String> splitTimes = PrometheusSplitManager.generateTimesForSplits(now, maxQueryRangeDuration, queryChunkSizeDuration, prometheusTableHandle);
List<String> promTimesReturned = mockPrometheusResponseToChunkedQueries(queryChunkSizeDuration, splitTimes);
assertEquals(promTimesReturned, convertMockTimesToStrings(promTimeValuesMock));
}
@Test
public void testSplitTimesAreTimesNearBoundaryNotMissing()
{
io.airlift.units.Duration maxQueryRangeDuration = new io.airlift.units.Duration(120, TimeUnit.SECONDS);
io.airlift.units.Duration queryChunkSizeDuration = new io.airlift.units.Duration(30, TimeUnit.SECONDS);
Instant now = ofEpochMilli(1568638171999L);
PrometheusTableHandle prometheusTableHandle = new PrometheusTableHandle("schemaName", "tableName");
List<String> splitTimes = PrometheusSplitManager.generateTimesForSplits(now, maxQueryRangeDuration, queryChunkSizeDuration, prometheusTableHandle);
List<String> promTimesReturned = mockPrometheusResponseToChunkedQueries(queryChunkSizeDuration, splitTimes);
assertEquals(promTimesReturned, convertMockTimesToStrings(promTimeValuesMock));
}
@Test
public void testMockPrometheusResponseShouldBeCorrectWhenUpperBoundaryAlignsWithData()
{
List<Double> expectedResponse = ImmutableList.of(1568638142.0, 1568638157.0, 1568638171.999);
assertEquals(mockPrometheusResponseToQuery(new io.airlift.units.Duration(30, TimeUnit.SECONDS), "1568638171.999"), expectedResponse);
}
@Test
public void testMockPrometheusResponseShouldBeCorrectWhenLowerBoundaryAlignsWithData()
{
List<Double> expectedResponse = ImmutableList.of(1568638142.0, 1568638157.0, 1568638171.999);
assertEquals(mockPrometheusResponseToQuery(new io.airlift.units.Duration(30, TimeUnit.SECONDS), "1568638172."), expectedResponse);
}
@Test
public void testMockPrometheusResponseShouldBeCorrectWhenLowerBoundaryLaterThanData()
{
List<Double> expectedResponse = ImmutableList.of(1568638157.0, 1568638171.999);
assertEquals(mockPrometheusResponseToQuery(new io.airlift.units.Duration(30, TimeUnit.SECONDS), "1568638172.001"), expectedResponse);
}
@Test
public void testMockPrometheusResponseWithSeveralChunksShouldBeCorrect()
{
List<String> expectedResponse = ImmutableList.of("1568638112", "1568638126.997", "1568638142", "1568638157", "1568638171.999");
List<String> splitTimes = ImmutableList.of("1568638141.999", "1568638172.");
assertEquals(mockPrometheusResponseToChunkedQueries(new io.airlift.units.Duration(30, TimeUnit.SECONDS), splitTimes), expectedResponse);
}
@Test
public void testPredicatePushDownLowerBoundDirect()
{
Range lowRange = Range.greaterThanOrEqual(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone(1570460709643L, UTC_KEY));
ValueSet valueSet = ValueSet.ofRanges(lowRange);
Domain testDomain = Domain.create(valueSet, false);
TupleDomain<ColumnHandle> testTupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of(
new PrometheusColumnHandle("timestamp", TIMESTAMP_WITH_TIME_ZONE, 2), testDomain));
PrometheusPredicateTimeInfo predicateTimes = PrometheusSplitManager.determinePredicateTimes(testTupleDomain).orElseThrow(
() -> new AssertionError("predicate pushdown error on Prometheus Column"));
Instant expected = ofEpochMilli(1570460709643L);
assertEquals(predicateTimes.getPredicateLowerTimeBound().orElseThrow(
() -> new AssertionError("predicate pushdown error on Prometheus Column")
), expected);
}
@Test
public void testPredicatePushDownSetsLowerBoundOnly()
{
long predicateLowValue = 1568638171999L - 600000L;
Range lowRange = Range.greaterThanOrEqual(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone(predicateLowValue, UTC_KEY));
ValueSet valueSet = ValueSet.ofRanges(lowRange);
Domain testDomain = Domain.create(valueSet, false);
TupleDomain<ColumnHandle> testTupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of(
new PrometheusColumnHandle("timestamp", TIMESTAMP_WITH_TIME_ZONE, 2), testDomain));
PrometheusTableHandle prometheusTableHandle = new PrometheusTableHandle("schemaName", "tableName")
.withPredicate(testTupleDomain);
io.airlift.units.Duration maxQueryRangeDuration = new io.airlift.units.Duration(120, TimeUnit.SECONDS);
io.airlift.units.Duration queryChunkSizeDuration = new io.airlift.units.Duration(30, TimeUnit.SECONDS);
Instant now = ofEpochMilli(1568638171999L);
TemporalAmount maxQueryAsTime = java.time.Duration.ofMillis(maxQueryRangeDuration.toMillis());
List<String> splitTimes = PrometheusSplitManager.generateTimesForSplits(now, maxQueryRangeDuration, queryChunkSizeDuration, prometheusTableHandle);
String earliestSplit = splitTimes.get(0);
Instant earliestSplitAsTime = ofEpochMilli(longFromDecimalSecondString(earliestSplit));
TemporalAmount queryChunkAsTime = java.time.Duration.ofMillis(queryChunkSizeDuration.toMillis());
Instant startOfQuery = earliestSplitAsTime.minus(queryChunkAsTime);
assertNotEquals(startOfQuery, now.minus(maxQueryAsTime).minus(java.time.Duration.ofMillis((splitTimes.size() - 1) * OFFSET_MILLIS)));
assertEquals(startOfQuery.toEpochMilli(), ofEpochMilli(predicateLowValue).toEpochMilli() - ((splitTimes.size() - 1) * OFFSET_MILLIS));
}
@Test
public void testPredicatePushDownSetsUpperBoundOnly()
{
long predicateHighValue = 1568638171999L;
Range highRange = Range.lessThanOrEqual(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone(predicateHighValue, UTC_KEY));
ValueSet valueSet = ValueSet.ofRanges(highRange);
Domain testDomain = Domain.create(valueSet, false);
TupleDomain<ColumnHandle> testTupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of(
new PrometheusColumnHandle("timestamp", TIMESTAMP_WITH_TIME_ZONE, 2), testDomain));
PrometheusTableHandle prometheusTableHandle = new PrometheusTableHandle("schemaName", "tableName")
.withPredicate(testTupleDomain);
io.airlift.units.Duration maxQueryRangeDuration = new io.airlift.units.Duration(120, TimeUnit.SECONDS);
io.airlift.units.Duration queryChunkSizeDuration = new io.airlift.units.Duration(30, TimeUnit.SECONDS);
Instant now = ofEpochMilli(1568638171999L + 600000L);
List<String> splitTimes = PrometheusSplitManager.generateTimesForSplits(now, maxQueryRangeDuration, queryChunkSizeDuration, prometheusTableHandle);
TemporalAmount expectedMaxQueryAsTime = java.time.Duration.ofMillis(maxQueryRangeDuration.toMillis() +
((splitTimes.size() - 1) * OFFSET_MILLIS));
String lastSplit = splitTimes.get(splitTimes.size() - 1);
Instant lastSplitAsTime = ofEpochMilli(longFromDecimalSecondString(lastSplit));
String earliestSplit = splitTimes.get(0);
Instant earliestSplitAsTime = ofEpochMilli(longFromDecimalSecondString(earliestSplit));
TemporalAmount queryChunkAsTime = java.time.Duration.ofMillis(queryChunkSizeDuration.toMillis());
java.time.Duration actualMaxDuration = Duration.between(earliestSplitAsTime
.minus(queryChunkAsTime), lastSplitAsTime);
assertEquals(lastSplitAsTime.toEpochMilli(), 1568638171999L);
assertEquals(actualMaxDuration, expectedMaxQueryAsTime);
}
@Test
public void testPredicatePushDownSetsUpperAndLowerBound()
{
long predicateHighValue = 1568638171999L;
Range highRange = Range.equal(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone(predicateHighValue, UTC_KEY));
long predicateLowValue = 1568638171999L - 600000L;
Range lowRange = Range.equal(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone(predicateLowValue, UTC_KEY));
ValueSet valueSet = ValueSet.ofRanges(lowRange, highRange);
Domain testDomain = Domain.create(valueSet, false);
TupleDomain<ColumnHandle> testTupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of(
new PrometheusColumnHandle("timestamp", TIMESTAMP_WITH_TIME_ZONE, 2), testDomain));
PrometheusTableHandle prometheusTableHandle = new PrometheusTableHandle("schemaName", "tableName")
.withPredicate(testTupleDomain);
io.airlift.units.Duration maxQueryRangeDuration = new io.airlift.units.Duration(120, TimeUnit.SECONDS);
io.airlift.units.Duration queryChunkSizeDuration = new io.airlift.units.Duration(30, TimeUnit.SECONDS);
Instant now = ofEpochMilli(1568638171999L + 1200000L);
List<String> splitTimes = PrometheusSplitManager.generateTimesForSplits(now, maxQueryRangeDuration, queryChunkSizeDuration, prometheusTableHandle);
TemporalAmount expectedMaxQueryAsTime = java.time.Duration.ofMillis(new io.airlift.units.Duration(10, TimeUnit.MINUTES).toMillis() +
((splitTimes.size() - 1) * OFFSET_MILLIS));
String lastSplit = splitTimes.get(splitTimes.size() - 1);
Instant lastSplitAsTime = ofEpochMilli(longFromDecimalSecondString(lastSplit));
String earliestSplit = splitTimes.get(0);
Instant earliestSplitAsTime = ofEpochMilli(longFromDecimalSecondString(earliestSplit));
TemporalAmount queryChunkAsTime = java.time.Duration.ofMillis(queryChunkSizeDuration.toMillis());
java.time.Duration actualMaxDuration = Duration.between(earliestSplitAsTime
.minus(queryChunkAsTime), lastSplitAsTime);
assertEquals(lastSplitAsTime.toEpochMilli(), 1568638171999L);
assertEquals(actualMaxDuration, expectedMaxQueryAsTime);
}
@Test
public void testEmptyPredicatePredicatePushDown()
{
long predicateLowValue = 1570460709643L;
PrometheusTableHandle prometheusTableHandle = new PrometheusTableHandle("schemaName", "tableName");
io.airlift.units.Duration maxQueryRangeDuration = new io.airlift.units.Duration(120, TimeUnit.SECONDS);
io.airlift.units.Duration queryChunkSizeDuration = new io.airlift.units.Duration(30, TimeUnit.SECONDS);
Instant now = ofEpochMilli(1568638171999L);
TemporalAmount maxQueryAsTime = java.time.Duration.ofMillis(maxQueryRangeDuration.toMillis());
List<String> splitTimes = PrometheusSplitManager.generateTimesForSplits(now, maxQueryRangeDuration, queryChunkSizeDuration, prometheusTableHandle);
String earliestSplit = splitTimes.get(0);
Instant earliestSplitAsTime = ofEpochMilli(longFromDecimalSecondString(earliestSplit));
TemporalAmount queryChunkAsTime = java.time.Duration.ofMillis(queryChunkSizeDuration.toMillis());
Instant startOfQuery = earliestSplitAsTime.minus(queryChunkAsTime);
assertEquals(startOfQuery, now.minus(maxQueryAsTime).minus(java.time.Duration.ofMillis((splitTimes.size() - 1) * OFFSET_MILLIS)));
assertNotEquals(startOfQuery.toEpochMilli(), ofEpochMilli(predicateLowValue).toEpochMilli());
}
}