Kafka 0.9 Consumer 클라이언트 소개

다음 기사는 Kafka 0.9 Consumer 클라이언트 소개 글이다. 0.9 버전의 변경사항과 Kafka Consumer의 기본 개념을 학습할 수 있다.

원본: http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client

번역: http://free-strings.blogspot.kr/2016/05/09.html

주요 변경 사항

  • (“simple”과  “high-level” consumer )통합 된 새로운 Consumer 클라이언트
  • Kafka 0.9 클라이언트는 Scala Runtime과 ZooKeeper 의존성 제거
  • 주키퍼와의 통신 최소화

Consumer 예제

Unix의 poll과 유사한 API 스타일로 지정된 시간까지 대기하며, 메시지를 수신한다. 여기서는 Long.MAX_VALUE 만큼 대기하며, 메시지를 수신 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ConsumerLoop implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final List<String> topics;
  private final int id;
  public ConsumerLoop(int id,
                      String groupId, 
                      List<String> topics) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put(“group.id”, groupId);
    props.put(“key.deserializer”, StringDeserializer.class.getName());
    props.put(“value.deserializer”, StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
  }
  @Override
  public void run() {
    try {
      consumer.subscribe(topics);
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("partition", record.partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out.println(this.id + ": " + data);
        }
      }
    } catch (WakeupException e) {
      // ignore for shutdown 
    } finally {
      consumer.close();
    }
  }
  public void shutdown() {
    consumer.wakeup();
  }
}

Executors.newFixedThreadPool를 이용해 numConsumers(3)만큼 Thread와 Consumer를 생하고, "consumer-tutorial" topic을 소비하는 예제이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) { 
 int numConsumers = 3;
 String groupId = "consumer-tutorial-group"
 List<String> topics = Arrays.asList("consumer-tutorial");
 ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
 final List<ConsumerLoop> consumers = new ArrayList<>();
 for (int i = 0; i < numConsumers; i++) {
  ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
  consumers.add(consumer);
  executor.submit(consumer);
 }
 Runtime.getRuntime().addShutdownHook(new Thread() {
   @Override
   public void run() {
    for (ConsumerLoop consumer : consumers) {
     consumer.shutdown();
    } 
    executor.shutdown();
    try {
     executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
     e.printStackTrace;
    }
   }
 });
}

실행 결과로, Thread 0, 1, 2가 각각의 파티션에서 데이터를 읽는 것을 확인 할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}

Consumer Liveness

Consumer 그룹 안에, Consumer는 Topic의 하나의 파티션을 처리하며, 이를 위해 0.9 버전 부터는 그룹 Coordinator에서 Group Lock 기능을 제공 한다. 이는 session.timeout.ms 값만큼 Heartbeat가 그룹 Coordinator에 도착하지 않으면 해당 Consumer가 고립된 상황으로 간주, 다른 Consumer에게 파티션을 할당 한다. Hearbeat는 poll을 호출할 때만 발송 된다.

이 값의 기본값은 30초이며,  Consumer가 충분히 메시지를 처리할 수 있는 시간이어야 한다. 메시지 처리 시 30초 이상이 걸리면, 해당 값을 조절해야 한다.

Delivery Semantics

초기 오프셋은 auto.offset.reset 설정에 의해 결정(기본값 largest: 신규 메시지, smallest: 저장 된 최초 메시지)되며, auto.commit.interval.ms 값에 따라 자동으로 오프셋을 커밋 한다. 만약 수동으로 커밋을 관리하려면, enable.auto.commit 값을 false로 설정 한다.

1
props.put("enable.auto.commit", "false");

아래 예는 성능을 고려하여 파티션 단위로 오프셋을 수동 커밋하는 예제이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
    consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
                             Exception exception) {
        if (exception != null) {
          // application specific failure handling
        }
      }
    });
  }
} finally {
  consumer.close();
}

더 상세한 내용은 원본 또는 번역글을 참고하기 바란다.


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.