TestFTPFileSystem.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.hadoop.fs.ftp;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Comparator;

import org.apache.hadoop.util.Preconditions;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.ftpserver.usermanager.impl.BaseUser;
import org.apache.ftpserver.usermanager.impl.WritePermission;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Test basic @{link FTPFileSystem} class methods. Contract tests are in
 * TestFTPContractXXXX.
 */
@Timeout(180)
public class TestFTPFileSystem {

  private FtpTestServer server;
  private java.nio.file.Path testDir;

  @BeforeEach
  public void setUp() throws Exception {
    testDir = Files.createTempDirectory(
        GenericTestUtils.getTestDir().toPath(), getClass().getName()
    );
    server = new FtpTestServer(testDir).start();
  }

  @AfterEach
  @SuppressWarnings("ResultOfMethodCallIgnored")
  public void tearDown() throws Exception {
    if (server != null) {
      server.stop();
      Files.walk(testDir)
          .sorted(Comparator.reverseOrder())
          .map(java.nio.file.Path::toFile)
          .forEach(File::delete);
    }
  }

  @Test
  public void testCreateWithWritePermissions() throws Exception {
    BaseUser user = server.addUser("test", "password", new WritePermission());
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "ftp:///");
    configuration.set("fs.ftp.host", "localhost");
    configuration.setInt("fs.ftp.host.port", server.getPort());
    configuration.set("fs.ftp.user.localhost", user.getName());
    configuration.set("fs.ftp.password.localhost", user.getPassword());
    configuration.setBoolean("fs.ftp.impl.disable.cache", true);

