Post

카프카 컨슈머

옵션, 예제

  • 컨슈머? 파티션 리더에게 메시지 가져오기 (Consume)
  • 다른 메시지큐 처럼 소비하면 없어지나? – X
    • 기본값 7일 동안 저장했다가 삭제
    • 필요하다면 이미 가져온 데이터도 다시 가져올 수 있음
  • 여러 컨슈머 그룹에서 메시지 수신 가능

컨슈머 중요 옵션

  • group.id 컨슈머 그룹 식별자
  • fetch 관련
    • fetch.min.bytes 한 번에 가져오는 데이터의 최소 사이즈. 지정한 사이즈 보다 작은 경우 요청에 응답하지 않고 데이터가 누적될 때 까지 기다림. (default 1byte)
    • fetch.max.wait.ms 사이즈가 작아서 기다린다면 최대 몇 초 동안 기다릴건지? (default 500ms)
    • max-wait 시간 지나면 쌓인 데이터가 min-bytes 보다 작아도 그냥 컨슈머로 보내기 때문에, min-bytes가 실제 수신하게 되는 데이터의 최소 사이즈는 아님.
  • heartbeat 관련
    • 컨슈머가 살았는지 죽었는지는 컨슈머가 지속적으로 하트비트를 보내는 것으로 판단함
    • 하트비트 관련 상세 내용은 다음 링크 하단의HeartBeat 스레드 참조
    • session.timeout.ms
      • 컨슈머<> 브로커 세션 타임 아웃 제한 시간. 지정된 시간 동안 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않으면 컨슈머 장애로 판단하고 리밸런스 진행함.
    • heartbeat.interval.ms
      • 이 하트비트를 얼마 간격으로 보낼건지 지정하는 옵션 (보통 세션 타임아웃의 1/3로 설정)
    • max.poll.interval.ms
      • HeartBeatThread 만 poll하고KafkaConsumer 는 poll하지 않아서 실제로 메시지를 가져가지 않는 경우를 대비한 옵션.
      • 지정된 시간 동안 컨슈머가 poll 호출하지 않으면 장애라고 판단함. 위 d2 링크 참조.
    • max.in.flight.requests.per.connection

컨슈머로 읽어오기 (콘솔 예제)

1
2
./kafka-console-consumer.sh --bootstrap-server $KAFKA_CLUSTER --topic peter-topic --group peter-consumer-group --partition 0 --from-beginning
./kafka-console-consumer.sh --bootstrap-server $KAFKA_CLUSTER --topic peter-topic --group peter-consumer-group --partition 0 --offset 1

http://10.168.241.137:9000/clusters/dev-umbum-kafka/consumers

토픽, 컨슈머 오프셋 CMAK에서 확인 가능

컨슈머로 읽어오기 (코드 예제)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class KafkaBookConsumer1 {
    @Test
    public void main() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.168.249.27:9092,10.168.249.17:9092,10.168.249.4:9092");
        props.put("group.id", "peter-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("peter-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        } finally {
            consumer.close();
        }
    }
}
  • 위는 그냥 예제 코드이고, 실제 사용할 때는 코드를 이렇게 imperative하게 작성하여 while true poll 하지는 않는다.
  • spring-kafka 를 사용한다. (@KafkaListener)

파티션과 메시지 순서 보장

카프카는 파티션 내에서의 순서만 보장한다. 파티션 간의 데이터 순서는 보장이 안된다.

  • a b c d e 1 2 3 4 5 차례대로 입력하는 경우

  • 위와 같이 파티션에 적재 되었다고 가정하면
    • 파티션의 앞에서 부터 offset을 증가시키며 컨슘하기 때문에 파티션 내의 순서는 보장 됨.
      • e.g., 전체 결과 내에서a d 1 4 는 반드시 이 순서대로 여야 함.da 보다 선행할 수 없음
    • 파티션 간의 순서는 보장이 안됨
      • e.g.,b a c 가 어떤 순서대로 받아오게 될지는 보장이 되지 않는다.
  • 극단적으로 전체 결과가a d 1 4 b e 2 5 c 3 가 될 수도 있음

그렇다면, 순서 보장이 필요하다면 어떻게 해야?

kafka 순서 보장

파티션 : 컨슈머 수

case : 파티션 수 < 컨슈머 수

컨슈머 그룹 안에서, 하나의 파티션에는 하나의 컨슈머만 연결할 수 있다.
하나의 파티션에 두개의 컨슈머가 연결되면 안정적으로 메시지 순서를 보장할 수 없기 때문임.
따라서 컨슈머04는 놀게 된다.

