Presto Kafka connector 개선 실패기

이 글은 Presto 관련해서 계속 연재되는 글 중 하나입니다. 이전글을 못보신 분들은 다음 글을 참고하세요.

Presto SQL을 이용하여 Kafka topic 데이터 조회하기

Presto, Zeppelin을 이용한 초간단 BI 시스템 구축 사례(1)

Presto, Zeppelin을 이용한 초간단 BI 시스템 구축 사례(2)

다 읽고 실망하실 분들을 위해 미리 밝히지만 이글은 실패한 삽질기 입니다.

shoveling

Presto는 다양한 저장소에 저장된 다양한 형태의 파일 포맷에 대해 SQL을 분산/병렬로 실행할 수 있는 기능을 제공하는 솔루션입니다. Presto에서 지원하는 저장소 중 Kafka 라는 솔루션이 있습니다. 엄밀하게 말하면 Kafka는 저장소라기 보다는 Queue 솔루션입니다. 일반적인 Queue 솔루션과는 조금 다른 개념을 가지고 있으며 Kafka 역시 분산 클러스터로 구성할 수 있어 대용량, 고가용, 고성능에 적합한 Queue 솔루션이라고 할 수 있습니다.

이번 글에서는 Presto에서 제공하는 기본 Kafka Connector에서 다음과 같은 상황에서의 성능을 빠르게 하기 위해 처리했던 내용을 공유하고자 합니다.

  • Kafka에는 최신 로그 데이터가 저장됨. 사용자는 최근 로그 몇 개를 대상으로 데이터를 조회하고 싶어함
  • Kafka의 Topic의 Retention 설정은 3일이라서 Topic의 처음부터 읽는 방식으로는 성능이 나오지 않음

이런 성능 요구사항을 위해 다음과 같은 초기 개념을 가지고 접근 방법을 모색하였습니다.

  • 현재의 Presto Kafka Connector 구현에서는  Consumer가 처음부터 topic을 읽음
  • Consumer가 특정 offset 부터 읽게 되면 읽기 성능이 향상됨
  • Consumer는 offset을 어떻게 알 수 있을까?

이것을 위해 처음 떠오른 생각은 Kafka도 Partition으로 Topic이 분리되어 있고 Presto Kafka 테이블에서는 자동으로 _partition_id 필드가 추가 되기 때문에 Kafka 테이블은 모두 Partition이 되어 있다고 생각하였습니다.  그래서 첫번째로 알아본 것이 Partition Pruning 기법입니다. 그 다음이 Storage 계층까지 Filter를 내려보내는 기법인데 이 부분은 아직 해결하지 못했습니다.

Kafka 특징

