MSA에서 메시징 트랜잭션 처리하기

비동기 메시지를 사용하여 상호 간에 통신하는 방식을 메시징Messaging[1]이라고 부른다. 마이크로서비스 환경에서 비동기 처리 시 보통 카프카Kafka나 래빗엠큐RabbitMQ 같은 메시지 브로커Message Broker를 사용하여 메시징을 구현한다.

아래와 같이 마이크로서비스들이 메시징을 통해 협업하여 주문을 처리한다고 가정해보자.

image2019-9-3_6-54-13

OrderService는 주문 상태 이벤트(예. ORDER_CREATED, ORDER_CANCELLED 등)를 메시지 브로커에 발행하고 이벤트에 관심 있는 서비스가 구독하여 처리한다. 주문 상태 이벤트의 발행은 두 가지로 구분할 수 있다.

  • 주문 상태 변경
  • 주문 상태 전파

상태가 바뀐다는 것은 데이터베이스에 데이터를 변경한다는 것을 의미이며, 바뀐 상태를 전파한다는 것은 메시지 브로커로 이벤트 메시지를 발행한다는 의미이다.

데이터 변경과 이벤트 메시지 발행은 데이터베이스 트랜잭션 안에서 이루어진다. 문제는 이벤트 발행에 사용하는 메시지 브로커에는 데이터베이스 트랜잭션을 함께 사용할 수 없다는 것이다. 따라서 데이터베이스에 데이터 변경이 성공했지만 이벤트 발행에 실패하거나 혹은 이벤트 발행은 성공했지만 데이터베이스 반영은 실패한다면 서비스 간 데이터 일관성이 깨지고 만다. 

image2019-9-9_10-25-10 (1)

결국 일관성을 유지하기 위해서는 데이터베이스 트랜잭션과 메시징은 하나의 트랜잭션으로 다루어야 한다.

이 글은 위의 문제를 해결하는 Transactional Outbox Pattern에 대해 소개하고 간단한 구현 코드를 보여준다.

Transactional Outbox Pattern

이 패턴을 한마디로 말하자면 메시지 큐Message Queue로서 데이터베이스 테이블을 사용하는 것이다.

OrderService는 메시지 브로커에 메시지를 바로 발행하지 않고 'OUTBOX'라는 임시 데이터베이스 테이블에 메시지를 저장한다. 그리고 저장된 메시지는 OrderService와는 별도로 MessageRelay가 OUTBOX 테이블에 메시지를 읽어 메시지 브로커로 발행한다.

출처 : Microservices Patterns, 97 쪽

출처 : Microservices Patterns, 97 쪽

OUTBOX 테이블은 같은 데이터베이스에 위치하고 있기 때문에 데이터베이스 트랜잭션으로 다룰 수 있다. 따라서 메시지 발행에 시차가 좀 생기겠지만 결과적으로 일관성Eventual Consistency[2]을 유지할 수 있다.

객체지향으로 바라보는 '상태'와 '이벤트'

'상태', '이벤트'는 매우 일반적으로 쓰이는 말이다. 구현하기 앞서 객체지향 프로그래밍(이하 OOP) 관점으로 구체적으로 살펴보자.

먼저 OOP에서 상태라는 것은 객체의 상태를 의미한다.

상태는 특정 시점에 객체가 가지고 있는 정보의 집합으로 객체의 구조적 특징을 표현한다. - 객체지향의 사실과 오해, 51 쪽

자바에서는 객체를 클래스로 상태를 클래스 필드로 정의한다. 클래스를 통해 인스턴스화(new Order())한 객체의 특정 시점에 가지는 필드(id, createdAt)에 담긴 값들의 집합이 바로 상태이다.

1
2
3
4
5
public class Order {
  private String id;
  private LocalDateTime createdAt;
  // ...
}

이어서 이벤트는 무엇일까? DDDDomain-Driven Design 맥락에서 이벤트는 도메인 이벤트Domain Event로 좁혀서 생각할 수 있다.

이벤트란 우리 말로 사건을 말하는데, 프로그램 개발을 하는 우리의 영역안에서 의미있게 보는 사건이 바로 도메인 이벤트다. - 필요한 내용만 추려서 DDD 당장 써먹기

OOP로 표현하자면 객체에서 발생하는 의미 있는 사건이 도메인 이벤트이고 도메인 이벤트는 객체의 상태로 표현된다.

