Golang에서 카프카 컨슈머 그룹과 재시도로 결과적 일관성 구현하기

필자는 마이크로서비스 아키텍처Microservice Architecture(이하 MSA) 기반으로 커머스 시스템을 만들고 있다. MSA에서 어려운 점 중 하나는 데이터 일관성을 유지하는 일이다. 예를 들어 주문 프로세스(결제, 주문 원장 기록, 재고 차감 등등)는 모두 성공하거나 하나라도 실패한다면 이전 상태로 되돌아가야 한다.

모놀리틱 아키텍처Monolithic Architecture와 관계형 데이터베이스를 사용하는 전통적인 시스템은 데이터베이스 트랜잭션을 사용하여 데이터 일관성을 보장한다. 반면 마이크로서비스마다 데이터베이스를 따로 사용하는 MSA는 데이터베이스 트랜잭션만으로 보장이 안된다. 왜냐하면 통합된 하나의 데이터베이스를 사용하지 않기 때문이다. 물론 분산 트랜잭션을 사용하여 여러 개의 데이터베이스를 하나의 트랜잭션으로 제어할 수도 있겠지만 긴 트랜잭션Long trasaction 문제가 있고[1] 이 기종 데이터베이스에서는 사용할 수 없다.

MSA에서 일관성은 어떻게 유지할 수 있을까? 필자 생각에는 결과적 일관성Eventual consistency을 사용하는 것이다. 예전에 운영했던 주문 시스템에서의 일이다. 주문 프로세스의 끝에 구매자에게 휴대전화 문자나 SNS로 '주문 완료' 알림을 보내는 것이 있었다. 알림을 담당하는 시스템은 별도로 존재하고 있어서 데이터베이스 트랜잭션으로 제어하기 어려웠다. 그렇다고 알림이 실패하면 주문을 롤백 할 수는 없는 노릇이었다. 문제를 해결하기 위해서 2단계로 처리했는데 먼저 주문 트랜잭션과 함께 알림 정보를 데이터베이스 테이블에 넣고나서 트랜잭션과 상관없이 주기적으로 배치Batch 프로그램으로 실행하여 알림 테이블 정보를 읽어 알림 시스템과 연동했다. 이렇게 단기적으로 일관성을 잃더라도(주문은 성공했지만 알림은 바로 가지 않을 수 있다) 결국에는 일관성을 유지하는 모델을 결과적 일관성이라고 한다. 데이터베이스 트랜잭션과 메시징을 하나의 트랜잭션으로 다루는 Transactional Outbox Pattern 역시 결과적 일관성을 이용한 것이다.[2]

결과적 일관성을 구현하는 방법 중 하나는 비동기 이벤트를 사용하는 것이다. 아래 그림은 비동기 이벤트를 기반으로 주문 처리하는 것을 도식화한 것이다. Mall은 구매자의 주문 정보(상품, 배송지, 결제 정보 등)를 OrderService에 전달(그림 3번)하고 OrderService는 구매 주문을 만들고 구매 주문 상태 메시지를 메시징 큐에 발행한다.(그림 4번) 주문 상태 이벤트에 관심이 있는 마이크로서비스들은 이를 구독(그림 5번)하여 주문 상태에 따라 처리를 한다.

image-20200625-231732

이벤트를 구독하는 서비스는 처리하다가 오류가 나는 경우에도 계속 재시도[3] 하여 결국(언젠가)은 처리하는 것이다.

물론 4번과 5번 영역을 RESTRepresentational state transfer로 구현할 수도 있다. 구현의 난이도는 상대적으로 낮고 이해하기 쉬우며 직관적이라는 장점이 있지만 동기 호출이 갖는 문제(구매자는 모든 호출이 끝날 때까지 기다리고, 그동안 시스템을 자원을 점유한다)가 있으며, 1개의 API만 호출한다면 간단한 일이지만 여러 개의 REST API를 호출할 때 REST API 트랜잭션 관리가 까다롭다.[4]

이 글은 메시징 시스템인 카프카Apache Kafka로 위 그림의 5번 영역에서 결과적 일관성을 구현하는 방법을 소개한다.

Golang 카프카 클라이언트

공식 카프카 웹사이트에서는 여러 개의 Golang 클라이언트를 소개하고 있다. 그중에서 필자가 선택한 클라이언트는 Shopify가 만들어 공개한 sarama 클라이언트였다. 왜냐하면 소개한 클라이언트 중에 GitHub 스타도 제일 많고 글을 쓰는 지금 시점까지 지속적으로 커밋하고 있기 때문이다.

