ResourceEstimatorService.java
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.resourceestimator.service;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.inject.Singleton;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.resourceestimator.common.api.RecurrenceId;
import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline;
import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration;
import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorUtil;
import org.apache.hadoop.resourceestimator.common.exception.ResourceEstimatorException;
import org.apache.hadoop.resourceestimator.common.serialization.RLESparseResourceAllocationSerDe;
import org.apache.hadoop.resourceestimator.common.serialization.ResourceSerDe;
import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore;
import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException;
import org.apache.hadoop.resourceestimator.solver.api.Solver;
import org.apache.hadoop.resourceestimator.solver.exceptions.SolverException;
import org.apache.hadoop.resourceestimator.translator.api.LogParser;
import org.apache.hadoop.resourceestimator.translator.impl.LogParserUtil;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
/**
 * Resource Estimator Service which provides a set of REST APIs for users to
 * use the estimation service.
 * <p>
 * All endpoints are rooted at {@code /resourceestimator}. The service wires
 * together a {@link SkylineStore}, a {@link LogParser} and a {@link Solver},
 * each instantiated from the provider named in the
 * {@link ResourceEstimatorConfiguration} settings.
 */
@Singleton
@Path("/resourceestimator")
public class ResourceEstimatorService {
  private static final Logger LOGGER =
      LoggerFactory.getLogger(ResourceEstimatorService.class);

  /** Store queried for history skylines and estimations. */
  private final SkylineStore skylineStore;
  /** Solver invoked when no estimation is available for a pipeline. */
  private final Solver solver;
  /** Parser handed to {@link #logParserUtil}. */
  private final LogParser logParser;
  private final LogParserUtil logParserUtil = new LogParserUtil();
  private final Configuration config;
  /** Serializer configured with the Resource / RLE allocation adapters. */
  private final Gson gson;
  /** Gson type token for {@link RLESparseResourceAllocation}. */
  private final Type rleType;
  /** Gson type token for the history map returned by the store. */
  private final Type skylineStoreType;

  /**
   * Creates the service: loads the configuration file, instantiates and
   * initializes the configured {@link SkylineStore}, {@link LogParser} and
   * {@link Solver} providers, and prepares the Gson serializer used by the
   * REST endpoints.
   *
   * @throws ResourceEstimatorException if any step of the initialization
   *           fails; the original exception is attached as the cause.
   */
  public ResourceEstimatorService() throws ResourceEstimatorException {
    try {
      config = new Configuration();
      config.addResource(ResourceEstimatorConfiguration.CONFIG_FILE);
      skylineStore = ResourceEstimatorUtil.createProviderInstance(config,
          ResourceEstimatorConfiguration.SKYLINESTORE_PROVIDER,
          ResourceEstimatorConfiguration.DEFAULT_SKYLINESTORE_PROVIDER,
          SkylineStore.class);
      logParser = ResourceEstimatorUtil.createProviderInstance(config,
          ResourceEstimatorConfiguration.TRANSLATOR_PROVIDER,
          ResourceEstimatorConfiguration.DEFAULT_TRANSLATOR_PROVIDER,
          LogParser.class);
      logParser.init(config, skylineStore);
      logParserUtil.setLogParser(logParser);
      solver = ResourceEstimatorUtil.createProviderInstance(config,
          ResourceEstimatorConfiguration.SOLVER_PROVIDER,
          ResourceEstimatorConfiguration.DEFAULT_SOLVER_PROVIDER,
          Solver.class);
      solver.init(config, skylineStore);
    } catch (Exception ex) {
      // Pass the exception itself as the trailing argument so the stack
      // trace is logged too, not just the (possibly null) message.
      LOGGER.error("Server initialization failed due to: {}",
          ex.getMessage(), ex);
      throw new ResourceEstimatorException(ex.getMessage(), ex);
    }
    gson = new GsonBuilder()
        .registerTypeAdapter(Resource.class, new ResourceSerDe())
        .registerTypeAdapter(RLESparseResourceAllocation.class,
            new RLESparseResourceAllocationSerDe())
        .enableComplexMapKeySerialization().create();
    rleType = new TypeToken<RLESparseResourceAllocation>() {
    }.getType();
    skylineStoreType =
        new TypeToken<Map<RecurrenceId, List<ResourceSkyline>>>() {
        }.getType();
  }

  /**
   * Parse the log file. See also {@link LogParser#parseStream(InputStream)}.
   *
   * @param logFile file/directory of the log to be parsed.
   * @throws IOException if fails to parse the log.
   * @throws SkylineStoreException if fails to addHistory to
   *           {@link SkylineStore}.
   * @throws ResourceEstimatorException if the {@link LogParser}
   *           is not initialized.
   */
  @POST
  @Path("/translator/{logFile : .+}")
  public void parseFile(@PathParam("logFile") String logFile)
      throws IOException, SkylineStoreException, ResourceEstimatorException {
    logParserUtil.parseLog(logFile);
    LOGGER.debug("Parse logFile: {}.", logFile);
  }

