Hive에서 Druid로 주저 없이 데이터 적재하기[번외:airflow]

Hive에 있는 데이터 Druid에 적재하려면?

다음과 같은 다양한 삽질 방법이 존재한다.

  1. Hive에 metastore로부터 hdfs location을 파악한 후 orc format이나  parquet format이냐에 따라 그에 맞는 hadoop ingestion spec을 작성해야함. orc인 경우 struct type정보를 잘 파악해야하며,  parquet인 경우에는 avro schema를 읽기 때문에 orc에 비해 덜 번거롭다. 자세한 내용은 요기를 참고:  ORC ingestion spec , Parquest ingestion spec  -> 여기서 문제점 하나가 발생 partition column의 경우는 어떻게 ingestion하지?

    partition column이 dimension이나 metric 또는 timestamp로 들어가는 경우가 있어서 이를 위해서는 별도 패치가 필요하다. 다행히 내부 브랜치에서 이런 기능을 구현하여 사용중

  2. Hive CLI를 통해 hdfs에 데이터를 내린후 hdfs데이터를 ingestion한다. 좀 수고스럽긴 하지만 parquet, orc, partition column등의 문제를 회피할 수 있는 방법은 Hive에서 필요 데이터를 join이나 쿼리 연산으로 처리 후csv,tsv 타입으로 hdfs패스에 내리고 로딩하는 방식이다. 무척이나 번거롭다.
  3. 필자의 삽질 Hive thrift server를 띄우고 oozie를 통해 데이터를 질의한 후 hdfs write하고 이를 인덱싱한다. 이미 기존에 푸던 삽질 방식이다. 관련된 내용이 궁금하다면 여길 참고하길

1,2,3 뭘 해도 깔끔한 방법이 없다. 2번의 경우 SQL의 자유도를 주기 위해 SQL로 데이터를 내리고 druid에 적재한후 해당 데이터는 깔끔하게 삭제해 주어야 한다. 이번 글에서는 hive to druid를 한방에 끝낼 수 있는 airflow plugin에 대해 이야기 해보려 한다.

먼저 간단하게 나마 airflow에 대해 설명하고 넘어가자

Airflow

oozie나 azkaban은 좀 들어봤는데 airflow는 뭐지? Apache Airflow는 프로그래밍 가능한 워크플로우 플랫폼이다. Airflow의 가장 큰 특징은 task에 대한 DAGs(Directed Acyclic Graphs)를 지원하는 워크 플로우이다. 사용자가 정의한 DAG pipeline을 통해 task가 수행되며 workflow 는 python code로 구성되어 있어 파이선 개발자에겐 상당히 유용한 워크플로우 플랫폼 이다.