출처: https://shopify.github.io/sarama/

출처: https://shopify.github.io/sarama/

컨슈머 그룹

일반적으로 마이크로서비스는 가용성이나 확장성을 고려하여 여러 개의 인스턴스로 배포하여 운영한다. 카프카로 발행한 주문 상태 이벤트 메시지를 각각의 마이크로서비스 인스턴스가 이벤트를 따로 구독 처리하다면 같은 메시지를 중복으로 처리하는 문제가 발생한다.

image-20200701-010754

이 문제는 카프카 컨슈머 그룹Consumer Group으로 해결할 수 있다. 물리적으로 여러 개로 나뉜 마이크로서비스를 하나의 컨슈머 그룹을 묶으면 이벤트 메시지를 각기 처리하는 것이 아니라 가용한 마이크로서비스 인스턴스 중 하나가 처리한다.

image-20200701-011512

Shopify/sarama 카프카 클라이언트로 컨슈머 그룹을 아래와 같이 구현할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import (
  // ...
  "github.com/Shopify/sarama"
)
type orderEventMessageHandler struct {
}
func (orderEventMessageHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (orderEventMessageHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h orderEventMessageHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  for msg := range claim.Messages() {
    log.Info(fmt.Sprintf("Message topic:%q partition:%d offset:%d -> %s = %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)))
    // 이벤트 메시지 처리
    // ...
    session.MarkMessage(msg, "")
  }
  return nil
}
func InitOrderEventMessageConsumer() {
  go func() {
    kafkaConfig := sarama.NewConfig()
    version, err := sarama.ParseKafkaVersion(config.Config.EventMessageBroker.KafkaVersion)
    if err != nil {
      log.Panicf("Error parsing Kafka version: %v", err)
    }
    kafkaConfig.Version = version // specify appropriate version
    kafkaConfig.Consumer.Return.Errors = true
    kafkaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
    group, err := sarama.NewConsumerGroup(config.Config.EventMessageBroker.Brokers, config.Config.EventMessageBroker.ConsumerGroupId, kafkaConfig)
    if err != nil {
      panic(err)
    }
    defer func() { _ = group.Close() }()
    // Track errors
    go func() {
      for err := range group.Errors() {
        fmt.Println("ERROR", err)
      }
    }()
    // Iterate over consumer sessions.
    ctx := context.Background()
    for {
      topics := []string{config.Config.EventMessageBroker.Topic}
      handler := orderEventMessageHandler{}
      err := group.Consume(ctx, topics, handler)
      if err != nil {
        panic(err)
      }
    }
  }()
}

위 코드에서 먼저 주목할 점은 컨슈머 그룹을 생성할 때 사용하는 sarama.NewConsumerGroup 함수이다. 하나의 컨슈머 그룹을 묶기 위해서는 각각의 마이크로서비스 인스턴스는 같은 groupID를 주어야 한다.

image-20200701-234106

컨슈머 그룹은 자신이 가져간 메시지의 위치 정보를 기록하고 있다. 이를 오프셋Offset이라고 하고 오프셋을 업데이트하는 것을 커밋Commit이라고 한다.

출처 : https://kafka.apache.org/intro

출처 : https://kafka.apache.org/intro

Shopify/sarama 카프카 클라이언트에서 컨슈머 그룹을 사용할 때 자동으로 오프셋을 커밋 하지 않는다. 따라서 메시지를 처리하고 나서 반드시 MarkMessage를 호출해야 한다.

1
2
3
4
5
6
7
8
func (h orderEventMessageHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  for msg := range claim.Messages() {
    // 이벤트 메시지 처리
    // ...
    session.MarkMessage(msg, "")
  }
  return nil
}

재시도하기

앞서 결과적 일관성 언급하며 이벤트를 구독하는 서비스는 처리하다가 오류가 나는 경우에 계속 재시도하여 결국 처리해야 한다고 했다. 재시도 메커니즘은 아래와 같이 구현할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (h orderEventMessageHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  for msg := range claim.Messages() {
    log.Info(fmt.Sprintf("Message topic:%q partition:%d offset:%d -> %s = %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)))
    for i := 0; i < config.Config.EventMessageBroker.RetryPolicy.MaxAttempts; i++ {
      var orderEvent dto.OrderEvent
      if err := json.Unmarshal(msg.Value, &orderEvent); err != nil {
        errMsg := fmt.Sprintf("order event consumer json.Unmarshal error : %s", err.Error())
        log.Error(errMsg)
        err = errors.New(errMsg)
        break
      }
      delivery := factory.NewDelivery(orderEvent)
      if err := service.DeliveryService().CreateOrderDelivery(session.Context(), delivery); err != nil {
        log.Error("DeliveryService().CreateOrderDelivery error - ", err.Error())
        time.Sleep(time.Duration(config.Config.EventMessageBroker.RetryPolicy.BackOffDelaySec) * time.Second)
        continue
      }
      break
    }
    session.MarkMessage(msg, "")
  }
  return nil
}

메시지를 장시간 처리하지 못하는 경우 다른 메시지들이 처리하지 못하고 대기하는 문제를 방지하기 위해 재시도 횟수를 제한하였다.

재시도 횟수를 초과하여 처리하지 못한 메시지는 어떻게 해야 할까? 실패한 메시지를 데이터베이스에 기록하고 필요한 경우 메시지를 재현하여 재시도할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (h orderEventMessageHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  for msg := range claim.Messages() {
    log.Info(fmt.Sprintf("Message topic:%q partition:%d offset:%d -> %s = %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)))
    messageProcessed := false
    var err error
    for i := 0; i < config.Config.EventMessageBroker.RetryPolicy.MaxAttempts; i++ {
      var orderEvent dto.OrderEvent
      if err = json.Unmarshal(msg.Value, &orderEvent); err != nil {
        errMsg := fmt.Sprintf("order event consumer json.Unmarshal error : %s", err.Error())
        log.Error(errMsg)
        err = errors.New(errMsg)
        break
      }
      delivery := factory.NewDelivery(orderEvent)
      if err = service.DeliveryService().CreateOrderDelivery(session.Context(), delivery); err != nil {
        log.Error("DeliveryService().CreateOrderDelivery error - ", err.Error())
        time.Sleep(time.Duration(config.Config.EventMessageBroker.RetryPolicy.BackOffDelaySec) * time.Second)
        continue
      }
      messageProcessed = true
      break
    }
    if messageProcessed == false {
      invalidMessage := entity.InvalidMessage{
        MessageChannel: msg.Topic,
        Message:        string(msg.Value),
        Error:          err.Error(),
        CreatedAt:      time.Now(),
      }
      if err := repository.InvalidMessageRepository().Create(session.Context(), invalidMessage); err != nil {
        log.Error("Create InvalidMessage error - ", err.Error())
      }
    }
    session.MarkMessage(msg, "")
  }
  return nil
}

또한 무효 메시지 채널Invalid Message Channel(여기서 채널은 카프카의 토픽Topic을 의미)을 사용하여 다른 구독자에 의해 알림 등을 활용할 수 있으며 메시지를 로깅하여 필요한 경우 재시도할 수 있다.

메시지를 처리하는 수신자는 데이터를 해석하고 그 의미도 이해해야 한다. 그러나 상황이 항상 그런 것은 아니다. 메시지 본문에 구문 오류나 문법 오류, 유효성 오류 등이 있을 수 있다. 메시지 헤더에 필요한 속성이 누락되거나 해석할 수 없는 속성 값이 있을 수도 있다. 발신자가 엉뚱한 채널로 메시지를 발신해, 엉뚱한 수신자가 메시지를 수신할 수 있다. 악의적인 발신자는, 수신자를 엉망으로 만들려고, 의도적으로 잘못된 메시지를 발신할 수도 있다. 이와 같이 수신자가 수신한 메시지를 처리하지 못하는 경우들이 생긴다. 그러므로 유효하지 않는 메시지를 처리할 방법이 있어야 한다. - Enterprise Integration Patterns, 173 쪽
출처 : https://www.enterpriseintegrationpatterns.com/patterns/messaging/InvalidMessageChannel.html

출처 : https://www.enterpriseintegrationpatterns.com/patterns/messaging/InvalidMessageChannel.html

메시지 중복과 멱등성Idempotent

카프카 메시지를 구독하여 처리할 때에는 늘 메시지 중복에 대비해야 한다. 왜냐하면 카프카 프로듀스가 메시지를 중복으로 발행할 수 있고, 또한 컨슈머가 메시지를 구독하여 처리했지만 오프셋을 커밋하기 전에 종료된다면 메시지를 재수신하기 때문이다.

이 문제에 대한 해결책으로 컨슈머(마이크로서비스)가 메시지를 멱등하게 처리하는 방법이 있다. 이를 멱등 수신자Idempotent Receiver라고 한다.

수신자를 멱등 수신자(Idempotent Receiver)로 설계한다. 멱등 수신자는 안전하게 동일한 메시지를 여러 번 수신할 수 있는 수신자다. (중략) 메시징에서 멱등 개념은 한 번 또는 여러 번 수신해도 동일한 효과를 주는 메시지로 해석한다. 이것은 동일한 메시지를 중복해서 수신해도 수신자는 문제가 발생하지 않으므로 안전하게 메시지를 재전송할 수 있다는 것을 의미 한다. - Enterprise Integration Patterns, 601 쪽

주문 도메인에서는 주문 번호(OrderId)와 주문의 상태(OrderShipped)로 멱등 수신자를 아래와 같이 구현할 수 있다. 앞서 코드에서 메시지를 처리하는 DeliveryService().CreateOrderDelivery 함수는 해당 주문 번호에 배송 추적 정보가 생성되지 않는 경우에만 생성한다. 이미 배송 추적 정보가 생성되었다면 중복이므로 메시지만 소비한다.

1
2
3
4
5
6
7
8
9
10
11
12
func (deliveryService) CreateOrderDelivery(ctx context.Context, delivery entity.Delivery) error {
	// 주문 번호를 기준으로 생성이 안된 경우만 생성한다.
	_, err := repository.DeliveryRepository().FindByOrderId(ctx, delivery.OrderId)
	if err != nil {
		if err == common.ErrNoResult {
			// 처리
			return nil
		}
		return err
	}
	return nil
}

주문처럼 주문 번호나 상태가 없는 경우에는 어떻게 해야 할까? Enterprise Integration Patterns에서는 메시지마다 고유한 메시지 식별자를 사용하라고 한다.

수신자는 기수신한 메시지들의 추적을 유지함으로 명시적으로 중복 메시지를 제거할 수 있다. 이 경우 고유한 메시지 식별자를 사용하면, 작업이 단순해지고 수신한 메시지 중 동일한 내용의 메시지들의 탐지하기가 쉬워진다. 이를 위해 메시지 식별자 필드를 별도로 사용해 메시지 중복의 의미가 메시지 내용에 묶이지 않게 한다. 그런 다음 메시지마다 고유한 메시지 식별자를 할당한다. - Enterprise Integration Patterns, 601 ~ 602 쪽

카프카 프로듀서에서 메시지를 발행할 때 메시지마다 고유한 식별자를 할당한다. 카프카 컨슈머는 메시지 식별자를 사용하여 이미 처리한 메시지인지 확인할 수 있다.

메시지마다  처리했는지를 확인하는 것이 성능상 병목이 될 수 있다. 필자가 시도해 보진 않았지만 처리한 메시지를 레디스Redis 같은 키, 값 저장소에 만료시간Expire과 함께 저장하여 처리하는 접근 방식을 사용할 수 있다.

주석

[1] 처리량을 극대화하기 위해 최신 트랜잭션 시스템은 트랜잭션을 최대한 짧게 유지하도록 설계된다. 따라서 여러 요청에 걸친 트랜잭션을 만들지 말아야 한다. - Patterns of Enterprise Application Architecture

[2] https://www.popit.kr/msa에서-메시징-트랜잭션-처리하기/

[3] 마이크로서비스 아키텍처 구축 5.12.1 나중에 재시도하기

주문을 받고 처리했다는 사실은 우리가 나중에 창고의 수집 테이블에 삽입을 재시도하기에 충분하다. 우리는 이 연산의 일부를 큐나 로그 파일에 큐잉하여 나중에 재시도할 수 있다. 이런 재시도 방법이 의미가 있는 몇 가지 연산에 대해서는 재시도로 해결될 것이라고 추정해야 한다. 이것은 여러모로 최종적 일관성eventual consistency의 또 다른 형태이다. 트랜잭션이 완료되었을 때 시스템이 일관성을 유지하는 상태임을 보장하기 위해 트랜잭션 경계를 사용하는 대신 향후 특정 시점에 시스템이 스스로 일관성을 유지하는 상태가 될 수 있음을 허용한다.

[4] REST API를 트랜잭션으로 처리하기 위한 방법으로 TCCTry-Confirm/Cancel가 있다.


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.