Kafka는 Queue 솔루션이기는 하지만 분산 Queue 처리 기능과 빅데이터 시스템에 적합하도록 일반 Queue와는 다른 몇가지 독특한 특징이 있습니다.

  • 데이터는 Topic 이라는 단위로 구분
    • 포맷이 다른 레코드를 Queue에 저장할 경우 Topic 을 생성하여 다르게 구분할 수 있습니다. Topic은 다른  솔루션에서는 Queue에 대응하는 개념이고, 데이터베이스의 경우 테이블에 대응하는 개념이라고 할 수 있습니다.
  • Topic은 분산 저장을 위해 Partition 으로 분리
    • 하나의 Kafka 클러스터는 보통 여러 대의 서버로 구성되기 때문에 하나의 Topic은 한 서버에 저장되지 않습니다. 이것은 대용량 데이터를 처리할 수 있는 Queue 시스템을 만들기 위해 필수 기능이라고 할 수 있습니다.
    • 이렇게 하나의 Topic의 여러개로 분리하는 개념이 있는데 이렇게 분리된 단위를 Partition 이라고 합니다.
  •  하나의 Partition은 여러개의 Segment로 구성
    • Kafka가 Queue 데이터를 파일에 저장하는 최소 단위를 Segement 라고 합니다. Segment는 보통 크기로 롤링되거나 시간으로 롤링됩니다. 기본 값은 1GB 로 설정되어 있어 1GB 크기가 넘게되면 새로운 Segment가 생성됩니다.
  • Queue 데이터 Get 이후에도 데이터는 Queue에 존재
    • 일반적인 Queue 시스템에서는 Queue의 Client(보통은 Consume 라고 부름)가 Queue에서 Element 하나를 가져오면 Queue에서는 데이터가 삭제됩니다.
    • 하지만, Kafka에서는  하나의 Consume가 Topic(Queue)의 데이터를 가져가도 Topic에서는 데이터가 사제되지 않습니다. 데이터의 삭제는 Kafka 서버에서 관리되며 각 Topic은 retention 값을 가지고 있으며 이 시간을 이전 Element는 자동으로 삭제됩니다. 따라서 Consume는 자신이 어떤 Element까지 처리했는지를 기억하고 있어야, Consume가 재시작될 때 중복되어서 처리하는 것을 방지할 수 있습니다.
    • 얼핏 보면 일반적인 Queue 시스템에 비해 사용하기가 더 복잡하고 번거롭다고 생각할 수 있지만 최근의 데이터  처리 스타일을 보면 이런 방식이 더 좋은 방식이라는 것을 알 수 있습니다.
    • 예를 들어, 로그 데이터가 Kafka를 이용하여 수집되고 있고, 이 로그 데이터를 실시간으로 분석해야 하고, 안정적으로 HDFS와 같은 저장소에 저장해야 하는 경우라면 실시간 분석을 위한 Consume와 HDFS에 저장을 하는 Consume을 별도로 만들고 각 Consume가 자신이 처리한 Offset을 관리하면 각 처리 특성에 맞는 전략을 선택할 수 있기 때문에 쉽게 시스템을 구성할 수 있습니다.
    • 이 기능이 최근 Kafka를 많이 이용하는 이유 중의 하나라고 생각합니다.

Partition Pruning

Select 절의 Filter 조건문과 테이블의 파티션 정보를 이용하여 조건문에 포함되지 않는 파티션은 SCAN(읽기 연산) 대상에서 제외시키는 기능입니다. 대용량을 저장하는 테이블의 경우 대부분 파티션을 설정해서 사용하고 있기 때문에 Partition Pruning 기능은 성능 향상에 아주 중요한 기능이라고 할 수 있습니다.

Oracle, MySQL과 같은 전통적인 데이터베이스 솔루션에서도 파티션은 널리 사용되고 있으며 Oracle,  MySQL 모두 다음과 같은 파티션 타입을 지원합니다.

  • Range 파티션
    • 파티션 키를 범위로 나누어 파티션, 파티션은 연속적인 값으로 분리됨
      • 예: P1 less than 100, P2 less then 200, ...
  • Hash 파티션
    • 파티션 키를 Hash 함수를 이용하여 반환되는 값을 이용한 파티션
      • partition BY hash (cust_id) partitions 4;
  • List 파티션
    • 파티션은 비연속된 값으로 구분될 수 있음. 파티션 키가 특정 값에 속하는지 여부로 파티션에 소속, 각 List에는 값이 IN 절로 여러개 정의할 수 있음
      • partition BY list (month)  ( partition winter VALUES ('DEC', 'FEB', 'JAN'), partition spring VALUES ('MAR', 'APR', 'MAY'), partition unknown VALUES (DEFAULT));
  • Composite 파티션
    • 여러개의 파티션을 복합해서 사용하는 파티션

Presto는 기본 Catalog를 제공하지 않으며 일반적으로는 Hive를 기본 Catalog로 이용합니다. Hive의 파티션은 앞에서 설명한 데이터베이스의 파티션과는 조금 다른 형태의 파티션을 제공하고 있습니다. 파티션 컬럼의 특정 값으로 파티션을 지정하는 방식입니다. 예를 들어 Log 테이블의 경우 Date 기준으로 파티션을 만드는 경우 다음과 같이 테이블을 생성합니다.

1
2
3
4
5
6
7
create table t_log (
  path string,
  status int,
  user_agent string,
  access_time string
)
partitioned by (date string)

데이터베이스 솔루션에서는 파티션 대상이 되는 컬럼이 보통 컬럼 정의에 포함되고, partition by 절에서 해당 컬럼을 지정하지만 Hive 에서는컬럼 정의에서 정의하지 않고, partitioned by 절에서 직접 정의한다. 따라서 데이터의 Type 까지 partitioned by 절에 추가해야 한다. 그리고 partition type에 대한 정의도 하지 않고 있다. 파티션의 종류가 하나이기 때문에 지정할 필요가 없기 때문이다.

