Hive Query 의 Hadoop Job Id (YARN) 알아내기

필요성

Hive Client 프로그래밍을 하다보면 Client 에서 실행한 Hive Query의  Job Id 값을 알아내야 하는 경우가  있습니다. 저의 경우에는 이전 글을 통해 공개되었던 Hadoop Data Warehouse 의 데이터 분석환경으로 Hive Query Language 를 수행할 수 있는 Zeppelin 과 매우 유사한 분석 툴을 직접 개발하여 사용하고 있는데, 이 툴에서 실행된 Job 의 Progress 를 보여주는 부분과 Job 을 Kill 하는 기능 구현을 위해 Job Id 가 필요했습니다(이전글 : SK텔레콤, Hadoop DW 와 데이터 분석환경 구축사례).

Client 의 상황은 두가지 경우를 생각해 볼 수 있는데, 첫번째는 CLI 에서 Hive Query 를 실행하는 경우이며, 두번째는 JDBC를 통해 Hive Thrift Server 에 컨넥션을 맺은 상태로 Thrift Server 를 거쳐 Hive Query 가 실행되는 경우가 될 수 있습니다.

먼저 첫번째 경우에는, Hive Git 에 CLI 를 통해 실행한 Job 에 대한 상태를 관리하는 테스트 코드가 있어서 관련성이 있지 않을까 싶어 살펴봤습니다.

테스트 코드를 살펴보면, UnitTest 를 위해 다시 MiniHS2 라는 클래스를 사용하고 있으며, 이 클래스는 AbstractHiveService 를 상속받고 4가지 모드(MR, TEZ, LLAP, DFS_ONLY)의 미니 클러스터 형태를 지원하는 Mock Object 클래스입니다.

어떤 부분에 대한 테스트를 하고 있는지 코드를 살펴보면,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Test 
  public void testFetchResultsOfLogAsync() throws Exception { 
    // verify whether the sql operation log is generated and fetch correctly in async mode. 
    OperationHandle operationHandle = client.executeStatementAsync(sessionHandle, sql, null); 
    // Poll on the operation status till the query is completed 
    boolean isQueryRunning = true; 
    long pollTimeout = System.currentTimeMillis() + 100000; 
    OperationStatus opStatus; 
    OperationState state = null; 
    RowSet rowSetAccumulated = null; 
    StringBuilder logs = new StringBuilder(); 
    while (isQueryRunning) { 
      // Break if polling times out 
      if (System.currentTimeMillis() > pollTimeout) { 
        break; 
      } 
      opStatus = client.getOperationStatus(operationHandle); 
      Assert.assertNotNull(opStatus); 
      state = opStatus.getState(); 
      rowSetAccumulated = client.fetchResults(operationHandle, FetchOrientation.FETCH_NEXT, 2000, 
          FetchType.LOG); 
      for (Object[] row : rowSetAccumulated) { 
        logs.append(row[0]); 
      } 
      if (state == OperationState.CANCELED || 
          state == OperationState.CLOSED || 
          state == OperationState.FINISHED || 
          state == OperationState.ERROR) { 
        isQueryRunning = false; 
      } 
      Thread.sleep(10); 
    } 
    // The sql should be completed now. 
    Assert.assertEquals("Query should be finished",  OperationState.FINISHED, state); 
    // Verify the accumulated logs 
    verifyFetchedLogPost(logs.toString(), expectedLogsVerbose, true); 
    // Verify the fetched logs from the beginning of the log file 
    RowSet rowSet = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 2000, 
        FetchType.LOG); 
    verifyFetchedLog(rowSet, expectedLogsVerbose); 
  } 

테스트 코드는 executeStatementAsync 메소드를 통해서 비동기 실행 후 실행된 Job 의 상태를 모니터링 하고 있는데, 실행된 쿼리의 OperationState 를 CLOSED, FINISHED, ERROR, CANCELED 로 관리하는 부분만 존재하고 Job 에 대한 Id 를 가져올 수 있는 API 는 보이지가 않습니다 (OperationState 의 Java Doc 을 살펴봐도 Job Id 를 가져오는 Method 가 없음).

두번째 경우는, Client 가 JDBC 를 통해 Hive Thrfit Server 를 거쳐 Hive Query를 실행하는 경우 입니다.

