• 컨슈머? 파티션 리더에게 메시지 가져오기 (Consume)
  • 다른 메시지큐 처럼 소비하면 없어지나?
    ⇒ X 기본값 7일 동안 저장했다가 삭제
    ⇒ 필요하다면 이미 가져온 데이터도 다시 가져올 수 있음
    ⇒ 여러 컨슈머 그룹에서 메시지 수신 가능

 

컨슈머 중요 옵션

  • group.id 컨슈머 그룹 식별자
  • fetch 관련
    • fetch.min.bytes 한 번에 가져오는 데이터의 최소 사이즈. 지정한 사이즈 보다 작은 경우 요청에 응답하지 않고 데이터가 누적될 때 까지 기다림. (default 1byte)
    • fetch.max.wait.ms 사이즈가 작아서 기다린다면 최대 몇 초 동안 기다릴건지? (default 500ms)
    • max-wait 시간 지나면 쌓인 데이터가 min-bytes 보다 작아도 그냥 컨슈머로 보내기 때문에, min-bytes가 실제 수신하게 되는 데이터의 최소 사이즈는 아님.
  • heartbeat 관련
    • 컨슈머가 살았는지 죽었는지는 컨슈머가 지속적으로 하트비트를 보내는 것으로 판단함
    • 하트비트 관련 상세 내용은 다음 링크 하단의 HeartBeat 스레드 참조
    • session.timeout.ms
      • 컨슈머<> 브로커 세션 타임 아웃 제한 시간. 지정된 시간 동안 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않으면 컨슈머 장애로 판단하고 리밸런스 진행함.
    • heartbeat.interval.ms
      • 이 하트비트를 얼마 간격으로 보낼건지 지정하는 옵션 (보통 세션 타임아웃의 1/3로 설정)
    • max.poll.interval.ms
      • HeartBeatThread만 poll하고 KafkaConsumer 는 poll하지 않아서 실제로 메시지를 가져가지 않는 경우를 대비한 옵션.
      • 지정된 시간 동안 컨슈머가 poll 호출하지 않으면 장애라고 판단함. 위 d2 링크 참조.

 

컨슈머로 읽어오기 (콘솔 예제)

./kafka-console-consumer.sh --bootstrap-server $KAFKA_CLUSTER --topic peter-topic --group peter-consumer-group --partition 0 --from-beginning
./kafka-console-consumer.sh --bootstrap-server $KAFKA_CLUSTER --topic peter-topic --group peter-consumer-group --partition 0 --offset 1

http://10.168.241.137:9000/clusters/dev-umbum-kafka/consumers

토픽, 컨슈머 오프셋 CMAK에서 확인 가능

 

컨슈머로 읽어오기 (코드 예제)

public class KafkaBookConsumer1 {

    @Test
    public void main() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.168.249.27:9092,10.168.249.17:9092,10.168.249.4:9092");
        props.put("group.id", "peter-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("peter-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        } finally {
            consumer.close();
        }
    }
}

 

  • 위는 그냥 예제 코드이고, 실제 사용할 때는 코드를 이렇게 imperative하게 작성하여 while true poll 하지는 않는다.
  • spring-kafka를 사용한다. (`` @KafkaListener``)

 

 

파티션과 메시지 순서 보장

카프카는 파티션 내에서의 순서만 보장한다. 파티션 간의 데이터 순서는 보장이 안된다.

  • a b c d e 1 2 3 4 5 차례대로 입력하는 경우

  • 위와 같이 파티션에 적재 되었다고 가정하면
    • 파티션의 앞에서 부터 offset을 증가시키며 컨슘하기 때문에 파티션 내의 순서는 보장 됨.
      • e.g., 전체 결과 내에서 a d 1 4 는 반드시 이 순서대로 여야 함. da 보다 선행할 수 없음
    • 파티션 간의 순서는 보장이 안됨
      • e.g., b a c 가 어떤 순서대로 받아오게 될지는 보장이 되지 않는다.
  • 극단적으로 전체 결과가 a d 1 4 b e 2 5 c 3 가 될 수도 있음

 