위와 같이 파티션이 지정된 테이블은 실제 데이터 저장이 다음과 같은 디렉토리 구조를 가져야 한다. 파일명은 어떤 파일명이라도 상관 없습니다.

  • {hive_warehouse_root}/t_log/date=20160701/log.dat
  • {hive_warehouse_root}/t_log/date=20160702/log.dat
  • {hive_warehouse_root}/t_log/date=20160703/log.dat

보는 것 처럼 파티션은 하나의 지정된 값으로만 설정할 수 있으며, date 컬럼에 대한 값은 디렉토리 명에서 "컬럼명=Value" 문자열을 파싱하여 얻을 수 있습니다. 위와 같은 디렉토리 구조에서 "select date from t_log" 질의 결과는 데이터 파일을 읽을 필요도 없이 디렉토리 정보만으로 [20160701, 20160702, 20160703]이 됩니다.

Kafka 데이터 조회 성능 향상 아이디어(1)

Presto에서 기본적으로 제공하는 Kafka Connector는 질의 시 모든 Partition에 대해 특정 Offset부터 읽을 수 있는 기능이 없으며 처음부터 끝까지 Scan 하게 구현되어 있습니다. 이런 구현에서는 다음 질의와 같이 최근 레코드 몇개만 조회하는 질의인 경우에도 전체 데이터를 Scan 해야 하는 문제가 있습니다.

1
2
3
4
select *
from kafka.popit.log_topic
order by access_time desc
limit 100

log_topic의 retention 주기가 1일 이상만 되어서 이 질의의 성능은 사용자가 예상했던 성능을 만족시킬 수 없습니다. 가장 이상적인 방법은 다음과 같습니다.

  • Scan Task는 log_topic의 각 partition 별로 생성
  • Scan Task에서 사용하는 Kafka Consume client의 시작 offset = partition의 max offset -100(limit 절의 값) 을 사용
  • 각 Scan Task의 결과를 취합하여 100개만 반환

하지만 Presto에서는 이렇게 구현하기 어렵습니다. 이유는 질의 실행 계획 수립 시 각 스토리지의 Connector에게 Table의 정보, Split 정보를 요청할 때 order by 절이나 limit 절에 대한 정보는 전달하지 않고 where 절의 정보만 전달하기 때문입니다.

Kafka 데이터 조회 성능 향상 아이디어(2)

두번째 생각한 방법은, Partition의 offset 정보를 질의를 실행하는 사용자에게 보여주고 그 정보를 이용하여 사용자가 where 조건에 offset 정보를 제공하는 방법입니다.

  • Kafka topic meta 정보 조회 기능을 이용하여 Presto에 kafka_topic_meta 테이블을 기본 제공 테이블로 제공
  • 사용자는 다음과 같은 질의를 이용하여 Partition 정보 조회
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    presto:kafka> select * from kafka._topic_.topics;
    _topic_name     | _partition_id | _partition_offset | _segment_end
    -----------------+---------------+-------------------+--------------
    popit.test_topic |             0 |              1000 |          500
    popit.test_topic |             0 |              1000 |         1000
    popit.test_topic |             1 |              2500 |          800
    popit.test_topic |             1 |              2500 |         1650
    popit.test_topic |             1 |              2500 |         2500
    (5 rows)
  • Partition 정보에서는 Partition, Segment 정보를 보여주고 있는데 위의 예제에는 popit.test_topic은 두개의 파티션을 가지고 있으며 0번 파티션의 현재 max offset는 1000이며, segment는 두개이고 첫번째 segment의 max offset은 500이고, 두번째 segment의 max offset은 1000이다.
  • 이 정보를 이용하면 사용자는 다음과 같은 최근 100개의 데이터를 빠른 응답 속도로 를 조회할 수 있음
  • 1
    2
    3
    4
    5
    6
    7
    8
    select *
    from kafka.popit.log_topic
    where (
    (_partition_id = 0 and _partition_offset > 900) or
    (_partition_id = 1 and _partition_offset > 2400)
    )
    order by access_time desc
    limit 100
  • 즉 사용자가 각 파티션별의 max offset 값 - limit 절의 값을 계산해서 where 절에 filter로 지정하는 방식

