메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


Hadoop 2.7.x에서 사용할 수 있는 파일/디렉토리 관련 utils

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSUtils {
	/**
	   * create a existing file from local filesystem to hdfs
	   * @param source
	   * @param dest
	   * @param conf
	   * @throws IOException
	   */
	  public void addFile(String source, String dest, Configuration conf) throws IOException {

	    FileSystem fileSystem = FileSystem.get(conf);

	    // Get the filename out of the file path
	    String filename = source.substring(source.lastIndexOf('/') + 1,source.length());

	    // Create the destination path including the filename.
	    if (dest.charAt(dest.length() - 1) != '/') {
	      dest = dest + "/" + filename;
	    } else {
	      dest = dest + filename;
	    }

	    // System.out.println("Adding file to " + destination);

	    // Check if the file already exists
	    Path path = new Path(dest);
	    if (fileSystem.exists(path)) {
	      System.out.println("File " + dest + " already exists");
	      return;
	    }

	    // Create a new file and write data to it.
	    FSDataOutputStream out = fileSystem.create(path);
	    InputStream in = new BufferedInputStream(new FileInputStream(new File(
	        source)));

	    byte[] b = new byte[1024];
	    int numBytes = 0;
	    while ((numBytes = in.read(b)) > 0) {
	      out.write(b, 0, numBytes);
	    }

	    // Close all the file descriptors
	    in.close();
	    out.close();
	    fileSystem.close();
	  }

	  /**
	   * read a file from hdfs
	   * @param file
	   * @param conf
	   * @throws IOException
	   */
	  public void readFile(String file, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(file);
	    if (!fileSystem.exists(path)) {
	      System.out.println("File " + file + " does not exists");
	      return;
	    }

	    FSDataInputStream in = fileSystem.open(path);

	    String filename = file.substring(file.lastIndexOf('/') + 1,
	        file.length());

	    OutputStream out = new BufferedOutputStream(new FileOutputStream(
	        new File(filename)));

	    byte[] b = new byte[1024];
	    int numBytes = 0;
	    while ((numBytes = in.read(b)) > 0) {
	      out.write(b, 0, numBytes);
	    }

	    in.close();
	    out.close();
	    fileSystem.close();
	  }

	  /**
	   * delete a directory in hdfs
	   * @param file
	   * @throws IOException
	   */
	  public void deleteFile(String file, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(file);
	    if (!fileSystem.exists(path)) {
	      System.out.println("File " + file + " does not exists");
	      return;
	    }

	    fileSystem.delete(new Path(file), true);

	    fileSystem.close();
	  }

	  /**
	   * create directory in hdfs
	   * @param dir
	   * @throws IOException
	   */
	  public void mkdir(String dir, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(dir);
	    if (fileSystem.exists(path)) {
	      System.out.println("Dir " + dir + " already exists");
	      return;
	    } else {
		    fileSystem.mkdirs(path);
		    fileSystem.close();
	    }
	  }
	  
	  /**
	   * delete directory in hdfs
	   * @param dir
	   * @throws IOException
	   */
	  public void rmdir(String dir, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(dir);
	    if (fileSystem.exists(path)) {
		    fileSystem.delete(path, true);
		    fileSystem.close();
	    } else {
		    System.out.println("Dir " + dir + " not exists");
	    }
	  }


	  /*
	  public static void main(String[] args) throws IOException {

	    if (args.length < 1) {
	      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
	          + " [<local_path> <hdfs_path>]");
	      System.exit(1);
	    }

	    FileSystemOperations client = new FileSystemOperations();
	    String hdfsPath = "hdfs://" + args[0] + ":" + args[1];

	    Configuration conf = new Configuration();
	    // Providing conf files
	    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/core-site.xml").getFile()));
	    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/hdfs-site.xml").getFile()));
	    // (or) using relative paths
	    //    conf.addResource(new Path(
	    //        "/u/hadoop-1.0.2/conf/core-site.xml"));
	    //    conf.addResource(new Path(
	    //        "/u/hadoop-1.0.2/conf/hdfs-site.xml"));

	    //(or)
	    // alternatively provide namenode host and port info
	    conf.set("fs.default.name", hdfsPath);

	    if (args[0].equals("add")) {
	      if (args.length < 3) {
	        System.out.println("Usage: hdfsclient add <local_path> "
	            + "<hdfs_path>");
	        System.exit(1);
	      }

	      client.addFile(args[1], args[2], conf);

	    } else if (args[0].equals("read")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient read <hdfs_path>");
	        System.exit(1);
	      }

	      client.readFile(args[1], conf);

	    } else if (args[0].equals("delete")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient delete <hdfs_path>");
	        System.exit(1);
	      }

	      client.deleteFile(args[1], conf);

	    } else if (args[0].equals("mkdir")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
	        System.exit(1);
	      }

	      client.mkdir(args[1], conf);

	    } else {
	      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
	          + " [<local_path> <hdfs_path>]");
	      System.exit(1);
	    }

	    System.out.println("Done!");
	  }
	  */
}


번호 제목 날짜 조회 수
227 [DBeaver 4.3.0]import/export시 "Client home is not specified for connection" 오류발생시 조치사항 2017.12.21 4561
226 hadoop cluster에 포함된 노드중에서 문제있는 decommission하는 방법및 절차 file 2017.12.28 5059
225 [2.7.2] distribute-exclude.sh사용할때 ssh 포트변경에 따른 오류발생시 조치사항 2018.01.02 4570
224 [Decommission]시 시간이 많이 걸리면서(수일) Decommission이 완료되지 않는 경우 조치 2018.01.03 10257
223 spark stream처리할때 두개의 client프로그램이 동일한 checkpoint로 접근할때 발생하는 오류 내용 2018.01.16 3537
222 Hadoop의 Datanode를 Decommission하고 나서 HBase의 regionservers파일에 해당 노드명을 지웠는데 여전히 "Dead regionser"로 표시되는 경우 처리 2018.01.25 3737
221 Could not compute split, block input-0-1517397051800 not found형태의 오류가 발생시 조치방법 2018.02.01 3290
220 spark-submit 실행시 "java.lang.OutOfMemoryError: Java heap space"발생시 조치사항 2018.02.01 3712
219 scala application 샘플소스(SparkSession이용) 2018.03.07 4366
218 Scala를 이용한 Streaming예제 2018.03.08 4302
217 Scala에서 countByWindow를 이용하기(예제) 2018.03.08 4838
216 update 샘플 2018.03.12 4546
215 HA(Namenode, ResourceManager, Kerberos) 및 보안(Zookeeper, Hadoop) 2018.03.16 2707
214 hadoop 클러스터 실행 스크립트 정리 2018.03.20 4816
213 HDFS Balancer설정및 수행 2018.03.21 3166
212 Components of the Impala Server 2018.03.21 3434
211 cloudera-scm-agent 설정파일 위치및 재시작 명령문 2018.03.29 3968
210 Cloudera가 사용하는 서비스별 디렉토리 2018.03.29 3357
209 Cloudera가 사용하는 서비스별 포트 2018.03.29 3878
208 Cloudera설치중에 "Error, CM server guid updated"오류 발생시 조치방법 2018.03.29 3044
위로