    FileSystem fs = FileSystem.get(configuration);
    byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
    try (FSDataOutputStream outputStream = fs.create(new Path("test1.txt"))) {
      outputStream.write(bytesExpected);
    }
    try (FSDataInputStream input = fs.open(new Path("test1.txt"))) {
      assertThat(bytesExpected).isEqualTo(IOUtils.readFullyToByteArray(input));
    }
  }

  @Test
  public void testCreateWithoutWritePermissions() throws Exception {
    BaseUser user = server.addUser("test", "password");
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "ftp:///");
    configuration.set("fs.ftp.host", "localhost");
    configuration.setInt("fs.ftp.host.port", server.getPort());
    configuration.set("fs.ftp.user.localhost", user.getName());
    configuration.set("fs.ftp.password.localhost", user.getPassword());
    configuration.setBoolean("fs.ftp.impl.disable.cache", true);

    FileSystem fs = FileSystem.get(configuration);
    byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
    LambdaTestUtils.intercept(
        IOException.class, "Unable to create file: test1.txt, Aborting",
        () -> {
          try (FSDataOutputStream out = fs.create(new Path("test1.txt"))) {
            out.write(bytesExpected);
          }
        }
    );
  }

  @Test
  public void testFTPDefaultPort() throws Exception {
    FTPFileSystem ftp = new FTPFileSystem();
    assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort());
  }

  @Test
  public void testFTPTransferMode() throws Exception {
    Configuration conf = new Configuration();
    FTPFileSystem ftp = new FTPFileSystem();
    assertEquals(FTP.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf));

    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "STREAM_TRANSFER_MODE");
    assertEquals(FTP.STREAM_TRANSFER_MODE, ftp.getTransferMode(conf));

    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "COMPRESSED_TRANSFER_MODE");
    assertEquals(FTP.COMPRESSED_TRANSFER_MODE, ftp.getTransferMode(conf));

    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "invalid");
    assertEquals(FTPClient.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf));
  }

  @Test
  public void testFTPDataConnectionMode() throws Exception {
    Configuration conf = new Configuration();
    FTPClient client = new FTPClient();
    FTPFileSystem ftp = new FTPFileSystem();
    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
        client.getDataConnectionMode());

    ftp.setDataConnectionMode(client, conf);
    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
        client.getDataConnectionMode());

    conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE, "invalid");
    ftp.setDataConnectionMode(client, conf);
    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
        client.getDataConnectionMode());

    conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE,
        "PASSIVE_LOCAL_DATA_CONNECTION_MODE");
    ftp.setDataConnectionMode(client, conf);
    assertEquals(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE,
        client.getDataConnectionMode());

  }

  @Test
  public void testGetFsAction(){
    FTPFileSystem ftp = new FTPFileSystem();
    int[] accesses = new int[] {FTPFile.USER_ACCESS, FTPFile.GROUP_ACCESS,
        FTPFile.WORLD_ACCESS};
    FsAction[] actions = FsAction.values();
    for(int i = 0; i < accesses.length; i++){
      for(int j = 0; j < actions.length; j++){
        enhancedAssertEquals(actions[j], ftp.getFsAction(accesses[i],
            getFTPFileOf(accesses[i], actions[j])));
      }
    }
  }

  private void enhancedAssertEquals(FsAction actionA, FsAction actionB){
    String notNullErrorMessage = "FsAction cannot be null here.";
    Preconditions.checkNotNull(actionA, notNullErrorMessage);
    Preconditions.checkNotNull(actionB, notNullErrorMessage);
    String errorMessageFormat = "expect FsAction is %s, whereas it is %s now.";
    String notEqualErrorMessage = String.format(errorMessageFormat,
        actionA.name(), actionB.name());
    assertEquals(actionA, actionB, notEqualErrorMessage);
  }

  private FTPFile getFTPFileOf(int access, FsAction action) {
    boolean check = access == FTPFile.USER_ACCESS ||
                      access == FTPFile.GROUP_ACCESS ||
                      access == FTPFile.WORLD_ACCESS;
    String errorFormat = "access must be in [%d,%d,%d], but it is %d now.";
    String errorMessage = String.format(errorFormat, FTPFile.USER_ACCESS,
         FTPFile.GROUP_ACCESS, FTPFile.WORLD_ACCESS, access);
    Preconditions.checkArgument(check, errorMessage);
    Preconditions.checkNotNull(action);
    FTPFile ftpFile = new FTPFile();

    if(action.implies(FsAction.READ)){
      ftpFile.setPermission(access, FTPFile.READ_PERMISSION, true);
    }

    if(action.implies(FsAction.WRITE)){
      ftpFile.setPermission(access, FTPFile.WRITE_PERMISSION, true);
    }

    if(action.implies(FsAction.EXECUTE)){
      ftpFile.setPermission(access, FTPFile.EXECUTE_PERMISSION, true);
    }

    return ftpFile;
  }

  @Test
  public void testFTPSetTimeout() {
    Configuration conf = new Configuration();
    FTPClient client = new FTPClient();
    FTPFileSystem ftp = new FTPFileSystem();

    ftp.setTimeout(client, conf);
    assertEquals(client.getControlKeepAliveTimeout(),
        FTPFileSystem.DEFAULT_TIMEOUT);

    long timeout = 600;
    conf.setLong(FTPFileSystem.FS_FTP_TIMEOUT, timeout);
    ftp.setTimeout(client, conf);
    assertEquals(client.getControlKeepAliveTimeout(), timeout);
  }

  private static void touch(FileSystem fs, Path filePath)
          throws IOException {
    // Create a file with a single byte of data.
    touch(fs, filePath, new byte[] {1});
  }

  private static void touch(FileSystem fs, Path path, byte[] data)
          throws IOException {
    try (FSDataOutputStream out = fs.create(path)) {
      if (data != null) {
        out.write(data);
      }
    }
  }

  /**
   * Test renaming a file.
   *
   * @throws Exception
   */
  @Test
  public void testRenameFileWithFullQualifiedPath() throws Exception {
    BaseUser user = server.addUser("test", "password", new WritePermission());
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "ftp:///");
    configuration.set("fs.ftp.host", "localhost");
    configuration.setInt("fs.ftp.host.port", server.getPort());
    configuration.set("fs.ftp.user.localhost", user.getName());
    configuration.set("fs.ftp.password.localhost", user.getPassword());
    configuration.setBoolean("fs.ftp.impl.disable.cache", true);

    FileSystem fs = FileSystem.get(configuration);

    // All the file operations will be relative to the root directory specified by the testDir
    // member variable.
    Path ftpDir = fs.makeQualified(new Path("/"));
    Path file1 = fs.makeQualified(new Path(ftpDir, "renamefile" + "1"));
    Path file2 = fs.makeQualified(new Path(ftpDir, "renamefile" + "2"));
    touch(fs, file1);
    assertTrue(fs.rename(file1, file2));
  }
}