[Kafka] 4장. 카프카 컨슈머Web_Backend/Kafka2023. 11. 2. 19:11
Table of Contents
카프카 컨슈머 : 중요 개념
1. 컨슈머와 컨슈머 그룹
1) 컨슈머 그룹이란?
- 카프카의 데이터 읽기는 다른 메시지 시스템의 읽기와 약간 다르다
- 여러 프로듀서들이 해당 토픽에 메시지를 쓰는 속도가 컨슈머가 메시지를 처리하는 속도보다 빠르다면
- 하나의 컨슈머만으로 처리한다면 추가되는 메시지 속도를 따라갈 수 없다
- 당연히 토픽을 소비하는 컨슈머의 수를 늘려야 한다
- 다수의 컨슈머들이 같은 토픽의 메시지들을 분담해 읽을 수 있어야 한다
- 카프카 컨슈머들은 컨슈머 그룹에 속한다.
- 다수의 컨슈머가 같은 토픽을 소비하며 같은 컨슈머 그룹에 속하면
- 각 컨슈머가 해당 토픽의 서로 다른 파티션을 분담해 메시지를 읽을 수 있다.
2) 컨슈머 그룹의 컨슈머 수
- 파티션 수 > 컨슈머 수 (1)
- C1 컨슈머는 T1 토픽의 네 개 파티션 모두에 있는 메시지들을 읽을 것이다.
- 파티션 수 > 컨슈머 수
- G1 컨슈머 그룹에 또 다른 C2 컨슈머를 추가한다면, 각 컨슈머는 두 개의 파티션에서만 읽어 들이면 됨!
- 파티션 수 = 컨슈머 수
- 만일 G1 컨슈머 그룹에 네 개의 컨슈머가 있다면, 각 컨슈머는 하나의 파티션에서만 읽으면 된다!
- 파티션 수 < 컨슈머 수
- 만약 컨슈머 그룹에 더 많은 수의 컨슈머를 추가한다면, 일부 컨슈머가 놀게 된다!
3) 용도 및 주의사항
- 이런 방법은 대기 시간이 긴 작업을 수행하는 컨슈머에 많이 쓰인다.
- 하나의 컨슈머로는 데이터가 추가되는 속도를 따라잡을 수 없기 때문!
- Ex) 토픽의 데이터를 DB에 쓰거나, 시간 소요가 많은 연산을 수행하는 것
- 많은 수의 파티션을 갖도록 토픽을 생성하는 이유이기도 하다.
- [주의] 한 토픽의 파티션 개수보다 더 많은 수의 컨슈머를 추가하는 건 의미가 없다.
4) 컨슈머 그룹 추가하기
- 같은 토픽의 데이터를 다수의 애플리케이션이 읽어야 하는 경우
- 각 애플리케이션이 토픽의 일부 메시지가 아닌 모든 메시지를 읽어야 한다.
- 이렇게 하려면 각 애플리케이션이 자신의 컨슈머 그룹을 가지도록 해야 한다.
- 새로운 컨슈머 그룹인 G2를 추가하면, G1 그룹과 무관하게 T1 토픽의 모든 메시지를 읽는다
- G1의 경우처럼 각 컨슈머가 파티션들을 분담하게 된다.
5) 정리하면
- 각 애플리케이션에서 하나 이상의 토픽에 저장된 모든 메시지를 읽어야 할 땐 애플리케이션마다 컨슈머 그룹 생성
- 토픽의 메시지 소비를 확장할 때는 기존 컨슈머 그룹에 새 컨슈머 추가
- 해당 그룹의 각 컨슈머는 토픽의 일부 파티션에 있는 메시지만 읽으면 된다.
2. 컨슈머 그룹과 리밸런싱
1) 리밸런싱이란
- 파티션 소유권 (ownership) : 각 컨슈머가 특정 파티션에 대응되는 것
- 컨슈머 그룹의 컨슈머들은 자신들이 읽는 토픽 파티션의 소유권을 공유한다.
- 이는 특정 컨슈머가 중단될 때도 마찬가지다!
- 그 컨슈머가 읽던 파티션은 남은 컨슈머 중 하나가 재할당받아 읽는다.
- 받아서 읽어야 하는 토픽들에 변경사항이 생길 때에도 재할당이 일어날 수 있다.
- 리밸런싱 (rebalancing) : 한 컨슈머부터 다른 컨슈머로 파티션 소유권을 이전하는 것
- 컨슈머 그룹의 가용성과 확장성을 높여주므로 중요하다!
- 쉽고 안전하게 컨슈머를 추가하고 삭제할 수 있다.
2) 무조건 좋은가?
- 리밸런싱은 정상적인 처리에서는 바람직하지 않다
- 리밸런싱을 하는 동안 컨슈머들은 메시지를 읽을 수 없어 해당 컨슈머 그룹 전체가 사용 불가 상태가 된다.
- 한 컨슈머로부터 다른 컨슈머로 파티션이 이전될 때, 해당 컨슈머의 이전 파티션에 대한 정보가 삭제된다.
3) 안전한 리밸런싱
- 그룹 조정자 (group coordinator) : GroupCoordinator 클래스의 인스턴스로 생성되어,
- 백그라운드 프로세스로 실행되는 카프카 브로커…
- 그룹 조정자로 지정된 카프카 브로커에게 컨슈머가 하트비트 (heartbeat) 전송
- 자신이 속한 컨슈머 그룹의 멤버십과 자신에게 할당된 파티션 소유권 유지 가능!
- 컨슈머가 일정 간격으로 하트비트를 보내면 잘 동작한다는 신호로 감지!
- 타임아웃 시간이 경과될 때까지 하트비트 전송을 하지 않으면 리밸런싱 시작!
3. 독자 실행 컨슈머
1) 하나의 컨슈머만 사용하는 이유
- 경우에 따라서는 훨씬 간단하게, 하나의 컨슈머만 필요할 때도 있다.
- 이 때는 한 토픽의 모든 파티션이나 하나의 특정 파티션 데이터를 항상 하나의 컨슈머가 읽으면 된다.
- 컨슈머 그룹이나 리밸런싱이 필요 없게 된다!
- 해당 컨슈머 전용의 토픽과 파티션을 할당하고, 메시지를 읽고 오프셋을 커밋하면 된다.
- 할당 (assign) : 컨슈머가 어떤 파티션을 읽어야 하는지 정확히 알고 있을 때는, 토픽을 구독하지 말고 할당한다.
- 컨슈머는 토픽을 구독할 때 다음 중 하나를 택한다.
- 컨슈머 그룹의 일원이 된다.
- 스스로 파티션을 할당한다.
- 컨슈머는 토픽을 구독할 때 다음 중 하나를 택한다.
2) 파티션 할당 예제
List<PartitionInfo> partitionInfos = null;
partitionInfos = consumer.partitionsFor("TOPIC");
// 특정 토픽의 사용 가능한 파티션을 모두 가져온다.
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
consumer.assign(partition); // 이 컨슈머에게 해당 파티션들이 할당됨!
while (true) {
ConsumerRecord<String, String> records = consumer.poll(1000);
for (ConsumerRecords<String, String> record : records) {
// 할 일 해주기...
}
cosumer.commitSync();
}
}
4. 커밋과 오프셋
- 카프카는 다른 JMS (Java Message Service) 시스템과 다른 방법으로 컨슈머가 읽는 레코드를 추적 관리한다.
- 커밋 (commit) : 파티션 내부의 현재 위치는 변경하는 것.
1) 컨슈머는 어떻게 오프셋을 커밋할까?
- 컨슈머가 오프셋을 커밋하면 내부적으로 __consumer_offsets라는 이름의 특별한 토픽에 메시지를 쓴다
- 이 토픽은 모든 컨슈머의 오프셋을 가진다.
- 정상적으로 굴러갈 땐 오프셋을 커밋해도 아무 일도 일어나진 않는다
- 비정상적(?) 일 때, 오프셋 커밋은 리밸런싱을 유발한다.
- 기존 컨슈머가 비정상적으로 종료되었을 때
- 새로운 컨슈머가 컨슈머 그룹에 추가됐을 때
- 두 번 처리되는 메시지
- 마지막으로 커밋된 오프셋이 컨슈머가 가장 최근에 읽고 처리한 메시지의 오프셋보다 작으면
- 그 사이의 메시지들이 두 번 처리된다.
- 오프셋 간 누락되는 메시지
- 마지막으로 커밋된 오프셋이 가장 최근에 읽고 처리한 메시지의 오프셋보다 크다면
- 그 사이의 메시지들은 컨슈머 그룹에서 누락된다.
- [정리] 오프셋 관리는 컨슈머 클라이언트 애플리케이션에 큰 영향을 준다
2) 자동 커밋
- 가장 쉬운 오프셋 커밋 방법… KafkaConsumer 객체가 자동으로 오프셋을 커밋해 준다.
- 자동 커밋도 폴링 루프에서 처리된다.
- 매번 폴링 할 때마다 KafkaConsumer 객체가 커밋한 시간이 되었는지 확인!
- [주의] 자동 커밋을 가끔 해주면… 레코드가 자주 중복되어 처리될 수 있다
- poll() 메서드에서 반환된 모든 메시지는 다시 poll()을 호출하기 전에 처리가 끝나도록 하는 게 중요!
- 결국… 편하긴 한데 중복 메시지 방지를 제어하기에는 충분하지 않다!
- 대부분 쓸 때는 직접 오프셋이 커밋되는 시간을 제어하려고 노력하게 된다.
3**) 현재 오프셋 커밋하기**
- enable.auto.commit=false로 설정
-
- 가장 신뢰성 있는 API - commitSync() : “poll()이 리턴한 마지막 오프셋을 커밋”한 뒤 커밋이 성공적으로 완료되면 리턴
- poll()에서 리턴된 모든 레코드의 처리가 완료되기 전 commitSync()를 호출하면, 애플리케이션이 크래시 되었을 때 커밋은 되었지만 아직 처리되지 않았던 메시지들이 누락될 수 있음. 마지막 메시지 배치의 맨 앞 레코드부터 리밸런스 시작지점까지 모든 레코드가 두 번 처리됨.
-
4**) 비동기적 커밋**
- 수동 커밋의 단점 : 하나의 브로커가 커밋 요청에 응답할 때까지 애플리케이션이 블록(애플리케이션 처리량 제한됨) → 덜 커밋한다면 처리량은 올라가지만, 리밸런스에 의한 중복메시지는 증가
- 또 다른 대안 : 비동기적 커밋(브로커가 커밋에 응답할 때까지 기다리는 대신, 요청만 보내고 처리를 계속)
- 단점 : commitSync()가 성공하거나 재시도 불가능한 실패가 발생할 때 까지 재시도하는 반면, commitAsync()는 재시도를 하지 않음.(이미 다른 시도가 성공했을 수 있기 때문)
- 콜백을 지정할 수 있지만, 재시도를 하기 위해 콜백을 사용하고자 하는 경우에는 커밋 순서에 관련된 문제에 주의를 기울여야 함.
5**) 동기적 커밋과 비동기적 커밋 함께 사용**
- 정상적인 상황에서는 comitAsync()를 사용
- 컨슈머를 닫는 상황에서는 commitSync()를 호출
6**) 특정 오프셋 커밋하기**
- customer 토픽의 파티션 3에서 마지막으로 처리한 메시지의 오프셋이 5000이라면…
- commitSync() 혹은 commitAsync()를 호출할 때
- record.topic(), recored.partition(), record.offset()+1을 전달할 수 있음
5**. 리밸런스 리스너**
- 컨슈머는 종료하기 전 혹은 리밸런싱이 시작하기 전에 정리작업 필요
- 파티션 해제 예정 : 해당 파티션에서 마지막으로 처리한 이벤트의 오프셋 커밋
- 파일핸들, DB연결 닫기
- 따라서, 컨슈머 API는 컨슈머에 파티션이 할당되거나 해제될 때 사용자의 코드가 실행되도록 하는 메커니즘 제공
- subscribe()를 호출 시 ConsumerRebalanceListener를 전달해 주면 됨
- ConsumerRebalance
- public void onPartitionAssigned(Collection <TopicPartition> partitions)
- 파티션이 컨슈머에 재할당된 후에 컨슈머가 메시지를 읽기 전 호출
- max.poll.interval.ms 안에 모두 실행될 수 있어야 함
- (협력) 리밸런스 발생시마다 호출. 즉 리밸런스가 발생했음을 컨슈머에게 알려주는 역할(새로 할당된 파티션이 없는 경우 빈 목록과 함께 호출)
- public void onPartitionRevoked(Collection <TopicPartition> partitions)
- 파티션 할당 해제 시(리밸런스, 컨슈머 닫기 등 상황)에서 호출
- 조급한 리밸런스 : 메시지 읽기 멈춤 뒤, 리밸런스가 시작되기 전
- 협력적 리밸런스 : 리밸런스가 완료될 때, 컨슈머에서 해제되어야 할 파티션에 대해서만
- 메서드가 호출될 때 빈목록이 주어지는 경우는 없음
- 오프셋 커밋 필요함.
- public void onPartitionLost(Collection <TopicPartition> partitions)
- 협력적 리밸런스 : 할당된 파티션이 리밸런스 알고리즘에 의해 해제되기 전에 다른 컨슈머에 먼저 할당된 예외적인 상황에서만 호출
- 주어지는 파티션들은 이 메서드가 호출되는 시점에서 이미 다른 컨슈머에 할당되어 있음
- 미 구현시 onPartitionRevoked 호출
- 협력적 리밸런스 : 할당된 파티션이 리밸런스 알고리즘에 의해 해제되기 전에 다른 컨슈머에 먼저 할당된 예외적인 상황에서만 호출
- public void onPartitionAssigned(Collection <TopicPartition> partitions)
6. 특정 오프셋의 레코드 읽어오기
- seekToBeginning(collection <TopicPartition> partitions)
- seekToEnd(collection <TopicPartition> partitions)
7. 폴링루프 벗어나는 법
- 즉시 탈출 : 다른 스레드에서 consumer.wakeup() 호출
- 대기 중이던 poll()이 WakeupException 발생시키며 중단
- 다음번에 처음으로 poll()이 호출될 때 예외 발생
- WakeupException를 처리해 줄 필요는 없지만, cousumer.close()는 호출되어야 함.
- 컨슈머를 닫으면 필요한 경우 오프셋을 커밋하고 그룸 코디네이터에게 컨슈머가 떠난다는 메시지를 전송
- → 컨슈머 코디네이터는 즉각적으로 리밸런식 실행(그룹 안의 다른 컨슈머에게 할당될 때까지 타임아웃 할 필요 없음
'Web_Backend > Kafka' 카테고리의 다른 글
[Kafka] 8장 ‘정확히 한 번’의미구조(멱등성) (1) | 2023.12.26 |
---|---|
[Kafka] 6장 카프카 내부 매커니즘 (1) | 2023.12.26 |
[Kafka] 3장. 카프카 프로듀서 : 카프카에 메시지 쓰기 (1) | 2023.11.02 |
[Kafka] 1. 카프카 시작하기 (0) | 2023.11.02 |
@Yanako :: Yana's coding story였는데요, 우당탕탕 개발일지가 맞는것같
야나의 코딩 일기장 :) #코딩블로그 #기술블로그 #코딩 #조금씩,꾸준히
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!