// TestSharedCacheUploader.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
import org.junit.Test;

/**
 * Unit tests for {@link SharedCacheUploader}. Each test wraps a real uploader
 * in a Mockito spy so that individual steps can be stubbed, and uses mocked
 * collaborators for the file systems and the SCM client.
 */
public class TestSharedCacheUploader {

  /** User name handed to the uploader in the explicitly configured tests. */
  private static final String USER = "joe";

  /** File name reported by the local path used in the tests. */
  private static final String FILE_NAME = "foo.jar";

  /** Value returned by the stubbed computeChecksum(). */
  private static final String CHECKSUM = "abcdef0123456789";

  /**
   * If verifyAccess fails, the upload should fail
   */
  @Test
  public void testFailVerifyAccess() throws Exception {
    SharedCacheUploader spied = createSpiedUploader();
    doReturn(false).when(spied).verifyAccess();

    assertFalse(spied.call());
  }

  /**
   * If rename fails, the upload should fail
   */
  @Test
  public void testRenameFail() throws Exception {
    Configuration conf = createConf();
    Path localPath = mockLocalPath();
    // rename on the target file system reports failure
    FileSystem fs = mockFsWithRenameResult(false);
    SharedCacheUploader spied =
        createSpiedUploader(mock(LocalResource.class), localPath, USER, conf,
            mockAcceptingScmClient(), fs, FileSystem.getLocal(conf));
    stubCommonUploaderSteps(spied, localPath);

    assertFalse(spied.call());
  }

  /**
   * If verifyAccess, uploadFile, rename, and notification succeed, the upload
   * should succeed
   */
  @Test
  public void testSuccess() throws Exception {
    Configuration conf = createConf();
    Path localPath = mockLocalPath();
    // rename on the target file system reports success
    FileSystem fs = mockFsWithRenameResult(true);
    SharedCacheUploader spied =
        createSpiedUploader(mock(LocalResource.class), localPath, USER, conf,
            mockAcceptingScmClient(), fs, FileSystem.getLocal(conf));
    stubCommonUploaderSteps(spied, localPath);
    // stub notifySharedCacheManager to return true
    doReturn(true).when(spied).notifySharedCacheManager(isA(String.class),
        isA(String.class));

    assertTrue(spied.call());
  }

  /**
   * If verifyAccess, uploadFile, and rename succeed, but it receives a nay
   * from SCM, the file should be deleted
   */
  @Test
  public void testNotifySCMFail() throws Exception {
    Configuration conf = createConf();
    Path localPath = mockLocalPath();
    // rename on the target file system reports success
    FileSystem fs = mockFsWithRenameResult(true);
    // no SCM client is supplied: notifySharedCacheManager() is stubbed on the
    // spy below, so the result of the notification comes from the stub
    SharedCacheUploader spied =
        createSpiedUploader(mock(LocalResource.class), localPath, USER, conf,
            null, fs, FileSystem.getLocal(conf));
    stubCommonUploaderSteps(spied, localPath);
    // stub notifySharedCacheManager to return false (the SCM says nay)
    doReturn(false).when(spied).notifySharedCacheManager(isA(String.class),
        isA(String.class));

    assertFalse(spied.call());
    verify(fs).delete(isA(Path.class), anyBoolean());
  }

  /**
   * If resource is public, verifyAccess should succeed
   */
  @Test
  public void testVerifyAccessPublicResource() throws Exception {
    Configuration conf = createConf();
    SharedCacheUploader spied =
        createSpiedUploader(mockPublicResource(), mockLocalPath(), USER, conf,
            mock(SCMUploaderProtocol.class), mock(FileSystem.class),
            FileSystem.getLocal(conf));

    assertTrue(spied.verifyAccess());
  }

  /**
   * If the local file system reports the localPath as a directory,
   * getActualPath should go one level down, to a child that carries the same
   * name as the directory
   */
  @Test
  public void testGetActualPath() throws Exception {
    Configuration conf = createConf();
    Path localPath = new Path(FILE_NAME);
    // stub the local file system to return a status that indicates a directory
    FileStatus status = mock(FileStatus.class);
    when(status.isDirectory()).thenReturn(true);
    FileSystem localFs = mock(FileSystem.class);
    when(localFs.getFileStatus(localPath)).thenReturn(status);
    SharedCacheUploader spied =
        createSpiedUploader(mockPublicResource(), localPath, USER, conf,
            mock(SCMUploaderProtocol.class), mock(FileSystem.class), localFs);

    Path actualPath = spied.getActualPath();
    // JUnit's contract is assertEquals(expected, actual)
    assertEquals(localPath.getName(), actualPath.getName());
    assertEquals(localPath.getName(), actualPath.getParent().getName());
  }