  /**
   * Get predicted {@code Resource} allocation for the pipeline. If the
   * prediction for the pipeline already exists in the {@link SkylineStore},
   * it is returned directly; otherwise the {@link Solver} is called on the
   * history of all runs of the pipeline to make the prediction. This method
   * itself does not write the prediction back to the store; any persistence
   * is left to the {@link Solver}. Note that invoking the {@link Solver}
   * could be a time-consuming operation.
   *
   * @param pipelineId the id of the pipeline.
   * @return Json format of {@link RLESparseResourceAllocation}.
   * @throws SolverException if {@link Solver} fails;
   * @throws SkylineStoreException if fails to get history
   *           {@link ResourceSkyline} or predicted {@code Resource}
   *           allocation from {@link SkylineStore}.
   */
  @GET
  @Path("/estimator/{pipelineId}")
  @Produces(MediaType.APPLICATION_JSON)
  public String getPrediction(
      @PathParam(value = "pipelineId") String pipelineId)
      throws SolverException, SkylineStoreException {
    // first, try to grab the predicted resource allocation from the skyline
    // store
    RLESparseResourceAllocation result =
        skylineStore.getEstimation(pipelineId);
    // if received resource allocation is null, then run the solver on the
    // history of every run ("*") of this pipeline
    if (result == null) {
      RecurrenceId recurrenceId = new RecurrenceId(pipelineId, "*");
      Map<RecurrenceId, List<ResourceSkyline>> jobHistory =
          skylineStore.getHistory(recurrenceId);
      result = solver.solve(jobHistory);
    }
    final String prediction = gson.toJson(result, rleType);
    LOGGER.debug("Predict resource requests for pipelineId: {}.", pipelineId);
    return prediction;
  }

  /**
   * Get history {@link ResourceSkyline} from {@link SkylineStore}. This
   * function supports the following special wildcard operations regarding
   * {@link RecurrenceId}: If the {@code pipelineId} is "*", it will return
   * all entries in the store; else, if the {@code runId} is "*", it will
   * return all {@link ResourceSkyline}s belonging to the {@code pipelineId};
   * else, it will return all {@link ResourceSkyline}s belonging to the pair
   * ({@code pipelineId}, {@code runId}). If the {@link RecurrenceId} does not
   * exist, it will not do anything.
   *
   * @param pipelineId pipelineId of the history run.
   * @param runId runId of the history run.
   * @return Json format of history {@link ResourceSkyline}s.
   * @throws SkylineStoreException if fails to getHistory
   *           {@link ResourceSkyline} from {@link SkylineStore}.
   */
  @GET
  @Path("/skylinestore/history/{pipelineId}/{runId}")
  @Produces(MediaType.APPLICATION_JSON)
  public String getHistoryResourceSkyline(
      @PathParam("pipelineId") String pipelineId,
      @PathParam("runId") String runId) throws SkylineStoreException {
    RecurrenceId recurrenceId = new RecurrenceId(pipelineId, runId);
    Map<RecurrenceId, List<ResourceSkyline>> jobHistory =
        skylineStore.getHistory(recurrenceId);
    final String skyline = gson.toJson(jobHistory, skylineStoreType);
    LOGGER.debug("Query the skyline store for recurrenceId: {}.",
        recurrenceId);
    return skyline;
  }

  /**
   * Get estimated {@code Resource} allocation for the pipeline. Unlike
   * {@link #getPrediction(String)}, this only reads the {@link SkylineStore}
   * and never invokes the {@link Solver}.
   *
   * @param pipelineId id of the pipeline.
   * @return Json format of {@link RLESparseResourceAllocation}.
   * @throws SkylineStoreException if fails to get estimated {@code Resource}
   *           allocation from {@link SkylineStore}.
   */
  @GET
  @Path("/skylinestore/estimation/{pipelineId}")
  @Produces(MediaType.APPLICATION_JSON)
  public String getEstimatedResourceAllocation(
      @PathParam("pipelineId") String pipelineId)
      throws SkylineStoreException {
    RLESparseResourceAllocation result =
        skylineStore.getEstimation(pipelineId);
    final String skyline = gson.toJson(result, rleType);
    LOGGER.debug("Query the skyline store for pipelineId: {}.", pipelineId);
    return skyline;
  }

  /**
   * Delete history {@link ResourceSkyline}s from {@link SkylineStore}.
   * <p> Note that for safety considerations, we only allow users to delete
   * history {@link ResourceSkyline}s of one job run. This method performs no
   * wildcard check of its own; the {@link RecurrenceId} is forwarded to the
   * store as given.
   *
   * @param pipelineId pipelineId of the history run.
   * @param runId runId of the history run.
   * @throws SkylineStoreException if fails to deleteHistory
   *           {@link ResourceSkyline}s.
   */
  @DELETE
  @Path("/skylinestore/history/{pipelineId}/{runId}")
  public void deleteHistoryResourceSkyline(
      @PathParam("pipelineId") String pipelineId,
      @PathParam("runId") String runId) throws SkylineStoreException {
    RecurrenceId recurrenceId = new RecurrenceId(pipelineId, runId);
    skylineStore.deleteHistory(recurrenceId);
    LOGGER.info("Delete ResourceSkyline for recurrenceId: {}.", recurrenceId);
  }
}