Airflow가 쿨한 이유 [https://airflow.apache.org]

  • Dynamic: Airflow pipeline은 code로 설정 가능하고 이는 dynamic 한 pipeline생성을 허용한다.
  • Extensible: task를 수행하기 위한 operator를 쉽게 정의 할 수 있고 library를 확장하여 환경에 적합하게 확장성을 가진다.
  • Elegant: 내부적으로 Jinja template engine을 사용하고 있어 수행하는 script의 매개변수를 손쉽게 관리할 수 있다.
  • Scalable: 임의의 worker를 띄우고 job queue를 관리하고 있어 규모있는 운영이 가능하다.

Druid Hook 과 Hive to Druid Operator

airflow에 대해 더 깊게 이야기 하고 싶지만 이 글의 주제가 airflow는 아닌 관계로 중간 생략하고 hive에서 druid로 바로 적재하는 방식에 대해 알아보자. 다음과 같이 두개의 파일만 참고하면 된다.

https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/druid_hook.py

https://github.com/apache/incubator-airflow/blob/master/airflow/operators/hive_to_druid.py

한줄 요약하면 druid hook을 통해 druid overload에 연결하고 hive to druid가 실제로 hive에 데이터를 로딩하여 druid로 인덱싱하게 만드는 operator이다. 아래 hive_to_druid의 일부 코드를 살펴보자. python code가 독해가 되시는 분은 술술 읽히실 것이다. hive cli hook을 통해 hive로 접속한 후 해당 쿼리를 질의하여 temp table을 생성하여 데이터를 넣는다. temp table이 생성되고 나서 해당 hdfs location을 읽어온 후 druid hook을 통해 hadoop ingestion을 수행하고 색인이 끝나면 table을 drop한다. 깔끔한 마무리이;;;;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def execute(self, context):
    hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
    self.log.info("Extracting data from Hive")
    hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
    sql = self.sql.strip().strip(';')
    tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()])
    hql = """\
    SET mapred.output.compress=false;
    SET hive.exec.compress.output=false;
    DROP TABLE IF EXISTS {hive_table};
    CREATE TABLE {hive_table}
    ROW FORMAT DELIMITED FIELDS TERMINATED BY  '\t'
    STORED AS TEXTFILE
    TBLPROPERTIES ('serialization.null.format' = ''{tblproperties})
    AS
    {sql}
    """.format(**locals())
    self.log.info("Running command:\n %s", hql)
    hive.run_cli(hql)
    m = HiveMetastoreHook(self.metastore_conn_id)
    # Get the Hive table and extract the columns
    t = m.get_table(hive_table)
    columns = [col.name for col in t.sd.cols]
    # Get the path on hdfs
    hdfs_uri = m.get_table(hive_table).sd.location
    pos = hdfs_uri.find('/user')
    static_path = hdfs_uri[pos:]
    schema, table = hive_table.split('.')
    druid = DruidHook(druid_ingest_conn_id=self.druid_ingest_conn_id)
    try:
        index_spec = self.construct_ingest_query(
            static_path=static_path,
            columns=columns,
        )
        self.log.info("Inserting rows into Druid, hdfs path: %s", static_path)
        druid.submit_indexing_job(index_spec)
        self.log.info("Load seems to have succeeded!")
    finally:
        self.log.info(
            "Cleaning up by dropping the temp Hive table %s",
            hive_table
        )
        hql = "DROP TABLE IF EXISTS {}".format(hive_table)
        hive.run_cli(hql)

해당 Operator를 수행하기 위한 python script는 다음과 같다. 그 전에 airflow에서 druid overload 와 hive관련 설정은 미리 되어 있어야 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
default_args = {
        'owner': 'jerry',
        'depends_on_past': False,
        'start_date': datetime(2017, 11, 10),
        'email': ['jerry@go.go'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
      }
dag = DAG('daily_druid',
<code class="java plain">         schedule_interval=</code><code class="java string">'0 12 * * *'</code><code class="java plain">,</code>
         default_args=default_args)
HiveToDruidTransfer(task_id="load_dummy_test",
                    sql="select * from default.sales \
                            limit 10;",
                    druid_datasource="airflow_test",
                    intervals=["2017-11-01/2017-11-10"],
                    ts_dim="create_date",
                    dag=dag
                )

airflow의 dag 이름은 daily_druid이고 task이름은 load_dummy_test이다. 위의 내용처럼 load_dummy_test task는  default db의 sales table을 질의하여 timestamp filed는 create_date로 지정하여 druid에 인덱싱하는 task이다. airflow는 확장 가능한 모듈 구조로 되어 있기때문에 자세한 설명은 따로 없다. ㅎㅎ 자세한 파라미터는 https://pythonhosted.org/airflow/_modules/hive_to_druid.html 의 코드를 확인한다. 여기까지 읽었는데 Druid가 뭐지라고 생각하신다면 이미 낚이셨;;;; Druid 가 낯설다면 Druid 소개 글을 먼저 읽어보실 :)

만에 하나 Timezone변경 및 hadoop property 변경 등으로 hive_to_druid.py 의 ingest_query_dict를 변경해야 할 필요가 있다. 삽집을 해보시고 안되시면 연락을 May be force be with you!


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.