본문 바로가기
생각

Commit 실패된 메세지 Producer 가 책임질래 Consumer 가 책임질래

by 6161990 2024. 7. 27.

만약 @TransactionalEventListener 가 없다면 어떻게 될까. 바로 떠오르는 생각은 컨슈머에서 대응을 해야될 것 같았습니다. 해당 포스팅은 세계에 @TransactionalEventListener 가 없다는 가정하에 굳이굳이 컨슈머에서 그 영향을 대응하는 방법을 구현해보는 과정을 담았습니다. 저는 카프카를 잘 몰라서, 일단  Kafka Producer 의 기본 동작 방식을 차근차근 따라가보면서 시작해보았습니다.

 

 

해당 문서는 공식문서 를 기반으로 따라가보았습니다. 테스트는 testContainer 를 이용해서 테스트 했지만 @EmbeddedKafka 를 이용하는 방법도 있습니다.

 

testContainer 를 이용한 테스트를 참조 하고 싶다면 아래를 참조하세요!

더보기
package com.yoon.basically;

import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
public abstract class AbstractTestcontainersTest {

    @Container
    private static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    static {
        kafkaContainer.start();
    }

    @DynamicPropertySource
    public static void overrideProperties(DynamicPropertyRegistry dynamicPropertyRegistry){
        dynamicPropertyRegistry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
        dynamicPropertyRegistry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    }

}

 

 

package com.yoon.basically.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yoon.basically.AbstractTestcontainersTest;
import com.yoon.basically.kafka.KafkaConfig;
import com.yoon.basically.vo.MyOutputData;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import java.util.concurrent.atomic.AtomicReference;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
@Import(KafkaConfig.class)
public class KafkaConsumerServiceTest extends AbstractTestcontainersTest {

    private final KafkaTemplate<Integer, Object> kafkaTemplate;

    private static final AtomicReference<Object> receivedMessage = new AtomicReference<>();
    private static final String TOPIC = "hello-world";

    @Autowired
    public KafkaConsumerServiceTest(KafkaTemplate<Integer, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(groupId = "test-group", topics = TOPIC)
    public void listen(String message) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        MyOutputData myOutputData = objectMapper.readValue(message, MyOutputData.class);
        receivedMessage.set(myOutputData);
    }

    @Test
    void sendAndReceive() {
        MyOutputData myOutputData = new MyOutputData(1, "쀼");
        ProducerRecord<Integer, Object> record = new ProducerRecord<>(TOPIC, myOutputData.key(), myOutputData);
        kafkaTemplate.send(record);

        Awaitility.await()
                .pollInterval(Duration.ofSeconds(3))
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(() ->
                        assertThat(receivedMessage.get()).isEqualTo(myOutputData)
                );
    }

}



Kafka Producer 

본격적으로 베이직 튜토리얼을 진행해보겠습니다. 간단한 로직이지만 차근차근 따라 가보려고합니다. 

 

 

 

Producer 의 설정을 Map 으로 구성하여 ProducerFactory 를 생성한 이후, KafkaTemplate 를 만들어 해당 객체를 통해서 Message 를 전송하면 됩니다. 

 

비동기 전송 방식으로는 다음과 같이 구현할 수 있습니다.

 

Non Blocking (Async)

 

KafkaTemplate 의 Message 전송은 기본적으로 CompletableFuture 비동기로 진행되지만, 아래와 같이 Blocking 구현을 통해 동기식 전송도 가능합니다. 

 

 

 

Blocking (Sync)

 

 

 

Kafka Producer Interceptor

ProducerInterceptor 는 프로듀서 클라이언트에서 사용되는 인터셉터로, 메세지가 kafka 브로커로 전송되기 전에 특정 로직을 실행할 수 있도록 해줍니다. 아래 사용처로 이용해볼 수 있을 것 같습니다. 

1. 메세지 변환 
2. 모니터링 및 로깅 : 메세지가 성공적으로 전송되었는지, 실패되었는지를 기록하고 모니터링 할 수 있다.
3. 통계 수집 

 

 

주요 구현 메소드 두 가지만 구현해보았습니다. 

 

 

 

onSend 

ProducerRecordpublish 되기 이전에 실행되며 ProducerRecord 에 접근가능하며 수정이 가능합니다. 

onAcknowledgement
Message 전송 결과에 따라 메소드가 호출됩니다. 정상 전송된 경우 Exception 이 null 이며 전송 실패의 경우 Exception 에 해당 예외가 전달되어 호출됩니다. 

 

 

 

구현한 Interceptor 를 template 에 세팅하는 과정 잊지 않아야합니다!!! 

 

 

 

 

Kafka Producer Listener

Spring Kafka 에서는 Producer 전송 결과를 수신하는 ProducerListener 를 제공합니다. 

 

 

onSuccess
Message 가 전송된 이후 정상 처리된 경우 호출됩니다. 

