Hadoop에서 FileSystem 객체에 대한 Tip 몇가지

징검다리 연휴이기는 하지만 그래도 오늘은 평일이기 때문에 출근하신 분들을 위해  Hadoop에서 파일 처리시 간단한 팁 하나 공유합니다.

문제 정의

Hadoop 파일 처리하는 Application 개발 시 Hadoop File System의 동작 원리를 제대로 이해하지 못하면 데이터가 제대로 저장되지 않는 경우가 많습니다. Hadoop File API를 이용하여 파일에 데이터를 저장하는 경우 Hadoop이 일반적인 파일시스템과 다른 속성은 다음과 같습니다.

  1. Output Stream을 생성하거나 rename, mkdir 등을 수행하기 위해서 FileSystem 객체를 이용해야 한다.
  2. Output Stream에 Write 한다고 해서 바로 물리적인 File에 반영되는 것이 아니다.
  3. 비정상 종료 시에 2번 문제를 해결하기 위해 FileSystem 객체에는 ShutdownHook이 설정되어 있어 오픈된 모든 OutputStream을 자동으로 close 시킨다.

    1. 이 경우에도 커널 패닉 상황 등에 대해서는 대응이 불가

  4. FileSystem 객체는 Path의 URI가 동일한 경우 객체가 Cache된다.

이런 속성에서 다음과 같은 상황이 발생하는 경우를 생각해볼 수 있습니다.

  • 요구사항1:
    • 3-1 상황에 대비하기 위해 OutputStream의 write() 후 flush()를 호출해도 반영이 안된다.
    • write시 바로 HDFS에 반영하게 하여 커널 패닉이나 시스템의 물리적인 문제 시에도 안정적으로 저장하고 싶은 경우가 있다.
  • 요구사항2:
    • 3번과 같이 FileSystem이ShutdownHook에서 자동으로 close()가 되면 다음과 같은 상황에서 에러가 발생합니다.
      • 사용자가 만든 프로그램에서 ShutdownHook에 작성 중인 File을 모두 close 한 다음 특정 디렉토리로 이동하는 로직을 사용하는 경우 다음과 같은 에러 발생
        1
        2
        3
        java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
        at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1948)
        
  • 요구사항3:
    • 4번 상황에서 Cache된 객체가 아닌 다른 객체를 가져오고 싶은 경우

해결 방법

요구사항1

요구사항 1의 해결 방법은 HdfsDataOutputStream의 hsync()를 이용하는 합니다. FileSystem 객체의 create() 메소드를 이용하여 생성한 OutputStream은 Path의 URI의 protocal이 "hdfs://"로 되어 있으면  HdfsDataOutputStream입니다. 따라서 다음과 같이 캐스트를 한 다음 hsync()를 호출하면 됩니다.

1
2
HdfsDataOutputStream testOut = (HdfsDataOutputStream)currentOut;
testOut.hsync((EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)));

처음에는 FSDataOutputStream의 hsync() 를 호출하였는데 이렇게 해도 실제 NameNode의 길이에는 영향을 주지 않습니다. 따라서 사용자가 의도했던 flush()와 동일한 효과를 얻기 위해서는  위 코드와 같이 SyncFlag.UPDATE_LENGTH를 해줘야 합니다. 이렇게 하면 NameNode의 메타 정보까지 수정하기 때문에 파일의 데이터가 최종적으로 저장되었다고 할 수 있습니다.

이 코드는 성능에 문제가 없는 경우만 사용하는 것이 좋습니다. 이 연산은 DataNode, NameNode 모두에게 호출하기 때문에 쓰기 성능에 많은 영향을 줄 수 있습니다.

요구사항 2

요구사항 2의 해결 방법은 다음과 같이 ShutdownHook 내에서 FileSytem 객체를 다시 가져오고 이것을 사용하는 방법입니다. FileSystem의 ShutdownHook이 동작하면 Cache에서 객체를 삭제하기 때문에 다시 가져오면 오픈된 상태의 객체를 가져오게 됩니다. 이때 가져온 객체는 반드시 close() 처리를 명시적으로 해주는 것이 좋습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyApp {
  public static void main (String[] args) throws Exception {
    final Configuration conf = new Configuration(); 
    final FileSystem fs = FileSystem.get(conf);
    Runtime.getRuntime().addShutdownHook(new Thread() {

      public void run() {
        FileSystem fs = null;
        try { 
          FileSystem.get(conf);
          fs.rename(originPath, destPath);
        } catch (Exception e) {
          e.printStackTrace();
        }  
        } finally {
          if (fs != null) {
            fs.close();
          }
        }  
      }  
    });  
    OutputStream out = fs.create(originPath);
  }
}

ShutdownHook은 Thread로 동작하기 때문에 어떤 Thread가 먼저 동작하는지 알수는 없습니다. 따라서 위 코드는 어떤 경우에는 잘 실행되지만 어떤 경우에는 잘 실행되지 않을 수도 있습니다. 이 경우 요구사항 3번의 해결방법으로 해결할 수 있습니다.

요구사항3

요구사항 3번은 Cache로 부터 FileSystem  객체를 가져오지 않고 FileSystem 객체를 매번 생성하도록 요청하는 방법을 사용할 수 있습니다. Configuration에 아래 코드와 같이 "fs.hdfs.impl.disable.cache" 속성을 true로 주면 Cache를 사용하지 않습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Runtime.getRuntime().addShutdownHook(new Thread() {

  public void run() {
    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
    FileSystem fs = null;
    try { 
      FileSystem.get(conf);
      fs.rename(originPath, destPath);
    } catch (Exception e) {
      e.printStackTrace();
    }  
    } finally {
      if (fs != null) {
        fs.close();
      }
    }  
  }  
});

이외에 다른 방법으로는 다음과 같이 "fs.automatic.close" 옵션을 false로 하여 ShutdownHook에서 자동으로 close() 하지 않게 하는 방법입니다.

conf.setBoolean("fs.automatic.close", false);

이 옵션을 설정한 경우에는 반드시 사용자가 정의한 ShutdownHook에서 안정적으로 저장하는 코드를 추가하는 것을 권장합니다.

마치며

Hadoop의 경우 버전이 계속 바뀌면서 진화하고 있는데 과거에 생각하고 있었던 동작 방식이 현재에는 그대로 적용되지 않는 경우도 많습니다. 필자도 FileSystem 객체가 Cache되는 것만 알고 있었는데 "fs.hdfs.impl.disable.cache" 옵션 처럼 cache와 상관없이 매번 객체를 생성하는 옵션이 있는지 이번 문제를 해결하면서 알았습니다. 오픈소스를 제대로 사용하기 위해서는 문서나 스택오버플로우를 참고하는 것도 좋지만 문제 해결 시 소스 코드 먼저 보면서 소스 코드에 대한 이해도를 높이는 것이 가장 좋은 방법이 아닐까 생각합니다.


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.