  /**
   * Creates a configuration that has the shared cache enabled.
   *
   * @return a fresh configuration with SHARED_CACHE_ENABLED set to true
   */
  private static Configuration createConf() {
    Configuration conf = new Configuration();
    conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
    return conf;
  }

  /**
   * Creates a mocked local path.
   *
   * @return a mock path whose getName() returns {@link #FILE_NAME}
   */
  private static Path mockLocalPath() {
    Path localPath = mock(Path.class);
    when(localPath.getName()).thenReturn(FILE_NAME);
    return localPath;
  }

  /**
   * Creates a mocked resource with public visibility.
   *
   * @return a mock resource whose getVisibility() returns PUBLIC
   */
  private static LocalResource mockPublicResource() {
    LocalResource resource = mock(LocalResource.class);
    when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
    return resource;
  }

  /**
   * Creates a mocked file system with a fixed rename outcome.
   *
   * @param renameResult the value rename() returns for any pair of paths
   * @return the mocked file system
   * @throws IOException declared by the stubbed rename() signature
   */
  private static FileSystem mockFsWithRenameResult(boolean renameResult)
      throws IOException {
    FileSystem fs = mock(FileSystem.class);
    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(renameResult);
    return fs;
  }

  /**
   * Creates a mocked SCM client that accepts every notification.
   *
   * @return a mock client whose notify() returns an accepted response
   * @throws Exception declared by the stubbed notify() signature
   */
  private static SCMUploaderProtocol mockAcceptingScmClient()
      throws Exception {
    // build the response first so no stubbing is started while another one
    // is still in progress
    SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
    when(response.getAccepted()).thenReturn(true);
    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
    when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
        thenReturn(response);
    return scmClient;
  }

  /**
   * Stubs the uploader steps that the call() tests do not want to exercise for
   * real: verifyAccess() and uploadFile() return true, getActualPath() returns
   * the given path and computeChecksum() returns {@link #CHECKSUM}.
   *
   * @param spied the spied uploader to stub
   * @param localPath the path getActualPath() should return
   * @throws Exception declared by the stubbed method signatures
   */
  private static void stubCommonUploaderSteps(SharedCacheUploader spied,
      Path localPath) throws Exception {
    doReturn(true).when(spied).verifyAccess();
    doReturn(localPath).when(spied).getActualPath();
    doReturn(CHECKSUM).when(spied).computeChecksum(isA(Path.class));
    doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
  }

  /**
   * Creates a spied uploader with default collaborators: mocked resource,
   * local path and SCM client, and file systems obtained from a configuration
   * that has the shared cache enabled.
   *
   * @return the spied uploader
   * @throws IOException if a file system cannot be obtained
   */
  private SharedCacheUploader createSpiedUploader() throws IOException {
    Configuration conf = createConf();
    LocalResource resource = mock(LocalResource.class);
    Path localPath = mock(Path.class);
    String user = "foo";
    SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
    FileSystem fs = FileSystem.get(conf);
    FileSystem localFs = FileSystem.getLocal(conf);
    return createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
        localFs);
  }

  /**
   * Creates an uploader from the given collaborators and wraps it in a spy so
   * that individual methods can be stubbed.
   *
   * @param resource the resource passed to the uploader
   * @param localPath the local path passed to the uploader
   * @param user the user name passed to the uploader
   * @param conf the configuration passed to the uploader
   * @param scmClient the SCM client passed to the uploader (may be null when
   *     the notification step is stubbed)
   * @param fs the target file system passed to the uploader
   * @param localFs the local file system passed to the uploader
   * @return the spied uploader
   * @throws IOException declared by the uploader constructor call
   */
  private SharedCacheUploader createSpiedUploader(LocalResource resource,
      Path localPath, String user, Configuration conf,
      SCMUploaderProtocol scmClient, FileSystem fs, FileSystem localFs)
          throws IOException {
    SharedCacheUploader uploader = new SharedCacheUploader(resource, localPath,
        user, conf, scmClient, fs, localFs);
    return spy(uploader);
  }
}