TestSystemServiceManagerImpl.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

import static org.junit.jupiter.api.Assertions.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * Test class for system service manager.
 */
public class TestSystemServiceManagerImpl {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestSystemServiceManagerImpl.class);
  private SystemServiceManagerImpl systemService;
  private Configuration conf;
  private String resourcePath = "system-services";

  private String[] users = new String[] {"user1", "user2"};
  private static Map<String, Set<String>> loadedServices = new HashMap<>();
  private static Map<String, Set<String>> savedServices = new HashMap<>();
  private static Map<String, Set<String>> submittedServices = new HashMap<>();

  @BeforeEach
  public void setup() {
    File file = new File(
        getClass().getClassLoader().getResource(resourcePath).getFile());
    conf = new Configuration();
    conf.set(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY,
        file.getAbsolutePath());
    systemService = new SystemServiceManagerImpl() {
      @Override ServiceClient getServiceClient() {
        return new TestServiceClient();
      }
    };
    systemService.init(conf); // do not call explicit start

    constructUserService(users[0], "example-app1");
    constructUserService(users[1], "example-app1", "example-app2");
  }

  @AfterEach
  public void tearDown() {
    systemService.stop();
  }

  @Test
  void testSystemServiceSubmission() throws Exception {
    systemService.start();

    /* verify for ignored sevices count */
    Map<String, Integer> ignoredUserServices =
        systemService.getIgnoredUserServices();
    assertEquals(1, ignoredUserServices.size());
    assertTrue(ignoredUserServices.containsKey(users[0]),
        "User user1 doesn't exist.");
    int count = ignoredUserServices.get(users[0]);
    assertEquals(1, count);
    assertEquals(1,
        systemService.getBadFileNameExtensionSkipCounter());
    assertEquals(1, systemService.getBadDirSkipCounter());

    Map<String, Set<Service>> userServices =
        systemService.getSyncUserServices();
    assertEquals(loadedServices.size(), userServices.size());
    verifyForScannedUserServices(userServices);

    verifyForLaunchedUserServices();

    // 2nd time launch service to handle if service exist scenario
    systemService.launchUserService(userServices);
    verifyForLaunchedUserServices();

    // verify start of stopped services
    submittedServices.clear();
    systemService.launchUserService(userServices);
    verifyForLaunchedUserServices();
  }

  private void verifyForScannedUserServices(
      Map<String, Set<Service>> userServices) {
    for (String user : users) {
      Set<Service> services = userServices.get(user);
      Set<String> serviceNames = loadedServices.get(user);
      assertEquals(serviceNames.size(), services.size());
      Iterator<Service> iterator = services.iterator();
      while (iterator.hasNext()) {
        Service next = iterator.next();
        assertTrue(serviceNames.contains(next.getName()),
            "Service name doesn't exist in expected userService " + serviceNames);
      }
    }
  }

  public void constructUserService(String user, String... serviceNames) {
    Set<String> service = loadedServices.get(user);
    if (service == null) {
      service = new HashSet<>();
      for (String serviceName : serviceNames) {
        service.add(serviceName);
      }
      loadedServices.put(user, service);
    }
  }

  class TestServiceClient extends ServiceClient {
    @Override
    protected void serviceStart() throws Exception {
      // do nothing
    }

    @Override
    protected void serviceStop() throws Exception {
      // do nothing
    }

    @Override
    protected void serviceInit(Configuration configuration)
        throws Exception {
      // do nothing
    }

    @Override
    public int actionBuild(Service service)
        throws YarnException, IOException {
      String userName =
          UserGroupInformation.getCurrentUser().getShortUserName();
      Set<String> services = savedServices.get(userName);
      if (services == null) {
        services = new HashSet<>();
        savedServices.put(userName, services);
      }
      if (services.contains(service.getName())) {
        String message = "Failed to save service " + service.getName()
            + ", because it already exists.";
        throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
            message);
      }
      services.add(service.getName());
      return 0;
    }

    @Override
    public ApplicationId actionStartAndGetId(String serviceName)
        throws YarnException, IOException {
      String userName =
          UserGroupInformation.getCurrentUser().getShortUserName();
      Set<String> services = submittedServices.get(userName);
      if (services == null) {
        services = new HashSet<>();
        submittedServices.put(userName, services);
      }
      if (services.contains(serviceName)) {
        String message = "Failed to create service " + serviceName
            + ", because it is already running.";
        throw new YarnException(message);
      }
      services.add(serviceName);
      return ApplicationId.newInstance(System.currentTimeMillis(), 1);
    }
  }

  private void verifyForLaunchedUserServices() {
    assertEquals(loadedServices.size(), submittedServices.size());
    for (Map.Entry<String, Set<String>> entry : submittedServices.entrySet()) {
      String user = entry.getKey();
      Set<String> serviceSet = entry.getValue();
      assertTrue(loadedServices.containsKey(user));
      Set<String> services = loadedServices.get(user);
      assertEquals(services.size(), serviceSet.size());
      assertTrue(services.containsAll(serviceSet));
    }
  }

  @Test
  void testFileSystemCloseWhenCleanUpService() throws Exception {
    FileSystem fs = null;
    Path path = new Path("/tmp/servicedir");

    HdfsConfiguration hdfsConfig = new HdfsConfiguration();
    MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
        .numDataNodes(1).build();

    fs = hdfsCluster.getFileSystem();
    if (!fs.exists(path)) {
      fs.mkdirs(path);
    }

    SystemServiceManagerImpl serviceManager = new SystemServiceManagerImpl();

    hdfsConfig.set(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY,
        path.toString());
    serviceManager.init(hdfsConfig);

    // the FileSystem object owned by SystemServiceManager must not be closed
    // when cleanup a service
    hdfsConfig.set("hadoop.registry.zk.connection.timeout.ms", "100");
    hdfsConfig.set("hadoop.registry.zk.retry.times", "1");
    ApiServiceClient asc = new ApiServiceClient();
    asc.serviceInit(hdfsConfig);
    asc.actionCleanUp("testapp", "testuser");

    try {
      serviceManager.start();
    } catch (Exception e) {
      if (e.getMessage().contains("Filesystem closed")) {
        fail("SystemServiceManagerImpl failed to handle " +
            "FileSystem close");
      } else {
        fail("Should not get any exceptions");
      }
    } finally {
      serviceManager.stop();
      fs = hdfsCluster.getFileSystem();
      if (fs.exists(path)) {
        fs.delete(path, true);
      }
      if (hdfsCluster != null) {
        hdfsCluster.shutdown();
      }
    }
  }
}