case : 파티션 수 > 컨슈머 수

컨슈머3이 처리속도가 2배가 아니라면 파티션 2,3에 있는 메시지 처리가 지연될 것이므로 새로운 컨슈머4를 추가해주는 것이 좋음.

커밋과 오프셋

  • 컨슈머 그룹의 컨슈머들은 각각의 파티션에 대해 자신이 가져간 메시지의 위치 정보(offset)을 기록하고 있음.
    • 즉, 어디까지 읽었는지? (=어디서 부터 읽어야 할지?)
    • 이 컨슈머 오프셋 정보도, 카프카 토픽으로 관리함. (0.9≤v)
  • 각 파티션에 대해 현재 위치를 업데이트 하는 동작을 commit 이라고 함.
  • 관련 옵션
    • enable.auto.commit
      • 오프셋을 자동으로 커밋 할 것인지 여부
      • 설정 시,poll() 호출할 때 마다 커밋할 시간(인터벌)이 되었는지 체크하고, 인터벌이 지났다면 이전poll() 요청을 통해서 가져온 마지막 오프셋 을 커밋함.
    • auto.commit.interval.ms
      • 오프셋 오토 커밋 인터벌 (default : 5s)

중복과 누락

컨슈머01이 아래와 같이 진행 했을 때
A.1, 2 를 가져옴
B.3, 4 를 가져오면서 2를 커밋
C.5, 6 을 가져 오면서 4를 커밋

중복이 발생하는 케이스

컨슈머1이 B 진행 후 처리하다 오류 및 리밸런싱 (컨슈머2가 이어받음)
⇒ offset은 2 이므로, 새로운 컨슈머2가3, 4 를 다시 컨슘 ⇒ 중복

누락이 발생하는 케이스

클라에서3, 4 처리가 아직 안끝났는데5, 6 가져오면서 4를 커밋(C)
결국3, 4 처리 실패하고 리밸런싱 ⇒ offset은 4이므로, 새로운 컨슈머2는5, 6 부터 컨슘 ⇒ 누락

[!info] 이러한 케이스를 견고하게 막아야 하는 경우, 수동 커밋 방식으로 처리하는 것이 가능함.

특정 파티션만을 지정하여 메시지를 가져와야 하는 경우

  1. key-value 형태로 파티션에 나뉘어져 저장되어 있고, 특정 파티션의 데이터만 가져와야 하는 경우
  2. 컨슈머 프로세스가 자동으로 관리되는 경우 (카프카가 관리해줄 필요 없는 경우. 죽으면 알아서 해당 파티션 컨슈머가 다시 뜬다거나)
1
2
3
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

특정 offset 부터 메시지 가져오도록 수동 설정 해야 하는 경우

1
consumer.seek(partition0, 2);

파티션과 오프셋 번호를 넘겨주면 컨슈머가 다음 poll() 하는 위치를 지정할 수 있음.

Lag 쌓여 있을 때 컨슈머 리밸런싱 하지 마라

  • 일반적으로 consuming 로직이 많이 무겁지 않기 때문에, 1개 thread가 n개의 partition을 컨슘 해서 처리하도록 구성 한다. (최대한 thread를 놀지 않게 하기 위해서)
  • e.g. 파티션 수 = 4, pod 수 = 1, concurrency = 2 이면, 총 2개의 thread가 각자 2개의 파티션을 맡아서 처리하게 된다.
  • 여기서 트래픽이 몰려 pod가 2개로 scale-out 되면, 파티션 수 = 4, pod 수 = 2, concurrency = 2 이므로, 총 4개의 thread가 파티션을 각각 1개씩 맡아서 처리하게 된다.
  • 얼핏 들었을 때는 thread 2 -> 4개가 처리하니 2배 더 빨리 처리할 것 같지만,
    • 실제로 thread 2개가 별로 무리해서 처리하고 있는 상황이 아니었다면 큰 속도 향상은 없고 (특히 nio이면)
    • Lag가 intensive하게 쌓이고 있는 상황에서는 컨슈머 리밸런싱으로 인한 처리 지연이 오히려 상황을 악화시킬 수 있다. (그 사이에 Lag이 오히려 더 쌓여서 해소되기 어려워진다.)

이미지 출처 및 참고

카프카, 데이터 플랫폼의 최강자

This post is licensed under CC BY 4.0 by the author.