그렇다면, 순서 보장이 필요하다면 어떻게 해야?

⇒ 파티션을 1개로 해야 한다.

⇒ 분산처리 X

⇒ 처리량 ⬇

 

파티션 : 컨슈머 수

case : 파티션 수  < 컨슈머 수

컨슈머 그룹 안에서, 하나의 파티션에는 하나의 컨슈머만 연결할 수 있다.

하나의 파티션에 두개의 컨슈머가 연결되면 안정적으로 메시지 순서를 보장할 수 없기 때문임.

따라서 컨슈머04는 놀게 된다.

 

case : 파티션 수 > 컨슈머 수

컨슈머3이 처리속도가 2배가 아니라면 파티션 2,3에 있는 메시지 처리가 지연될 것이므로 새로운 컨슈머4를 추가해주는 것이 좋음.

 

커밋과 오프셋

  • 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보(offset)을 기록하고 있음.
    • 즉, 어디까지 읽었는지? (=어디서 부터 읽어야 할지?)
  • 각 파티션에 대해 현재 위치를 업데이트 하는 동작을 commit 이라고 함.
  • 관련 옵션
    • enable.auto.commit
      • 오프셋을 자동으로 커밋 할 것인지 여부
      • 설정 시, poll() 호출할 때 마다 커밋할 시간(인터벌)이 되었는지 체크하고, 인터벌이 지났다면 이전 poll() 요청을 통해서 가져온 마지막 오프셋을 커밋함.
    • auto.commit.interval.ms
      • 오프셋 오토 커밋 인터벌 (default : 5)

 

올드 컨슈머 vs 뉴 컨슈머

 

중복과 누락

A. 1, 2를 가져옴

B. 3, 4를 가져오면서 2를 커밋

C. 5, 6을 가져 오면서 4를 커밋

 

중복이 발생하는 케이스

컨슈머1이 B 진행 후 처리하다 오류 및 리밸런싱

⇒ offset은 2 이므로, 새로운 컨슈머2가 3, 4 를 다시 컨슘 ⇒ 중복

 

누락이 발생하는 케이스

클라에서 3, 4 처리가 아직 안끝났는데 5, 6 가져오면서 4를 커밋(C)

결국 3, 4 처리 실패하고 리밸런싱 ⇒ offset은 4이므로, 새로운 컨슈머2는 5, 6 부터 컨슘 ⇒ 누락

 

이러한 케이스를 견고하게 막아야 하는 경우, 수동 커밋 방식으로 처리하는 것이 가능함.

 

특정 파티션만을 지정하여 메시지를 가져와야 하는 경우

  1. key-value 형태로 파티션에 나뉘어져 저장되어 있고, 특정 파티션의 데이터만 가져와야 하는 경우
  2. 컨슈머 프로세스가 자동으로 관리되는 경우 (카프카가 관리해줄 필요 없는 경우. 죽으면 알아서 해당 파티션 컨슈머가 다시 뜬다거나)
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

 

특정 offset 부터 메시지 가져오도록 수동 설정 해야 하는 경우

consumer.seek(partition0, 2);

파티션과 오프셋 번호를 넘겨주면 컨슈머가 다음 poll() 하는 위치를 지정할 수 있음.

 

 

이미지 출처 및 참고

카프카, 데이터 플랫폼의 최강자

 

'Data Store' 카테고리의 다른 글

ELK with SpringBoot  (0) 2021.09.11
카프카 컨슈머  (0) 2021.07.23
docker 안의 DB를 사용할 때 timezone 문제  (0) 2021.05.19
[Oracle] Pagination  (0) 2021.05.17
Flyway  (0) 2021.05.13
[Oracle] longest match  (0) 2021.02.16