생각

SpringKafka Consumer 설정값으로 요리조리 테스트 해보기

6161990 2024. 8. 6. 23:31

SpringKafka Consumer 설정값으로 요리저리 놀아보았습니다. 해당 포스트에서는 그 과정을 전시합니다. 

키워드는 두 가지 입니다. 

  • AckMode 
  • Concurrency

 

주로 쓰이는 kafka 설정값을 나열해보았습니다.

 

kafka config

 

 

여기서 눈여겨봤던 것이 ContainerPropertiessetAckMode 입니다. 그리고 하나는 acknowledgement 입니다. 이 둘은 연관이 있습니다. 어떤 연관을 가지고 있는지, 각각은 어떻게 가져갈 수 있는지 확인해보겠습니다.

 

 

일단 ContainerProperties 의 setAckMode는 메세지를 소비할 때 acknowledgement 모드를 설정하는데 사용됩니다. acknowledgement 는 메세지가 성공적으로 처리되었음을 브로커에게 알리는 역할입니다. 이걸 알리는 이유는 메세지의 중복, 손실을 알아차리기 위함이라고 생각됩니다. 이걸 언제, 어떻게 알리고 싶냐의 설정값이 바로 setAckMode 의 다양한 모드 설정값입니다. 


1. AckMode.BATCH : 레코드의 배치가 처리된 후에 확인을 보냅니다. 
2. AckMode.TIME : 주어진 시간 간격 후에 확인을 보냅니다. 
3. AckMode.COUNT : 주어진 메세지 수 만큼 처리된 후에 확인을 보냅니다. 
4. AckMode.COUNT_TIME : 주어진 메시지 수 또는 시간 간격 중 먼저 도달하는 조건에 따라 확인을 보냅니다.
5. AckMode.MANUAL : 애플리케이션 코드에서 수동으로 확인을 보냅니다.
6. AckMode.MANUAL_IMMEDIATE : 애플리케이션 코드에서 수동으로 확인을 보내며, 바로 확인이 수행됩니다. 


다시 코드로 돌아가서 확인해보면 acknowledgment.acknowledge(); 한 줄의 의미를 설정값과 연결지어 이해해볼 수 있습니다. 

 

highlighted acknowledge code

 

 

수동으로 확인을 보내는 수행이 acknowledgment.acknowledge() 입니다. 로그로 확인해보겠습니다.

 

received message log
committed offset 1

 

만약 MANUAL 이나 MANUAL_IMMEDIATE 을 설정값으로 두고 수동 확인 처리를 애플리케이션에서 수행하지 않으면 어떻게 될까요? 다르게 질문하면, 브로커가 메세지 확인 사실을 알 수 없을 때 어떤 영향이 있는가 입니다. 

 

직접 확인해보았습니다.

 

receive messge
not committed offset 1

 

 

컨슈머의 오프셋이 업데이트 되지 않습니다. 카프카는 해당 메시지가 성공적으로 처리되지않았다고 판단하고, 같은 메시지를 다시 전송하게 될 수 있습니다. 같은 메세지가 중복 처리되는 게 이슈 입니다. 

이쯤되니 "acknowledgment.acknowledge() 를 직접 수행해야하는 이유가 뭘까? 기본 모드로 동작하면 뭐가 다른 걸까?" 같은 생각이 들었습니다. AckMode 의 기본 모드인 AckMode.BATCH 를 확인해보았습니다. 

AckMode.BATCH 는 배치 단위로 확인되며, max.poll.records 설정에 의해 결정됩니다. 메시지가 소비되고, 배치의 모든 메시지가 처리된 후 다음 배치를 요청할 때 확인이 브로커에게 전송됩니다. 

좀 더 정교한 메시지 확인 처리가 필요한 경우 MANUAL 방식을 이용할 수 있겠다는 생각이 듭니다. 저는 다음과 같은 상황에 이용해본 적이 있습니다. 

 

acknowledge() used example

 

 

지원하는 이벤트 타입인 경우, 관련 service 로직을 호출한 뒤 ack 하였습니다. 지원하지 않는 이벤트 타입은 바로 ack 를 전송하였습니다. 이 경우는 소비해야하는 이벤트에 의해 비즈니스 로직 호출 보장을 위한 acknowledge 라고 볼 수 있습니다. 정확히 한 번(Exactly-once)을 지향하는 방식입니다. 이런 경우에도 MANUAL 설정값을 이용해볼 수 있습니다. 


반대로 자동 ack 모드를 설정하고, acknowledgment.acknowledge() 를 직접 날리면 어떻게 될까요?

 

set auto AckMode

 

 

결과를 예상해보겠습니다. 자동 ACK 모드에서는 메시지가 처리되면 자동으로 Kafka 브로커에 확인을 보냅니다. 이때 acknowledgment.acknowledge()를 수동으로 호출하면 동일한 메시지에 대해 두 번 확인을 보내는 결과가 됩니다. 수동 확인 호출로 인해 메시지 처리 순서나 오프셋 관리에 문제가 발생할 수 있을 것 같습니다. 메시지 소비의 일관성을 해칠 수 있습니다.

결과는 다음과 같았습니다.

 

MessageHandlingException

 

kafka_acknowledgment 헤더가 누락되어 있어 Acknowledgment 객체를 전달할 수 없다는 것을 나타내고 있습니다. Kafka 리스너가 메시지를 수신할 때 수동 오프셋 확인을 위한 Acknowledgment 객체가 올바르게 전달되지 않았기 때문에 발생합니다.  

 

 