onError
그 반대겠죠?

 

 

 

 

 

지금까지 살펴본 기본 구현 내용에 Spring Transaction 을 끼얹어 보겠습니다. 💦  
다음과 같은 로직이 있습니다.

 

 

1. 멤버를 생성하고 저장합니다.
2. 멤버 생성 이벤트를 발행합니다.

 

 

@Transactional 에 묶여있다는 점을 포인트로 두고, 상상을 하나 해보겠습니다. 
PointDispenser 라는 도메인이 있고 해당 도메인의 관심사는 member 의 생성입니다. 생성된 member 의 id 로 멤버의 정보를 찾아 포인트를 지급하는 역할을 PointDispenser 가 담당하기 때문입니다. 위 로직에서 이슈가 발생할 수 있는 여지가 보이기 시작합니다. 트랜잭션 커밋 전, 아직 저장되지않은 member 를 조회하면 예외가 발생합니다.

 

 

Kafka 메세지 발행을 트랜잭션 내부에서 실행하는 경우, 트랜잭션 커밋 성공 여부에 따라 메세지를 컨트롤하고 싶을 때 다음 내용을 이해해두면 좋을 것 같습니다. 트랜잭션 이후, 이벤트를 발행하기 위해서는 먼저 Kafka Transaction 을 설정해야합니다. 

 

 

Transactions are enabled by providing the `DefaultKafkaProducerFactory` with a `transactionIdPrefix`. In that case, instead of managing a single shared `Producer`, the factory maintains a cache of transactional producers.

 

DefaultKafkaProducerFactory 의 transactionIdPrefix 를 제공하여 활성화됩니다. 각 프로듀서의 transactional.id 속성은 transactionIdPrefix + n 이며, 여기서 n 은 0부터 시작하여 새로운 프로듀서마다 증가합니다.

With Spring Boot, it is only necessary to set the `spring.kafka.producer.transaction-id-prefix` property - Spring Boot will automatically configure a `KafkaTransactionManager` bean and wire it into the listener container.


 주의해야할 사항은 여러 인스턴스에서 실행되는 애플리케이션의 경우, transactionalPrefix 는 인스턴스마다 고유해야한다는 점입니다.
 


모든 인스턴스가 같은 트랜잭션 prefix id 로 운용된다면 어떻게 될까요? kafka 는 트랜잭션 ID 를 통해 트랜잭션을 식별하고 관리합니다. 동일한 트랜잭션 ID 가 여러 인스턴스에서 사용되면, kafka 는 이를 잘못된 트랜잭션으로 간주하여 트랜잭션을 거부하거나 실패시킬 수 있습니다. 트랜잭션 실패가 되면 어떻게 되면 데이터가 손실될 수 있습니다. 데이터가 손실되면 데이터 불일치가 발생하고 이런 실패가 실패를 낳는 순환이 돌고돌면 애플리케이션 신뢰성 저하로 어떤 시스템은 종료될 수 도 있습니다.

환경변수를 사용해서 설정하든, 프로퍼티에서 인스턴스별로 설정값을 가져가든 커맨드라인 인수를 통해 설정하든 인스턴스 고유의 트랜잭션을 따로 가져가야한다는 점에 유의해야할 것 같습니다. Spring Boot를 사용하는 경우, spring.kafka.producer.transaction-id-prefix 속성만 설정하면 Spring Boot가 자동으로 KafkaTransactionManager 빈을 구성하고 이를 리스너 컨테이너에 연결합니다.

 

역시 공식문서에 상세히 설명되어 있습니다. 


어찌됐든, DefaultKafkaProducerFactory 에 TransactionIdPrefix 를 설정하게 된다면 Kafka Transaction 으로 동작하게되며 이벤트는 KafkaTemplate.send 라인에서 바로 발행되지만 Spring Transaction 이후 해당 이벤트가 Commit 됩니다.  해당 설정으로 로그를 살펴보았습니다.

 

 


kafka transaction 설정 이전

 

 

 

 

kafka transaction 설정 이후

 

 

... 트랜잭션 설정 전과 후 결과가 크게 다르지 않아 당황했습니다. 설정 후 기대한 결과는 Transaction 종료된 후, 이벤트 발행을 생각했었는데요, 카프카 이벤트 발행 commit 방식을 이해해볼 필요가 있었습니다. 

로그를 다시 자세히 들여다 보았습니다. 

 

 

17:07:25.096 경 트랜잭션 내에서 이벤트 발행
17:07:25.141 경 트랜잭션 committed

설정 이전과 달리 beginTransaction 과 commitTransaction 두 개의 과정이 추가되었습니다. Kafka Transaction 은 DB Transaction 과 다르게 우선은 이벤트를 발행한 이후 해당 이벤트에 Transaction 결과를 마크하는 방식입니다. 

 

예외가 발생했을 때 로그는 다음과 같이 달라졌습니다. 

 