직접 개발해서 사용하고 있는 Client 쿼리 실행환경이 웹을 기반으로 하는 툴이다 보니, 당연히 실행된 쿼리를 Kill 할 수 있는 기능 구현이 필요했으며 현재는 JDBC Driver 에서 제공하는 cancel 함수 구현을 통해 필요한 기능을 사용하고 있으나 이 방법으로는 Job 에 대한 정확한 Progress 정보를 제공할 수가 없습니다.

여러가지 방법을 고민한 끝에 찾은 방법은 아래와 같습니다.

솔루션

첫번째와 두번째 케이스 모두, hive.log 파일에 던져진 쿼리의 로그가 남게되어 있습니다.

hive 를 통해 아래와 같은 쿼리를 던지고 hive.log 에는 어떤 로그가 찍히는지 확인해 봤습니다.

1
2
3
SELECT    branch_id
FROM      cm_branch
GROUP BY  branch_id;

위의 쿼리에 대해서 아래와 같은 로그가 남습니다.

1
2
3
4
5
6
7
8
9
10
11
2016-07-12 13:25:34,649 INFO  [main]: log.PerfLogger (PerfLogger.java:PerfLogBegin(121)) - <PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>
...
...
2016-07-12 13:25:34,651 INFO  [main]: parse.ParseDriver (ParseDriver.java:parse(185)) - Parsing command: SELECT branch_id FROM cm_branch GROUP BY branch_id
...
...
2016-07-12 13:25:34,724 INFO  [main]: ql.Driver (Driver.java:execute(1328)) - Starting command(queryId=hadoop_20160712132534_7c3b5216-9887-4daf-910f-739331229141): SELECT branch_id FROM cm_branch GROUP BY branch_id
2016-07-12 13:25:34,724 INFO  [main]: ql.Driver (SessionState.java:printInfo(951)) - Query ID = hadoop_20160712132534_7c3b5216-9887-4daf-910f-739331229141
2016-07-12 13:25:34,776 INFO  [main]: client.TezClient (TezClient.java:submitDAGSession(405)) - Submitting dag to TezSession, sessionName=HIVE-eb526e38-56fc-4729-b7b7-0fe0c0adf488, applicationId=application_1464848686584_171573, dagName=hadoop_20160712132534_7c3b5216-9887-4daf-910f-739331229141:3
...
...

중간에 로그를 생략했지만, 724번째줄 로그를 보시면 queryId (hadoop_20160712132534_7c3b5216-9887-4daf-910f-739331229141)와 실행되었던 쿼리(SELECT branch_id FROM cm_branch GROUP BY branch_id)가 찍힙니다.

776번째 로그를 보시면 724번째 로그에서 생성되었던 queryIddagName로 찍혀있고, Hadoop 에 Submitting 된 applicationId(application_1464848686584_171573) 를 찾아낼 수 있습니다.

결국 hive.log를 파싱하고 위 두개의 로그를 연결해서 JobId(applicationId) 를 알아내야 합니다.

로그를 파싱할때에는 내가 던진 쿼리와 동일한 쿼리가 다른 사람에 의해 던져질 수도 있기 때문에, 쿼리를 제출할 때 주석처리를 한 영역에 유니크한 아이디를 넣는 방법으로 구분하는 약간의 응용(?)이 필요합니다.

아래와 같은 방식으로 유니크한 아이디를 분석환경에서 쿼리를 제출할 시점에 생성하여 주석으로 넣어주는 방법이 있습니다.

1
2
-- 유니크아이디(사용자계정명 등 비지니스로직을 반영해서 생성함)
SELECT * FROM tb1;

Hortonworks 매니저의 코드에서는 시간정보를 가지고 Parsing 을 해서 알아낸다고 하는데, 개인적으로는 위에서 처럼 ID 를 주석으로 남겨주는 것이 더 좋은 방법으로 생각됩니다.

결론

Client API 에서는 실행된 Hive Job 에 대한 Hadoop Job Application Id 를 알아낼 수 없습니다. 따라서 hive.log 를 파싱 처리하는 부분의 개발이 필요하며 이를 통해 Client 에서 Progress 의 구현 및 Job Management 상세 정보 확인을 위한 url 링크 추가 등을 구현할 수 있겠습니다.


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.