메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


Hadoop 2.7.x에서 사용할 수 있는 파일/디렉토리 관련 utils

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSUtils {
	/**
	   * create a existing file from local filesystem to hdfs
	   * @param source
	   * @param dest
	   * @param conf
	   * @throws IOException
	   */
	  public void addFile(String source, String dest, Configuration conf) throws IOException {

	    FileSystem fileSystem = FileSystem.get(conf);

	    // Get the filename out of the file path
	    String filename = source.substring(source.lastIndexOf('/') + 1,source.length());

	    // Create the destination path including the filename.
	    if (dest.charAt(dest.length() - 1) != '/') {
	      dest = dest + "/" + filename;
	    } else {
	      dest = dest + filename;
	    }

	    // System.out.println("Adding file to " + destination);

	    // Check if the file already exists
	    Path path = new Path(dest);
	    if (fileSystem.exists(path)) {
	      System.out.println("File " + dest + " already exists");
	      return;
	    }

	    // Create a new file and write data to it.
	    FSDataOutputStream out = fileSystem.create(path);
	    InputStream in = new BufferedInputStream(new FileInputStream(new File(
	        source)));

	    byte[] b = new byte[1024];
	    int numBytes = 0;
	    while ((numBytes = in.read(b)) > 0) {
	      out.write(b, 0, numBytes);
	    }

	    // Close all the file descriptors
	    in.close();
	    out.close();
	    fileSystem.close();
	  }

	  /**
	   * read a file from hdfs
	   * @param file
	   * @param conf
	   * @throws IOException
	   */
	  public void readFile(String file, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(file);
	    if (!fileSystem.exists(path)) {
	      System.out.println("File " + file + " does not exists");
	      return;
	    }

	    FSDataInputStream in = fileSystem.open(path);

	    String filename = file.substring(file.lastIndexOf('/') + 1,
	        file.length());

	    OutputStream out = new BufferedOutputStream(new FileOutputStream(
	        new File(filename)));

	    byte[] b = new byte[1024];
	    int numBytes = 0;
	    while ((numBytes = in.read(b)) > 0) {
	      out.write(b, 0, numBytes);
	    }

	    in.close();
	    out.close();
	    fileSystem.close();
	  }

	  /**
	   * delete a directory in hdfs
	   * @param file
	   * @throws IOException
	   */
	  public void deleteFile(String file, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(file);
	    if (!fileSystem.exists(path)) {
	      System.out.println("File " + file + " does not exists");
	      return;
	    }

	    fileSystem.delete(new Path(file), true);

	    fileSystem.close();
	  }

	  /**
	   * create directory in hdfs
	   * @param dir
	   * @throws IOException
	   */
	  public void mkdir(String dir, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(dir);
	    if (fileSystem.exists(path)) {
	      System.out.println("Dir " + dir + " already exists");
	      return;
	    } else {
		    fileSystem.mkdirs(path);
		    fileSystem.close();
	    }
	  }
	  
	  /**
	   * delete directory in hdfs
	   * @param dir
	   * @throws IOException
	   */
	  public void rmdir(String dir, Configuration conf) throws IOException {
	    FileSystem fileSystem = FileSystem.get(conf);

	    Path path = new Path(dir);
	    if (fileSystem.exists(path)) {
		    fileSystem.delete(path, true);
		    fileSystem.close();
	    } else {
		    System.out.println("Dir " + dir + " not exists");
	    }
	  }


	  /*
	  public static void main(String[] args) throws IOException {

	    if (args.length < 1) {
	      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
	          + " [<local_path> <hdfs_path>]");
	      System.exit(1);
	    }

	    FileSystemOperations client = new FileSystemOperations();
	    String hdfsPath = "hdfs://" + args[0] + ":" + args[1];

	    Configuration conf = new Configuration();
	    // Providing conf files
	    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/core-site.xml").getFile()));
	    // conf.addResource(new Path(HDFSAPIDemo.class.getResource("/conf/hdfs-site.xml").getFile()));
	    // (or) using relative paths
	    //    conf.addResource(new Path(
	    //        "/u/hadoop-1.0.2/conf/core-site.xml"));
	    //    conf.addResource(new Path(
	    //        "/u/hadoop-1.0.2/conf/hdfs-site.xml"));

	    //(or)
	    // alternatively provide namenode host and port info
	    conf.set("fs.default.name", hdfsPath);

	    if (args[0].equals("add")) {
	      if (args.length < 3) {
	        System.out.println("Usage: hdfsclient add <local_path> "
	            + "<hdfs_path>");
	        System.exit(1);
	      }

	      client.addFile(args[1], args[2], conf);

	    } else if (args[0].equals("read")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient read <hdfs_path>");
	        System.exit(1);
	      }

	      client.readFile(args[1], conf);

	    } else if (args[0].equals("delete")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient delete <hdfs_path>");
	        System.exit(1);
	      }

	      client.deleteFile(args[1], conf);

	    } else if (args[0].equals("mkdir")) {
	      if (args.length < 2) {
	        System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
	        System.exit(1);
	      }

	      client.mkdir(args[1], conf);

	    } else {
	      System.out.println("Usage: hdfsclient add/read/delete/mkdir"
	          + " [<local_path> <hdfs_path>]");
	      System.exit(1);
	    }

	    System.out.println("Done!");
	  }
	  */
}