transaction 이 aborted 된 것을 알 수 있습니다. 

그런데, 그런데, 그런데 !!
로그로 알 수 있듯이, 트랜잭션 성공하든 실패하든 메세지가 무조건 발행됩니다. 저는 메세지 발행 자체를 안할거라고 생각했습니다. 그럼 이 트랜잭션이 실패한 메세지는 언제 누가 컨트롤 하는 걸까요..? 바로 수신인, 구독자, Consumer 의 역할이었습니다. 

 

 

 

Consumer 의 isolation.level

Consumer Configurationisolation.level 의 값을 read_committed 으로 설정하게 된다면 kafka Transaction 이 commit 된 이벤트만 구독하게 됩니다. 

이렇게 적용한다면 유의해야할 사항이 있습니다. 

1. 해당 토픽에 있는 메세지가 예외 발생 메세지임을 알 수 있다. 
2. 구독자가 isolation.level 설정을 적용해야한다.
3. kafka Transaction 이 적용된 이벤트를 offset 증가가 반드시 1이 아니다. 
4. kafka Transaction 이 적용된 이벤트가 마지막일 경우 ConsumerGroup 의 Lag 가 1이다. 

3, 4번에 대해 추가로 설명해보겠습니다. 

| 3. kafka Transaction 이 적용된 이벤트를 offset 증가가 반드시 1이 아니다. 

 

kafka 트랜잭션을 사용할 때, 여러 메세지를 하나의 트랜잭션으로 묶어 전송할 수 있습니다. isolation.level 을 read_committed 로 설정하면 컨슈머는 트랜잭션이 적용된 메세지의 오프셋 증가가 연속적이지 않을 수 있습니다. 

 


만약 트랜잭션 1이 커밋되고, 트랜잭션 2가 아직 커밋되지않은 상태라면, 컨슈머는 메세지 A와 B만 읽습니다. 이 경우 메세지 B의 오프셋 다음에 읽을 수 있는 메세지는 C 가 아닌, E 입니다. 트랜잭션 2가 커밋될 때까지 건너뛰는 방식입니다. 메시지의 순서가 보장되어야하는 경우, 취약점입니다.

 

 

 

 

| 4. kafka Transaction 이 적용된 이벤트가 마지막일 경우 ConsumerGroup 의 Lag 가 1이다. 

kafka 트랜잭션이 적용된 마지막 메시지가 커밋되지 않은 상태라면, 컨슈머 그룹의 lag(지연) 이 1이 됩니다. 이는 컨슈머가 해당 파티션의 마지막 메세지를 읽을 수 없기 때문입니다. 커밋되지 않은 트랜잭션 메시지는 컨슈머에게 노출되지않으므로, 해당 메세지가 커밋되기 전까지는 lag 가 존재하게 됩니다. 

* 파티션이 마지막 메세지 E (offset 2) 가 트랜잭션에 포함되어 있다.
* 이 메세지는 아직 커밋되지 않았다. 

이 경우, 컨슈머는 offset 1 까지의 메세지만 읽을 수 있으며 offset 2 에 있는 메시지는 커밋되지 않았으므로 읽을 수 없습니다. 따라서, 컨슈머 그룹의 lag 는 1이 됩니다.

이 lag = 1 을 얼마나 민감하게 보느냐에 따라서 심각한 고려대상이 되기도 할 것 같습니다. 실시간 주문 처리나, 금융 거래 시스템, 재고 관리 시스템에서는 하나의 지연도 모니터링 대상이 될 수 있으니까요 ..

 

 

 

 

정리

적지 않은 근심 사항이 있긴하지만, @TransactionEventListener 없이도 트랜잭션 실패 메세지를 컨트롤할 수 있습니다. isolation.level 은 트랜잭션 실패 메세지 수신을 컨슈머가 결정할 수 있는 설정값이고, @TransactionEventListener 는 트랜잭션 실패 메세지 발신을 스프링이 결정한다고 어름지을 수 있을 것 같습니다. 

근데 사실 isolation.level 은 카프카 메세지 소비의 일관성을 위한 설정입니다. 트랜잭션이 성공할 때만 메세지 발행해야한다면 , @TransactionEventListener 를 이용하는게 바람직해보입니다. 메세지 순서나 오프셋 문제를 골치아프게 고려할 필요가 없어지니까요,, 

왠지 돌고돌아 @TransactionalEventListener 를 사용해야하는 이유를 알게된 것 같습니다. 트랜잭션이 성공할 때만 메세지가 발행되도록 보장하고 싶다면, 발행쪽이 그 역할을 담당하는 게 맞고 컨슈머는 컨슈머의 다른 일에 집중하는 게 맞겠네요. commit 에 실패한 메세지는 @TransactionalEventListener를 통해 producer 가 책임지고 메세지의 일관성은 isolation.level 을 통해 consumer 가 책임지면 영심이 딱딱 맞아 영심2