아래는 Order 객체가 주문 생성(OrderCreated) 도메인 이벤트를 발생시키는 예시 코드이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class OrderCreated implements DomainEvent {
  private String orderId;
  private LocalDateTime createdAt;
  private LocalDateTime occurredOn;
  //...
}
public class Order {
  // ...
  public List<DomainEventEnvelope> placeOrder() {
    // 비지니스 로직
    // ...
    // 도메인 이벤트 생성
    List<DomainEventEnvelope> domainEventEnvelopes = new ArrayList<>();
    domainEventEnvelopes.add(
        new DomainEventEnvelope.Builder()
            .aggregateType(this.getClass().getSimpleName())
            .aggregateId(this.id)
            .eventType(OrderCreated.class.getSimpleName())
            .event(new OrderCreated(this.id, this.createdAt))
            .build());
    return domainEventEnvelopes;
  }
}

코드 예시 - OrderService 마이크로서비스 모듈

자바 스프링 프레임워크와 JPA 사용해 구현하였으며, 클래스 다이어그램은 아래와 같다.

Order 객체는 OrderCreated 도메인 이벤트를 발생시키고 DomainEventPublisher는 'OUTBOX' 테이블에 저장한다. 그리고 이 과정은 하나의 데이터베이스 트랜잭션으로 처리한다.

image2019-9-6_10-27-43

도메인 이벤트는 Order 애그리게잇에서 발생한다. OrderServiceImpl(스프링 서비스)는 발행된 도메인 이벤트(OrderCreated)를 DomainEventPublisher로 전달한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
public class OrderServiceImpl implements OrderService {
  private OrderRepository orderRepository;
  private DomainEventPublisher domainEventPublisher;
  // ...
  @Transactional
  @Override
  public void placeOrder(Order order) {
    List<DomainEventEnvelope> domainEventEnvelopes = order.placeOrder();
    orderRepository.save(order);
    domainEventPublisher.publish(domainEventEnvelopes);
    // ...
  }
}
@Entity
@Table(name = "orders")
public class Order {
  @Id
  private String id;
  @OneToMany(mappedBy = "order", cascade = CascadeType.ALL)
  private List<LineItem> lineItems = new ArrayList<>();
  @OneToMany(mappedBy = "order", cascade = CascadeType.ALL)
  private List<PaymentInformation> paymentInfos = new ArrayList<>();
  @OneToOne(mappedBy = "order", cascade = CascadeType.ALL)
  private ShippingAddress shippingAddress;
  private LocalDateTime createdAt;
  // ...
  public List<DomainEventEnvelope> placeOrder() {
    this.validate();
    // 비지니스 로직
    // ...
    // 도메인 이벤트 생성
    List<DomainEventEnvelope> domainEventEnvelopes = new ArrayList<>();
    domainEventEnvelopes.add(
        new DomainEventEnvelope.Builder()
            .aggregateType(this.getClass().getSimpleName())
            .aggregateId(this.id)
            .eventType(OrderCreated.class.getSimpleName())
            .event(new OrderCreated(this.id, this.createdAt))
            .build());
    return domainEventEnvelopes;
  }
}

DomainEventPublisher 인터페이스의 구현체는 DomainEventPublisherOutBoxImpl이다. DomainEventPublisherOutBoxImpl는 전달받은 도메인 이벤트를 OUTBOX 테이블[3]에 저장한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
public class DomainEventPublisherOutBoxImpl implements DomainEventPublisher {
  private OutBoxRepository outBoxRepository;
  // ...
  @Override
  public void publish(List<DomainEventEnvelope> envelopes) {
    List<OutBox> outBoxes = envelopes.stream()
        .map(envelope -> new OutBox(envelope))
        .collect(Collectors.toList());
    outBoxRepository.saveAll(outBoxes);
  }
}
@Entity
public class OutBox {
  @Id
  @GeneratedValue
  @Type(type="uuid-char")
  private UUID id;
  private String aggregateType;
  private String aggregateId;
  private String type;
  @Lob
  private String payload;
  public OutBox() {
  }
  public OutBox(DomainEventEnvelope envelope) {
    this.aggregateType = envelope.getAggregateType();
    this.aggregateId = envelope.getAggregateId();
    this.type = envelope.getEventType();
    ObjectMapper mapper =
        Jackson2ObjectMapperBuilder.json().featuresToDisable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS).modules(new JavaTimeModule()).build();
    try {
      this.payload = mapper.writeValueAsString(envelope.getEvent());
    } catch (JsonProcessingException e) {
      e.printStackTrace();
    }
  }
}

아래는 OUTBOX 테이블에 저장된 데이터이다.

image2019-9-10_8-58-17

코드 예시 - Message Relay

MessageRelay 는 두 가지 방식으로 구현할 수 있다.