번호 제목 날짜 조회 수
530 DataNode를 기동할때 "Block pool ID needed, but service not yet registered with NN" 오류 발생에 따른 조치사항 2018.05.28 4471
529 Cloudera Manager설치및 Uninstall 방법(순서) 2018.05.28 4703
528 우분투 서버에 GUI로 접속하기 file 2018.05.27 3651
527 Cloudera Manager재설치하는 동안 "Host is in bad health"오류가 발생하는 경우 확인/조치 사항 2018.05.24 3860
526 "You are running Cloudera Manager in non-production mode.." warning메세지가 나타나지 않게 조치하는 방법 2018.05.23 4147
525 oozie db변경후 재기동시 "Table 'oozie.VALIDATE_CONN' doesn't exist" 오류 발생시 조치방법 2018.05.23 3537
524 Embedded PostgreSql설정을 외부의 MariaDB로변경하기 [1] 2018.05.22 3613
523 CDH 5.14.2 설치중 agent설치에서 실패하는 경우 확인/조치 2018.05.22 3577
522 Cluster Install -> Provide Login Credentials에서 root가 아닌 다른 사용자를 지정하는 경우 "Exhausted available authentication methods"오류 발생시 조치방법 2018.05.22 3674
521 Cloudera Hadoop and Spark Developer Certification 준비(참고) 2018.05.16 5461
520 tar를 이용한 리눅스 백업 2018.05.13 3196
519 crypto관련 기생충 박멸 스크립트 2018.05.11 3627
518 Hue Job Browser의 Queries탭에서 조건을 지정하는 방법 2018.05.10 3208
517 Impala의 Queries탭에서 여러조건으로 쿼리 찾기 2018.05.09 4339
516 Cloudera의 API를 이용하여 impala의 실행되었던 쿼리 확인하는 예시 2018.05.03 5230
515 Toree 0.1.0-incubating이 Scala 2.10.4까지만 지원하게 되어서 발생하는 NoSuchMethod오류 문제 해결방법(scala 2.11.x을 지원하지만 오류가 발생할 수 있음) 2018.04.20 3192
514 우분투 16.04LTS에 Zeppelin 0.7.3설치 2018.04.18 4083
513 CentOS 7.x에 Jupyter설치 2018.04.18 4337
512 Apache Toree설치(Jupyter에서 Scala, PySpark, SparkR, SQL을 사용할 수 있도록 하는 Kernel) 2018.04.17 3934
511 우분투 16.04LTS에 Jupyter설치 2018.04.17 4023
위로