@Header(KafkaHeaders.ACKNOWLEDGMENT) 없이, Acknowledgment 객체만 받더라도  Acknowledgment 는 null 로 전달되어 에러가 발생하였습니다. 

 


여기서  acknowledgment.acknowledge()consumer.commitSync() 는 뭐가 다른걸까요?

Consumer 객체를 가지고 직접 consumer.commitSync() 를 수행할 수도 있습니다. 이 때의 commit 은 컨슈머가 특정 오프셋까지 메시지를 처리했음을 broker 에 알리는 행위입니다. 이 역시 자동 커밋과 수동 커밋이 있습니다. 

 

 

이렇게 구현해볼 수 있습니다. 

 

manual commit setting

 

manual commit code

 

 

사실 acknowledgment.acknowledge() 를 호출하면 결국 브로커에 오프셋 커밋하는 과정을 거치게 되긴합니다. 

1. acknowledgment.acknowledge() 호출
2. 호출에 따라 kafka 가 메시지의 offset을 기록
3. 배치가 끝나거나 다음 메시지 처리전, broker offsetcommit

 


지금까지 AckMode 를 살펴보았습니다. 

 

 

확인하고 싶은 설정값이 하나 더 남았습니다. 테스트를 위해 kafkaListenerContainerFactory 를 하나 더 추가해보았습니다.

 

add kafkaListenerContainerFactory2

 

factory.setConcurrency(2) 설정값 입니다. 동시 실행 쓰레드 수를 결정하는 값입니다. 병렬처리가 가능하다는 의미일까요? 그렇다면 이 값은 파티션의 갯수를 결정하는 것일 수도 있겠다고 생각했습니다.

컨슈머 그룹 아이디가 같지만 다른 토픽의 리스너 두 개가 있는 상황, SetConcurrency 2로 설정한 경우 어떻게 동작할까요?

 

topicA 와 topicB 두 개의 토픽이 있으며, 각각 두 개의 파티션을 가지고 있는 상황입니다. 그럼 파티션은 topicA-0topicA-1topicB-0topicB-1 총 4개입니다. 두 리스너는 동일한 컨슈머 그룹을 사용합니다. 

 

 

 

ConcurrentMessageListenerContainer 는 topicA 와 topicB 에 대해 각각의 리스너를 관리합니다. 각 리스너 컨테이너는 설정된 동시성에 따라 최대 2 개의 스레드를 사용하여 메세지를 처리하게됩니다. 두 개의 파티션을 두 개의 스레드에 할당되는 건데요, 헷갈려서 문장으로 정리해보겠습니다.

topicA 의 경우, 스레드 1이 topicA-p0 을 처리하고, 스레드 2가 topicA-p1 을 처리합니다. 
topicB 의 경우, 스레드 1이 topicB-p0 을 처리하고, 스레드 2가 topicB-p1 을 처리합니다. 

 

 

파티션 증설 후, 카프카를 실행시켜보았습니다.

 

> testContainer 에서 partition 추가하는 방법

 

 

 

설정 상태는 @KafkaListener를 2개를 선언하였으며 ConcurrentMessageListenerContainer.setConcurrency 2입니다. 설정한 예상대로 0-0-C-1, 0-1-C-1, 1-1-C-1, 1-0-C-1 4개의 쓰레드가 생성되긴했는데요, 해당 쓰레드에서 수신하고 있는 Topic-Partition이 예상과 다르게 분배되어있었습니다. 저는 토픽의 파티션 4개가 2개의 쓰레드에 분배되어 수신을 할것으로 생각했습니다.

결과는 총 4개의 스레드가 동작하게 되며, 각 스레드는 동일토픽 내의 하나의 파티션을 처리하게 됩니다. 

 

근데 이 상황에서 topicA 이 하나의 파티션(topicA-p0)만 가진다고 가정해보겠습니다. 이 경우 setConcurrency(2) 를 설정할 경우를 살펴보았습니다. 

결과적으로 한 스레드는 topicA-p0 파티션을 처리하지만, 다른 스레드는 할당받을 파티션이 없어서 유휴 상태가 됩니다. 순차 보장을 위해 하나의 파티션을 사용하면서 동시성을 2 이상 설정하는 것은 의미가 없다는 말이됩니다. 파티션 수와 동시성 설정에서 유의해야할 점입니다. 

자잘하게 consumer 에서 눈에띄는 설정값들을 살펴보았습니다. 사실 이런 결과는 비교적 간단한거라 블로그를 둘러보면 바로 알 수 있긴한데요. 회사에서 카프카 관련 설정은 이미 보일러 플레이트처럼 적용되고 있습니다. 저만 그렇게 사용하고 있는 걸수도 있고요..

 

사실 카프카의 설정 방법과 설정 값은 무궁한데 반면에, 회사에 다니면서 이 모든 걸 경험하고 직접 확인하고 테스트해볼 수는 없으니까요. 이렇게 상상에 상상을 더해가며 직접 확인해볼 수 밖에는 없다는 생각이 들었습니다. 가령, 테스트를 진행하면서 애플리케이션에서 파티션을 늘려려볼 수 있다는 사실을 알았는데 , "이 로직은 어떤 상황에서 쓰일 수 있을까? 얼마나 유용할까?" 이런생각이요. 덕분에 재밌는 공부였습니다 :))))