Hive의 경우 테이블 생성 시 명시적으로 Partition 컬럼을 지정하지만, Presto Kafka connector는 자동으로 _partition_id라는 파티션 컬럼이 자동으로 생성합니다. Hive에서 Partition Pruning을 하기 위해서는 where 조건에 반드시 파티션 컬럼 조건이 포함 되어야 하듯이, Kafka에서도 동일하게 partition_id 컬럼을 where 절에 포함하도록 하는 아이디어 입니다.

실제 구현도 이 두번째 아이디어를 이용하여 구현했습니다. 결과적으로는 Partition Pruning은 성공했지만 성능 향상에 있어서는 개선의 여지를 남겨 두게 되었습니다. 이유는 이 글의 마지막에 설명하도록 하겠습니다.

Kafka 데이터 조회 성능 향상 아이디어(3)

두번째 방법으로은 너무 복잡하고, Filter를 Storage 계층까지 내려보내야 하는 구현도 해야 하기 때문에 다른 방법을 더 소민하게 되었습니다.

세번째 방법은 Presto의  Session 별로 Property 를 설정하는 기능을 이용하는 것입니다.

1
set session name = expression

이 기능을 이용하여 최근 질의 실행 시 최근 몇개로부터만 조회하겠다는 정보를 Planner에 제공하고 이 정보와 Topic의 Partition meta 정보를 이용하여 Consumer 오픈 시 시작 offset을 지정하는 방법입니다. 질의 실행 시 다음과 같이 입력하고, 질의를 실행할 수 있습니다.

1
2
3
4
5
set session kafka.max.recent.records=100;
select *
from kafka.popit.log_topic
order by access_time desc
limit 100;

logic_topic 의 파티션 정보가 두번째 방법과 같은 경우에는 두개의 Task(각 파티션별로 하나)에서 Consumer가 실행되며, 각 Consumer는 scan 대상이 된 partition을 오픈하면서 시작 offset 값으로 다음 값을 사용할 수 있습니다.

  • Consumer의 start offset = "partition의 max-offset" - "kafka.max.recent.records 설정 값"
  • 0 번 Consumer = 1000 -100
  • 1 번 Consumer = 2500 - 100

이 방법이 사용자에게 불편을 최소화하면서 원하는 성능을 낼 수 있는 가장 현실적인 방법이라고 할 수 있습니다.  이 방법은 아직 기능 구현 중에 있습니다.

Presto의 Hive Partition Pruning

필자가 필요로 하는 기능은 Kafka Partition Pruning 이지만 이를 구현하기 위해 먼저 Presto에서 Hive Partition Pruning을 어떻게 하고 있는지 알아 보도록 하겠습니다.

  • Presto는 Plan을 세우기 위해 질의에 있는 각 테이블의 Connector로 TableLayout 정보를 요청함
  • Hive Connector에서는 Hivemetadata Class의getTableLayouts() 메소드에 구현되어 있음
  • getTableLayouts() 메소드에서는 다음 단계로 Partition Pruning 수행
    • 테이블의 전체 파티션 목록을 가져옴(getFilteredPartitionNames() 메소드에서 반환됨)
    • 각 파티션 명(date=20160701 형태)에 대해서 where 절의 Partition 컬럼과 관련된 Predicate 를 Evaluation하여 true 를 반환하면 Scan 대상 파티션 목록에 추가
    •  TableLayout에 파티션 컬럼 및 Scan 대상 파티션 정보를 추가하여 Return
  • Physical Plan을 수립시 Connector의 SplitManager에게 getSplit() 메소드를 호출하여 Split 정보를 전달받아 분산 Task 구성.
    • Hive Connector는 HiveSplitManager 클래스에서 이 기능을 수행하며, getSplit()의 메소드에 getTableLayouts()에서 전달받은 TableLayout 정보가 인자로 넘겨진다. 이 인자에 Partition 목록있고 이 목록을 이용하여 Scan 대상 Split 정보를 생성하여 반환한다.

