/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

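/**
 * Tests {@link FileSystemRMStateStore} against a MiniDFSCluster-backed file
 * system, covering state recovery, versioning, admin access checks and
 * client retry behaviour.
 */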
public class TestFSRMStateStore extends RMStateStoreTestBase {

  public static final Logger LOG =
      LoggerFactory.getLogger(TestFSRMStateStore.class);

  private TestFSRMStateStoreTester fsTester;

  class TestFSRMStateStoreTester implements RMStateStoreHelper {

    Path workingDirPathURI;
    TestFileSystemRMStore store;
    MiniDFSCluster cluster;
    boolean adminCheckEnable;

    class TestFileSystemRMStore extends FileSystemRMStateStore {

      TestFileSystemRMStore(Configuration conf) throws Exception {
        init(conf);
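        // The FileSystem is created in start(), not init(), so fs must still
        // be null at this point.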
        assertNull(fs);
        assertEquals(workingDirPathURI, fsWorkingPath);
        dispatcher.disableExitOnDispatchException();
        start();
        assertNotNull(fs);
      }

      public Path getVersionNode() {
        return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE);
      }

      public Version getCurrentVersion() {
        return CURRENT_VERSION_INFO;
      }

      public Path getAppDir(String appId) {
        Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME);
        Path appRootDir = new Path(rootDir, RM_APP_ROOT);
        Path appDir = new Path(appRootDir, appId);
        return appDir;
      }

      public Path getAttemptDir(String appId, String attemptId) {
        Path appDir = getAppDir(appId);
        Path attemptDir = new Path(appDir, attemptId);
        return attemptDir;
      }
    }

    public TestFSRMStateStoreTester(MiniDFSCluster cluster,
        boolean adminCheckEnable) throws Exception {
      Path workingDirPath = new Path("/yarn/Test");
      this.adminCheckEnable = adminCheckEnable;
      this.cluster = cluster;
      FileSystem fs = cluster.getFileSystem();
      fs.mkdirs(workingDirPath);
      Path clusterURI = new Path(cluster.getURI());
      workingDirPathURI = new Path(clusterURI, workingDirPath);
      fs.close();
    }

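    /**
     * Creates a store configured with a small retry budget (8 retries,
     * 900ms apart). When adminCheckEnable is set, intermediate data
     * encryption is enabled so that stored files are written unreadable by
     * the HDFS superuser (exercised by testHDFSRMStateStore).
     */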
    @Override
    public RMStateStore getRMStateStore() throws Exception {
      YarnConfiguration conf = new YarnConfiguration();
      conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
          workingDirPathURI.toString());
      conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
      conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
              900L);
      conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
      conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
      if (adminCheckEnable) {
        conf.setBoolean(
          YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);
      }
      this.store = new TestFileSystemRMStore(conf);
      assertThat(store.getNumRetries()).isEqualTo(8);
      assertThat(store.getRetryInterval()).isEqualTo(900L);
      // The store must use its own Configuration for the FileSystem it creates.
      assertSame(store.fsConf, store.fs.getConf());
      FileSystem previousFs = store.fs;
      store.startInternal();
      // startInternal() re-creates the FileSystem, still backed by the store's
      // own Configuration.
      assertNotSame(previousFs, store.fs);
      assertSame(store.fsConf, store.fs.getConf());
      return store;
    }

    @Override
    public boolean isFinalStateValid() throws Exception {
      FileSystem fs = cluster.getFileSystem();
      FileStatus[] files = fs.listStatus(workingDirPathURI);
      return files.length == 1;
    }

    @Override
    public void writeVersion(Version version) throws Exception {
      store.updateFile(store.getVersionNode(),
          ((VersionPBImpl) version).getProto().toByteArray(), false);
    }

    @Override
    public Version getCurrentVersion() throws Exception {
      return store.getCurrentVersion();
    }

    public boolean appExists(RMApp app) throws IOException {
      FileSystem fs = cluster.getFileSystem();
      Path nodePath =
              store.getAppDir(app.getApplicationId().toString());
      return fs.exists(nodePath);
    }

    public boolean attemptExists(RMAppAttempt attempt) throws IOException {
      FileSystem fs = cluster.getFileSystem();
      ApplicationAttemptId attemptId = attempt.getAppAttemptId();
      Path nodePath =
          store.getAttemptDir(attemptId.getApplicationId().toString(),
              attemptId.toString());
      return fs.exists(nodePath);
    }
  }

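  /**
   * Runs the RMStateStoreTestBase suite against a FileSystemRMStateStore
   * backed by a single-node MiniDFSCluster, after planting a corrupted
   * .tmp attempt entry that recovery is expected to clean up.
   */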
  @Test
  @Timeout(value = 120)
  public void testFSRMStateStore() throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      fsTester = new TestFSRMStateStoreTester(cluster, false);
      // Plant a corrupted (leftover .tmp) attempt entry in the application
      // directory; recovery should discard it and remove it from the file
      // system.
      FileSystemRMStateStore fileSystemRMStateStore =
              (FileSystemRMStateStore) fsTester.getRMStateStore();
      String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
      ApplicationAttemptId attemptId3 =
          ApplicationAttemptId.fromString(appAttemptIdStr3);
      Path appDir =
              fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
      Path tempAppAttemptFile =
              new Path(appDir, attemptId3.toString() + ".tmp");
      try (FSDataOutputStream fsOut =
          fileSystemRMStateStore.fs.create(tempAppAttemptFile, false)) {
        fsOut.write("Some random data ".getBytes());
      }

      testRMAppStateStore(fsTester);
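      // Recovery should have discarded the corrupted entry and removed the
      // temporary file from the file system.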
      assertFalse(fsTester.workingDirPathURI
              .getFileSystem(conf).exists(tempAppAttemptFile));
      testRMDTSecretManagerStateStore(fsTester);
      testCheckVersion(fsTester);
      testEpoch(fsTester);
      testAppDeletion(fsTester);
      testDeleteStore(fsTester);
      testRemoveApplication(fsTester);
      testRemoveAttempt(fsTester);
      testAMRMTokenSecretManagerStateStore(fsTester);
      testReservationStateStore(fsTester);
      testProxyCA(fsTester);
    } finally {
      cluster.shutdown();
    }
  }

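  /**
   * Stores state as the "yarn" user with intermediate data encryption
   * enabled, then verifies as the HDFS superuser that the stored app and
   * attempt files cannot be read.
   */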
  @Test
  @Timeout(value = 60)
  public void testHDFSRMStateStore() throws Exception {
    final HdfsConfiguration conf = new HdfsConfiguration();
    UserGroupInformation yarnAdmin =
            UserGroupInformation.createUserForTesting("yarn",
                    new String[]{"admin"});
    final MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.getFileSystem().mkdir(new Path("/yarn"),
            FsPermission.valueOf("-rwxrwxrwx"));
    cluster.getFileSystem().setOwner(new Path("/yarn"), "yarn", "admin");
    final UserGroupInformation hdfsAdmin = UserGroupInformation.getCurrentUser();
    final StoreStateVerifier verifier = new StoreStateVerifier() {
      @Override
      void afterStoreApp(final RMStateStore store, final ApplicationId appId) {
        try {
          // Wait for things to settle
          Thread.sleep(5000);
          hdfsAdmin.doAs(
                  new PrivilegedExceptionAction<Void>() {
                    @Override
                    public Void run() throws Exception {
                      verifyFilesUnreadablebyHDFS(cluster,
                          ((FileSystemRMStateStore) store).getAppDir(appId));
                      return null;
                    }
                  });
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      void afterStoreAppAttempt(final RMStateStore store,
                                final ApplicationAttemptId appAttId) {
        try {
          // Wait for things to settle
          Thread.sleep(5000);
          hdfsAdmin.doAs(
                  new PrivilegedExceptionAction<Void>() {
                    @Override
                    public Void run() throws Exception {
                      verifyFilesUnreadablebyHDFS(cluster,
                              ((FileSystemRMStateStore) store)
                                      .getAppAttemptDir(appAttId));
                      return null;
                    }
                  });
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    };
    try {
      yarnAdmin.doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          fsTester = new TestFSRMStateStoreTester(cluster, true);
          testRMAppStateStore(fsTester, verifier);
          return null;
        }
      });
    } finally {
      cluster.shutdown();
    }
  }

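  /**
   * Walks the given directory tree breadth-first and asserts that opening
   * any file as the current user (the HDFS superuser) fails with an
   * AccessControlException.
   */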
  private void verifyFilesUnreadablebyHDFS(MiniDFSCluster cluster, Path root)
      throws Exception {
    DistributedFileSystem fs = cluster.getFileSystem();
    Queue<Path> paths = new LinkedList<>();
    paths.add(root);
    while (!paths.isEmpty()) {
      Path p = paths.poll();
      FileStatus stat = fs.getFileStatus(p);
      if (!stat.isDirectory()) {
        try {
          LOG.warn("\n\n ##Testing path [" + p + "]\n\n");
          fs.open(p);
          fail("Super user should not be able to read ["+
              UserGroupInformation.getCurrentUser() + "] [" + p.getName() + "]");
        } catch (AccessControlException e) {
          assertTrue(e.getMessage().contains("superuser is not allowed to perform this operation"));
        } catch (Exception e) {
          fail("Should get an AccessControlException here");
        }
      } else {
        for (FileStatus f : fs.listStatus(p)) {
          paths.add(f.getPath());
        }
      }
    }

  }

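  /**
   * Uses a store stub that reports Integer.MAX_VALUE as its major version;
   * since no version has been stored yet, checkVersion() is expected to
   * persist the current version so that loadVersion() returns it.
   */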
  @Test
  @Timeout(value = 60)
  public void testCheckMajorVersionChange() throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      fsTester = new TestFSRMStateStoreTester(cluster, false) {
        Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);

        @Override
        public Version getCurrentVersion() throws Exception {
          return VERSION_INFO;
        }

        @Override
        public RMStateStore getRMStateStore() throws Exception {
          YarnConfiguration conf = new YarnConfiguration();
          conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
              workingDirPathURI.toString());
          this.store = new TestFileSystemRMStore(conf) {
            Version storedVersion = null;

            @Override
            public Version getCurrentVersion() {
              return VERSION_INFO;
            }

            @Override
            protected synchronized Version loadVersion() throws Exception {
              return storedVersion;
            }

            @Override
            protected synchronized void storeVersion() throws Exception {
              storedVersion = VERSION_INFO;
            }
          };
          return store;
        }
      };

      // No version has been stored yet, so checkVersion() should persist the
      // current (Integer.MAX_VALUE major) version as the default.
      RMStateStore store = fsTester.getRMStateStore();
      Version defaultVersion = fsTester.getCurrentVersion();
      store.checkVersion();
      assertEquals(defaultVersion, store.loadVersion());
    } finally {
      cluster.shutdown();
    }
  }

  @Override
  protected void modifyAppState() throws Exception {
    // Imitate an interrupted update: attempt 1's file is left with the
    // .new suffix while the original file has already been deleted.
    String appAttemptIdStr1 = "appattempt_1352994193343_0001_000001";
    ApplicationAttemptId attemptId1 =
        ApplicationAttemptId.fromString(appAttemptIdStr1);
    Path appDir =
            fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
    Path appAttemptFile1 =
        new Path(appDir, attemptId1.toString());
    FileSystemRMStateStore fileSystemRMStateStore =
        (FileSystemRMStateStore) fsTester.getRMStateStore();
    fileSystemRMStateStore.renameFile(appAttemptFile1,
            new Path(appAttemptFile1.getParent(),
                    appAttemptFile1.getName() + ".new"));
  }

  @Override
  protected void modifyRMDelegationTokenState() throws Exception {
    // Imitate an interrupted update: the delegation token file is left with
    // the .new suffix while the original file has already been deleted.
    Path nodeCreatePath =
        fsTester.store.getNodePath(fsTester.store.rmDTSecretManagerRoot,
            FileSystemRMStateStore.DELEGATION_TOKEN_PREFIX + 0);
    FileSystemRMStateStore fileSystemRMStateStore =
        (FileSystemRMStateStore) fsTester.getRMStateStore();
    fileSystemRMStateStore.renameFile(nodeCreatePath,
        new Path(nodeCreatePath.getParent(),
            nodeCreatePath.getName() + ".new"));
  }

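  /**
   * Verifies that a store write survives a NameNode outage: the NameNode is
   * shut down, a write is issued, and the NameNode is restarted; the
   * configured retries should let the write complete without error.
   */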
  @Test
  @Timeout(value = 30)
  public void testFSRMStateStoreClientRetry() throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
    cluster.waitActive();
    try {
      TestFSRMStateStoreTester fsTester =
          new TestFSRMStateStoreTester(cluster, false);
      final RMStateStore store = fsTester.getRMStateStore();
      store.setRMDispatcher(new TestDispatcher());
      final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
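      // Take the NameNode down so the store operation below is forced to retry.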
      cluster.shutdownNameNodes();

      Thread clientThread = new Thread(() -> {
        try {
          store.storeApplicationStateInternal(
              ApplicationId.newInstance(100L, 1),
              ApplicationStateData.newInstance(111, 111, "user", null,
                  RMAppState.ACCEPTED, "diagnostics", 222, 333, null));
        } catch (Exception e) {
          assertionFailedInThread.set(true);
          e.printStackTrace();
        }
      });
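      // Give the shutdown a moment to settle, then issue the write against the
      // unavailable NameNode and bring it back; the retry settings should let
      // the write complete without tripping the failure flag.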
      Thread.sleep(2000);
      clientThread.start();
      cluster.restartNameNode();
      clientThread.join();
      assertFalse(assertionFailedInThread.get());
    } finally {
      cluster.shutdown();
    }
  }
}