아래는 Polling publisher 방식으로 구현한 예시이다. 자바 JDBC와 카프카 클라언트를 사용하였다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class MessageRelay {
  public static void main(String[] args) {
    MessageProducer messageProducer = new MessageProducer();
    // 1. JDBC로 out_box 테이블 메시지 조회
    Connection conn = null;
    Statement stmt = null;
    try{
      Class.forName(JDBC_DRIVER);
      conn = DriverManager.getConnection(DB_URL,USER,PASS);
      stmt = conn.createStatement();
      String sql = "SELECT id, aggregate_id, aggregate_type, payload, type FROM out_box ORDER BY id ASC";
      ResultSet rs = stmt.executeQuery(sql);
      List<String> sentIdList = new ArrayList<>();
      while(rs.next()){
        String id = rs.getString("id");
        String aggregateId = rs.getString("aggregate_id");
        String type = rs.getString("type");
        String payload = rs.getString("payload");
        // 2. Kafka 메시지 생성 및 발송
        String partitionKey = aggregateId;
        Map<String, Object> header = new HashMap<>();
        header.put("id", id);
        header.put("type", type);
        Message message = new Message(header, payload);
        try {
          messageProducer.send(ORDER_EVENT_TOPIC, partitionKey, message);
          // 발송 성공한 메시지 저장
          sentIdList.add(id);
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
      rs.close();
      // 3. 발송 성공한 메시지 삭제
      if(sentIdList.isEmpty() == false) {
        sql = "DELETE FROM out_box WHERE id IN (" + sentIdList.stream().collect(Collectors.joining(", ", "'", "'")) + ")";
        stmt.executeUpdate(sql);
      }
      stmt.close();
      conn.close();
    }catch(SQLException se){
      // ..
    }catch(Exception e){
      // ..
    }finally{
      // ...
    }
  }
}

눈여겨볼 점은 OUTBOX 테이블 aggregate_id 컬럼의 쓰임새이다.

종종 메시지를 구독하는 입장에서 발행된 순서대로 메시지를 받고 싶을 때가 있다. 예들 들면 ORDER_CREATED 이벤트를 받기 전에 ORDER_CANCELLD 이벤트를 받으면 안 된다.

image2019-9-3_6-54-13

카프카는 파티션이라는 단위로 메시지를 처리하는데 하나의 토픽에는 여러 파티션이 존재할 수 있다. 카프카는 동일한 파티션 내에서는 메시지 순서를 보장하지만 파티션과 파티션 사이에서는 순서는 보장하지 않는다.

카프카 프로듀서(OrderService)가 메시지를 발행할 때 추가로 파티션 키 값을 주면 카프카는 메시지를 같은 파티션에 넣어 준다. 따라서 순서가 보장할 수 있다. 이 파티션 키 값으로 쓸 수 있는 것이 바로 aggregate_id 컬럼이다.

중복 메시지

MessageRelay가 실행 중에 프로세스가 종료되거나 혹은 네트워크 단절 문제 등으로 메시지를 중복으로 발행할 수 있다. 일반적으로 메시지 브로커는 중복 메시지 제거 메커니즘을 제공한다. 하지만 추가 오버헤드를 야기한다.

다른 해결 방법은 메시지 구독하는 마이크로서비스가 중복 메시지를 처리하는 것이다. 이를 멱등 수신자Idempotent Receiver라고 한다.

수신자를 멱등 수신자(Idempotent Receiver)로 설계한다. 멱등 수신자는 안전하게 동일한 메시지를 여러 번 수신할 수 있는 수신자다. (중략) 메시징에서 멱등 개념은 한 번 또는 여러 번 수신해도 동일한 효과를 주는 메시지로 해석한다. 이것은 동일한 메시지를 중복해서 수신해도 수신자는 문제가 발생하지 않으므로 안전하게 메시지를 재전송할 수 있다는 것을 의미 한다. - Enterprise Integration Patterns, 601 쪽

주문 상태 이벤트 메시지는 메시지 식별자로 주문 아이디가 있다. 구독(수신)하는 측에서는 이미 처리된 주문인지 확인 후 이미 처리되었다면 처리하지 않고 메시지만 소비하는 방식으로 멱등성을 구현할 수 있다.

주석

[1] "메시징은 프로그램 간에 빠르고 신뢰할 수 있는 통신을 비동기 방식으로 가능하게 하는 전송 기술이다. 프로그램들은 메시지message라 불리는 데이터 패킷을 전송함으로 상호 간에 통신한다. 또 큐queue로 알려진 채널channel은 프로그램을 연결하고 메시지를 전달하는 논리적인 경로다." - Enterprise Integration Patterns, 36 쪽

[2] REST 기반의 간단한 분산 트랜잭션 구현 - 3편 TCC Confirm(Eventual Consistency) 참조

[3] OUTBOX 테이블 스키마는 Reliable Microservices Data Exchange With the Outbox Pattern에서 차용했다.

[4] Debezium으로 구현한 예시를 아래 링크에서 확인할 수 있다.

https://dzone.com/articles/implementing-the-outbox-pattern


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.