실제 HiveMetadata의 getTableLayouts()에서 Partition Pruning 기능은 다음과 같이 구현되어 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 1. Partition Pruning 검사 대상이 되는 파티션 목록을 찾음
List<String> partitionNames = 
    getFilteredPartitionNames(metastore, tableName, partitionColumns, effectivePredicate);
// 2. Partition name에서 value를 추출하여 where 절의 partition 컬럼이 있는 predicate clause에 대해 
//    ColumnName, Value 정보를 이용하여 evaluation()을 수행하여 true를 반환하면 Scan 대상이 되고,
//    false를 반환하면 제외 
// do a final pass to filter based on fields that could not be used to filter the partitions
ImmutableList.Builder<HivePartition> partitions = ImmutableList.builder();
for (String partitionName : partitionNames) {
    Optional<Map<ColumnHandle, NullableValue>> values = 
        parseValuesAndFilterPartition(partitionName, partitionColumns, effectivePredicate);
    if (values.isPresent()) {
        partitions.add(new HivePartition(tableName, compactEffectivePredicate, 
          partitionName, values.get(), bucket));
    }
}

Kafka Partition Pruning만으로 성능 향상의 한계

Hive와 비슷한 방법으로 Kafka의 Partition Pruning 기능을 구현하였는데 방법은 다음과 같습니다.

  • 기본적으로 제공하는 Presto Kafka Connector에서 Presto의 Scan Task 단위는 Segment이다.
  • Segment 목록을 가져온 후, 사용자가 Filter 조건에 입력한 _partition_id와 _offset 값을 이용하여 조건에 맞는 Segment 만 찾아내어 반환한다.
  • Scan Task는 전달된 Segment만을 이용하여 생성한다.

이렇게 구현하면 필요한 Segment만 Scan 하기 때문에  Topic의 Retention 주기가 길어도 1 Task 당 1GB 정도의 데이터만 읽으면 되기 때문에 많은 성능 향상 효과를 볼 수 있습니다. 하지만, 최근 100개 정도의 데이털르 조회하는데 5 ~ 10초 정도 소요된다는 것은 시스템 리소스를 낭비하는 것이라고 볼 수 있습니다. 여기서 마지막 남은 성능을 높이는 방법은 Filter Push Down 입니다. 

사용자가 where 절에 "_partition_id = 0 and _partition_offset > 900" 이라고 입력하면, 위의 예제에서는 0번 파티션의 500 ~ 1000 offset이 저장되어 있는 Segment가 Pruning 절차를 거쳐 선택되게 됩니다. 여기서 실제 Consumer 객체가 생성될 때 offset을 900번부터 읽을 수만 있으면 성능이 훨씬 더 빨리지게 되는데, 이렇게 _partition_offset > 900 이런 Filter 조건을 Storage 계층까지 내려 보내게 되면 Kafka Consumer 생성 시 시작 offset을 900으로 지정하여 오픈할 수 있습니다. 이 기능까지 구현되어야 제대로된 성능이 나오는 Kafka Connector가 완성됩니다.

결론

여기까지 오면서 처음에 원했던 기능은 Partition Pruning 기능 아니라, 최근 몇개의 데이터를 빠르게 조회하는 기능이었습니다. 그리고 Kafka의 Partition이 사용자 정의가 아닌 임의로 만들어 지고, Partition 갯수가 많으면 Where 조건에 Partition filter 조건도 많아져야 하기 때문에 Kafka의 Partition Pruning은 맞지 않다고 생각했습니다.

최종적으로는 성능 향상을 위해서는 3번째 방안이 구현, 사용자 질의 모두 심플하기 때문에 최종적으로는 세번째 방법으로 결정하였으며, 현재 구현 중에 있습니다. 이미 두번째 방법을 구현 하면서 많은 부분이 구현되어 있기 때문에 오래 걸릴 것으로 예상하지는 않습니다.

이런 삽질기를 영어로 작성하여 Presto 커뮤니티에 공유하여 의견을 받으면서 Kafka Connector에서 최근 몇개의 데이터만 조회하는 질의의 성능을 높이는 방법을 찾으면 좋겠다는 생각도 해보았습니다. 제 영어가 짧은 것을 한탄하며, 이제 개발자의 Second Programming Language는 영어라야 한다는 생각이 들었습니다.


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.