* 이 글은 카프카 핵심 가이드 - 대규모 실시간 데이터와 스트림 처리 [ 개정증보판 ]를 읽고 스터디 후 작성한 글입니다.
3장. 카프카 프로듀서 : 카프카에 메시지 쓰기
- 카프카 사용 예시
- 큐, 메시지 버스, 데이터 저장 플랫폼 등으로 활용
- 프로듀서(데이터를 씀)나 컨슈머(데이터를 읽어 올 때 사용) 혹은 두 가지 기능을 모두 수행하는 애플리케이션 생성해야 함.
- 즉, 아파치 카프카는 개발자들이 카프카와 상호작용 하는 애플리케이션을 개발할 때 사용 할 수 있는 ‘클라이언트 API’와 함께 배포됨
- 프로듀서를 사용하는 방법에 대해 배움
- 프로듀서의 디자인
- 프로듀서 주요 요소의 전체적인 모습
- KafkaProducer와 ProducerRecord객체 생성법
- 카프카에 레코드 전송하는 법
- 카프카가 리턴할 수 있는 에러를 어떻게 처리하는지
- 프로듀서의 작동을 제어하기 위해 사용되는 설정 옵션
- 파티션 할당 방식을 정의하는 파티셔너과 객체의 직렬화방식을 정하는 ‘시리얼라이저’
- 이를 작성하기 위한 방법
🐦 서드 파티 클라이언트 카프카는 정식 클라이언트 이외에도 공식 클라이언트가 사용하는 TCP/IP 패킷의 명세를 공개함. 이에 따라 다양한 프로그래밍 언어별로 카프카의 TCP패킷 명세를 구현한 클라이언트들이 개발되어 있으며, 이를 통해 Java 이외에 C++, Python, Go 등으로도 쉽게 Kafka를 사용하는 애플리케이션을 개발할 수 있음.(https://cwiki.apache.org/confluence/display/KAFKA/Clients)
3.1. 프로듀서 개요
애플리케이션이 카프카에 메시지를 써야 하는 다양한 상황
- 감사 혹은 분석을 목적으로 하는 사용자 행동 기록
- 성능 매트릭 기록
- 로그 메시지 저장
- 스마트 가전 정보 수집
- 다른 애플리케이션과의 비동기적 통신 수행
- 임의의 정보 데이터베이스 저장 전 버퍼링
다양한 상황에 따라 요구되는 조건도 다양함
- 메시지의 유실 용납 여부
- 중복 허용 여부
- 반드시 지켜야 하는 지연(latency)이나 처리율(throughput)이 있는가
서로 다른 요구 조건은 카프카에 메시지를 쓰기 위한 프로듀서 API를 사용하는 방식과 설정에 영향을 끼침
프로듀서 동작 과정
- ProducerRecord(토픽, 파티션, 키, 밸류) 객체 생성
- 토픽(topic), 밸류(value) : 필수 지정
- 키(key), 파티션 지정 : 선택사항
- ProducerRecord 객체를 전송하는 API 호출
- 키와 값 객체가 네트워크 상에서 전송될 수 있도록 직렬화(바이트 배열로 변환)
- (파티션이 명시적으로 지정되어 있지 않은 경우) 해당 데이터를 파티셔너로 전송
- 파티션 결정(주로 ProducerRecord 객체의 키값이 보통 기준)
- Q) Key 값은 없지만 파티션만 지정할 수 있나?
- 없다(공식 문서 확인 결과 생성자에 해당 경우 없음 https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html)
- 토픽과 파티션이 확정된 메시지는, 같은 토픽 파티션으로 전송될 레코드를 모은 레코드 배치에 추가
- 별도의 스레드가 적절한 브로커에 전송
- 브로커가 메시지를 받으면, 응답을 되돌려줌
- 메시지 저장 성공 시 : 토픽, 파티션, 레코드 오프셋을 담은 RecordMetadata객체 리턴
- 메시지 저장 실패 시 : 에러 리턴
- 프로듀서가 에러를 수신한 경우, 메시지 쓰기를 포기하고 사용자에게 에러를 리턴하기 전까지 몇 번 더 재전송을 시도할 수 있음.
- ProducerRecord(토픽, 파티션, 키, 밸류) 객체 생성
- 토픽(topic), 밸류(value) : 필수 지정
- 키(key), 파티션 지정 : 선택사항
- ProducerRecord 객체를 전송하는 API 호출
- 키와 값 객체가 네트워크 상에서 전송될 수 있도록 직렬화(바이트 배열로 변환)
- (파티션이 명시적으로 지정되어 있지 않은 경우) 해당 데이터를 파티셔너로 전송
- 파티션 결정(주로 ProducerRecord 객체의 키값이 보통 기준)
- Q) Key 값은 없지만 파티션만 지정할 수 있나?
- 없다(공식 문서 확인 결과 생성자에 해당 경우 없음 https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html)
- 토픽과 파티션이 확정된 메시지는, 같은 토픽 파티션으로 전송될 레코드를 모은 레코드 배치에 추가
- 별도의 스레드가 적절한 브로커에 전송
- 브로커가 메시지를 받으면, 응답을 되돌려줌
- 메시지 저장 성공 시 : 토픽, 파티션, 레코드 오프셋을 담은 RecordMetadata객체 리턴
- 메시지 저장 실패 시 : 에러 리턴
- 프로듀서가 에러를 수신한 경우, 메시지 쓰기를 포기하고 사용자에게 에러를 리턴하기 전까지 몇 번 더 재전송을 시도할 수 있음.
3.2. 카프카 프로듀서 생성하기
카프카 프로듀서 필수 속성값
bootstrap.servers
카프카 클러스터와 첫 연결을 생성하기 위해 프로듀서가 사용할 브로커의 host:port 목록
- 모든 브로커를 포함할 필요 없음(프로듀서가 첫 연결 생성한 뒤 추가 정보를 받아오기 때문)
- 2개 이상 지정 권유(작성한 브로커가 정지한 경우에도 연결을 개시할 수 있도록)
key.serializer
카프카에 쓸 레코드의 키값을 직렬화하기 위한 시리얼라이저(serializer) 클래스의 이름
- org.apache.kafka.common.serialization.Serializer 인터페이스를 구현하는 클래스 이름 지정
- 카프카의 client 패키지 ByteSerializer, StringSerializer, IntegerSerializer 등등 포함됨
- 즉, 자주 사용되는 타입을 사용할 경우 시리얼라이저 구현 필요 없음
- 레코드에 키값 없이 밸류만 보낼 때에도 key.serializer 설정을 해주어야 함
- VoidSerializer를 사용해서 Key값을 void 타입으로 지정할 수 있음
value.serializer
카프카에 쓸 밸류값을 직렬화하기 위해 사용할 시리얼라이저 클래스의 이름
producerRecord
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
Creates a record with a specified timestamp to be sent to a specified topic and partition
필수 속성만 지정하고, 나머지는 default 속성을 이용하는 프로듀서 생성 예시
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer<String,String> producer = new KafkaProducer<String, String>(kakaProps);
메시지 전송 방법
파이어 엔 포겟(Fire and forget)
메시지를 서버에 전송만 하고, 성공 혹은 실패 여부를 신경 쓰지 않음
- 카프카 프로듀서는 자동으로 전송 실패한 메시지를 재전송 시도하기 때문에 대부분의 경우 성공적 전달
- 재시도를 할 수 없는 에러가 발생하거나, 타임아웃 시에는 애플리케이션은 아무런 정보나 예외를 전달받지 않음
동기적 전송(Synchronous send)
카프카 프로듀서는 항상 비동기적으로 작동.
- send() 메서드는 Future객체를 리턴. 다음 메시지를 전송하기 전 get() 메서드를 호출해 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인해야 함.
비동기적 전송(Asynchronous send)
콜백함수와 함께 send() 메서드 호출 시, 카프카 브로커로부터 응답을 받은 시점에서 자동으로 콜백 함수 호출
카프카 프로듀서 객체는 메시지를 전송하려는 다수의 스레드를 동시에 사용할 수 있음.
3.3. 카프카 메시지 전달하기
3.3.0. 파이어 엔 포겟(Fire and forget)으로 메시지 전송하기
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Precision Products", "France"); // step 1
try {
producer.send(record); // step 2
} catch (Exception e){
e.printStackTrace(); // step 3
}
- ProducerRecord 객체 생성
- 토픽 이름 : 항상 String
- 키 : key.serializer 에 지정된 타입
- 밸류 : value.serializer 에 지정된 타입
- 프로듀서 객체의 send() 메서드 사용
- 메시지는 버퍼에 저장되었다가 별도의 스레드에 의해 브로커로 보내짐.
- send() 메서드를 RecordMetadata를 반환하지만, 여기서는 리턴값을 무시.
- 메시지의 성공 여부를 알 방법이 없음.
- 메시지가 조용히 누락되어도 상관없는 경우 위처럼 사용
- 카프카 브로커에 메시지를 전송할 때 발생하는 에러 혹은 브로커 자체에 발생하는 에러 등을 무시한다고 하더라도, 카프카로 메시지를 보내기 전 에러가 발생한 경우 여전히 예외가 발생할 수 있기 때문에 예외처리.
- SerializationException : 직렬화 실패
- TimeoutException : 버퍼가 가득 찬 경우
- InterruptException : 전송작업을 수행하는 스레드에 인터럽트가 걸린 경우
3.3.1. 동기적으로 메시지 전송하기
동기적으로 처리하더라도 카프카 브로커가 쓰기 요청(produce request)에 에러 응답을 내놓거나, 재전송 횟수가 소진되었을 때 발생되는 예외를 받아서 처리할 수 있음. 여기서 중요한 균형점은 성능(performance).
“성능 이슈로 동기적 메시지 전송은 대부분의 애플리케이션에서 사용되지 않음”
카프카 클러스터에 얼마나 작업이 몰리느냐에 따라서 브로커는 쓰기 요청에 응답하기까지 최소 2ms에서 최대 몇 초까지 지연될 수 있음. 즉, 동기적으로 메시지를 전송하는 경우 전송을 요청하는 스레드는 이 시간 동안 아무것도 하지 않으면서 대기해야 함(성능 하락).
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Precision Products", "France");
try {
producer.send(record).get(); // step 1
} catch (Exception e){
e.printStackTrace(); // step 2
}
- 카프카로부터 응답이 올 때까지 대기하기 위해 Future.get() 메서드 사용.
- 레코드 전송 실패 : get() 메서드는 예외를 발생시킴
- 레코드 전송 성공 : get() 메서드는 RecordMetadata 객체 리턴
- 전송하기 전이나 전송하는 도중에 에러 발생 시 예외 발생.
KafkaProducer의 두 종류의 에러
- 재시도 가능한 에러
- 메시지를 다시 전송함으로써 해결될 수 있는 에러
- 재전송 횟수를 초과해도 에러가 해결되지 않은 경우에 한해 재시도 가능한 에러가 발생.
- → 이러한 에러가 발생 시 자동으로 재시도하도록 Kafka Producer 설정 가능
- ex) 연결에러(연결 회복 시 해결), 파티션 리더가 전달받지 않은 에러(파티션 리더가 새로 선정되고, 메타데이터가 업데이트되면 해결)
- 메시지를 다시 전송함으로써 해결될 수 있는 에러
- 재시도한다고 해도 해결되지 않는 에러 → KafkaProducer는 재시도 없이 바로 예외 발생시킴
- ex) 메시지가 너무 큰 경우
3.3.2. 비동기적으로 메시지 전송하기(콜백)
대부분의 애플리케이션의 경우 카프카에 메시지를 작성 성공하고 난 뒤 응답(메타데이터)이 필요 없음. 반대로 메시지 전송에 완전히 실패한 경우에는 실패한 내용을 알아야 함. 메시지를 비동기적으로 처리하고도 여전히 에러를 처리하는 경우, 프로듀서는 레코드를 전송할 때 콜백을 지정할 수 있도록 함.
private class DemoProducerCallback implements Callback { // step 1
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e){
if(e != null) {
e.printStackTrace(); // step 2
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountury", "Biomedical Materials", "USA"); // step 3
producer.send(record, new DemoProducerCallback()); // step 4
- 콜백 기능 사용을 위해 “org.apach.kafka.clients.priducer.Callback”인터페이스를 구현하는 클래스 필요
- 해당 인터페이스에는 onCompletion()이라는 단 하나의 메서드만 정의됨
- 카프카가 에러를 리턴하면, onCompletion() 메서득 null이 아닌 Exception객체를 받게 됨.
- 기존과 동일
- 레코드 전송 시 Callback 함수를 두 번째 매개변수로 전달.
🦂 콜백 : 메인 스레드에서 실행. 따라서 콜백은 충분히 빨라야 함. 따라서 콜백 안에서 블로킹 작업을 수행하는 것 도 권장되지 않음.
3.4. 프로듀서 설정하기
필수 설정값 이외에 다양한 설정값은 합리적인 기본값을 가지고 있기 때문에 일일이 설정값을 잡아줄 필요는 적음
몇몇 설정값은 메모리 사용량, 성능, 신뢰성 등에 영향을 끼침. 아래는 해당 설정값들임.
3.4.1. client.id
프로듀서와 그것을 사용하는 애플리케이션을 구분하기 위한 논리적 식별자. 잘 설정해야 트러블 슈팅에 용이.
- “IP 104.27.155.134에서 인증 실패가 자주 발생하네?”
- “주문 확인 서비스가 인증에 실패하고 있는 듯하네?”
3.4.2. acks
- default
- 2.8.0 : 1
- 3.0 : all
프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정. 메시지 유실 가능성 즉, 신뢰성에 영향을 끼침.
acks = 0
프로듀서는 메시지가 성공적으로 전달되었다고 간주하고 중개인의 응답을 기다리지 않음
- 네트워크가 허용하는 한 빠르게 메시지를 보낼 수 있음(높은 처리량, 높지 않은 신뢰도 필요시 설정)
acks = 1
프로듀서는 리더 레플리카가 메시지를 받는 순간 성공 응답을 받음. 만약, 리더에 메시지를 쓸 수 없다면 프로듀서는 에러를 응답받고, 데이터 유실을 피하기 위해 재전송.
- 리더에 크레시가 난 상태에서 해당 메시지가 복제되지 않은 채로 새 리더가 선출된 경우 메시지 유출 가능
acks = all
프로듀서는 모든 인-싱크 레플리카(in-sync replica)에 메시지가 전달된 후에야 중개인으로부터 성공했다는 응답을 받음.
- 오타? 실제로는 도서 P32쪽 min.insync.replicas 에 설정된 수 이상의 레플리카에 복제된 후에 성공 응답하는 것으로 알고 있음
- 가장 안전한 형태. 지연시간은 더 길어짐.
🦂 acks 설정을 내려서 신뢰성을 낮추면 프로듀서 지연을 줄일 수 있음. 하지만 “종단 지연(end to end latency)”의 경우 세값이 모두 똑같음. → 카프카는 일관성을 유지하기 위해 컨슈머가 레코드를 읽어가기 위한 조건에 모든 인싱크 레플리카에 값이 복제된 이후에만 값을 읽을 수 있는 제한을 걸어놓았기 때문.
Q) 어차피 ISR이 모두 싱크 되어야 읽어올 수 있다면, acks를 all로 두는 것이 가장 합리적인 방법 아닌가?
→ 신뢰성은 보장될 필요는 없지만 대량의 데이터를 파이프라인에 태울 때, 전송 속도가 느리면 스레드가 계속 블록 되고 밀리고 밀리면 결국 프로듀싱하는 쪽에서 문제가 생길 수 있음. ex) 온라인 코테 시스템 마우스 인아웃 로그 적채용 카프카
3.4.3. 메시지 전달 시간
ProducerRecord를 보낼 때 걸리는 시간에 대한 두 구간
- send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간
- cf) 동기적 호출일 경우 연속적으로 블록 되기 때문에 각각 구간을 나눠서 계산할 수 없음
- 이 시간 동안 send()를 호출한 스레드는 블록 됨
- send()에 대한 비동기 호출이 성공적으로 리턴한 시각부터(성공했건, 실패했건) 콜백이 호출될 때까지 걸리는 시간
- ProducerRecord가 전송을 위해 배치에 추가된 시점에서부터 카프카가 성공 응답을 보내거나 재시도 불가능한 실패가 일어나거나, 아니면 전송을 위해 할당된 시간이 소진될 때까지의 시간과 동일.
max.bolck.ms
프로듀서가 얼마나 오랫동안 블록되는지 결정
- send()를 호출했을 때.
- partitionsFor을 호출해서 명시적으로 메타데이터를 요청했을 때
위 메서드는 프로듀서의 전송 버퍼가 가득 차거나, 메타 데이터가 아직 사용 가능하지 않을 때 블록됨. 이 상태에서 max.block.ms만큼 시간이 흐르면 예외가 발생함.
delivery.timeout.ms
이 설정은 레코드 전송 준비가 완료된 시점(즉, send()가 문제없이 리턴되고 레코드가 배치에 저장된 시점)부터 브로커의 응답을 받거나 아니면 전송을 포기하게 되는 시점까지의 제한시간을 정함.
- linger.ms 와 request.timeout.ms를 더한 값 보다 커야 함.
만약 프로듀서가 재시도를 하는 도중에 delivery.timeout.ms를 넘어가버린다면, 마지막으로 재시도하기 전에 브로커가 리턴한 에러에 해당하는 예외와 함께 콜백이 호출됨.
레코드가 배치로 넘어가기 전에 delivery.timeout.ms가 넘어가면, 타임아웃 예외와 함께 콜백이 호출됨.
🦂 사용자 입장에서 메시지 전송에 기다릴 수 있는 시간만큼 delivery.timeout.ms를 최댓값으로 설정할 수 있음. 이 값을 몇 분 정도로 두고 retries의 기본값을 그대로 두는 것(사실상 무제한).
request.timeout.ms
프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지를 결정.
즉, 쓰기 요청 후 전송을 포기하기까지 대기하는 시간.(재시도, 실제 전송에 소요되는 시간 등을 포함하지 않음)
응답 없이 타임아웃이 발생하는 경우, 프로듀서는 재전송을 시도하거나, TimeoutException과 함께 콜백 실행.
retries, retry.backoff.ms
프로듀서가 서버로부터 에러 메시지를 받았을 때, 재전송할 횟수와 재전송을 보낼 시간 텀을 설정함.
retries : 재전송 요청 횟수
- 프로듀서가 전송을 포기하고 에러를 발생시킬 때까지 재전송하는 횟수
retry.backoff.ms : 재전송 요청 시간 텀
- 기본적으로 100ms동안 대기하고 난 뒤 재전송을 시도하지만, 해당 값을 설정할 수 있음.
retry관련 값을 조정하는 것은 권장되지 않음
- 대신 크래시난 브로커가 정상 상태로 돌아오기 전까지의 시간을 테스트 한 뒤 delivery.timeout.ms 매개변수를 잡아주는 것을 권고함.
- 즉, 재전송을 시도하는 전체 시간이 크래시 복구시간보다 더 길도록 설정.
개발자는 재시도 불가능한 에러를 처리하거나 재시도 횟수가 고갈되었을 경우에 대한 처리에만 집중하면 됨.
🦂 재전송 끄는 법 : retries = 0
3.4.4. linger.ms
현재 배치를 전송하기 전까지 대기하는 시간 결정.
KafkaProducer가 메시지 배치를 전송하는 조건
- 현재 배치가 가득 찼을 때
- linger.ms에 설정된 제한 시간이 되었을 때
- default : 메시지 전송에 사용할 수 있는 스레드가 있을 때 곧바로 전송
- 0보다 큰 값 : 프로듀서가 중개인에 메시지 배치를 전송하기 전에 메시지를 추가하도록 해당 ms만큼 기다리게 만들 수 있음.
- 해당 값 증가 → 처리율 증가(압축이 되어있거나 하는 경우 훨씬 효과적이기 때문)
3.4.5. buffer.memory
메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기를 결정.
애플리케이션이 서버에 전달 가능한 속도보다 더 빠르게 메시지를 전송한다면, 메모리가 가득 찰 수 있음
- 이 경우 추가로 호출되는 send()는 max.block.ms동안 블록되어 버퍼 메모리에 공간이 생기기를 기다려야 함.
- 해당 시간 동안 대기하고도 공간 미확보 시 예외 발생.
3.4.6. compression.type
default : 메시지는 압축되지 않은 상태로 전송됨.
snappy, gzip, lz4, zstd 중 하나로 해당 값을 설정하면 해당 압축 알고리즘을 사용해서 메시지를 압축한 뒤 브로커에 전달.
Snappy
- cpu 부하 적음.
- 성능 좋음.
- 구축률 좋음
- 즉, 압축 성능과 네트워크 대역 폭 모두 중요할 때 권고
Gzip
- cpu와 시간을 더 많이 사용하지만 압축률은 더 좋음
- 즉, 네트워크가 제한적일 때 권고
압축 사용 시 카프카로 메시지를 전송할 때 자주 일어나는 병목 되는 네트워크 사용량, 저장공간을 절약
3.4.7. batch.size
배치에 사용될 메모리의 양 결정(’ 바이트 단위’)
작게 설정할 시 오버헤드 발생
3.4.8. max.in.flight.requests.per.connection
프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지 수 결정.
- default : 5
해당 값 증가 → 메모리 사용량 증가 → 처리량 역시 증가.
- 단일 데이터 센터에 카프카를 설정할 경우 해당 값이 2일 때 처리량 최대.
- 기본값인 5를 사용할 때도 비슷한 성능
enable.idempotence = true설정
해당 설정을 통해 flight가 여러 개일 때에도 순서를 보장하고, 재전송이 발생하더라도 중복이 발생하는 것을 방지.
Q) 중복을 방지한다는 점은 확인했으나, 순서를 보장하는 방법은 어떤 방식으로 이루어지는 건지?
→ 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여서 보내고, 브로커는 기대하던 인덱스가 아닌 다른 인덱스의 레코드가 들어오면 값을 우선 저장하지 않는 방식..? 뒤의 장에서 enable.idempotence 인덱싱 관련 브로커 오류, 프로듀서 오류시의 대응 등에 관해서도 깊게 다루는 부분이 있으니 추후에 다뤄보자.
3.4.9. max.request.size
프로듀서가 전송하는 쓰기 요청의 크기 결정.
- 메시지의 최대 크기를 제한하기도 하지만, 한 번의 요청에 보낼 수 있는 메시지의 개수를 제한하기도 함
- 용량을 넘지 않을 때까지만 해당 요청에 보낼 메시지를 추가할 수 있기 때문
message.max.byte : 브로커가 받아들일 수 있는 메시지 크기
해당 설정과 위 설정을 동일하게 맞추는 것을 권고
3.4.10. receive.buffer.bytes, send.buffer.bytes
데이터를 읽거나 쓸 때 소켓이 사용하는 TCP송수신 버퍼의 크기를 결정
-1 : 운영체제 기본 값 사용
프로듀서나 컨슈머가 다른 데이터 센터에 위치한 브로커와 통신할 경우 네트워크 대역폭은 낮고 지연은 길어지는 것이 보통이기 때문에, 이 값들을 올려 잡아주는 것이 권고
3.4.11. enable.idempotence
0.11부터 카프카는 ‘정확히 한 번’ 의미구조 exactly once semantics를 지원하기 시작.
즉, 신뢰성을 보장하고자 함.
enable.idempotence =true
멱등적 프로듀서 기능이 활성화되면, 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여서 보냄.
만약 브로커가 동일한 번호를 가진 레코드를 2개 이상 받은 경우
- 브로커는 둘 중 하나만 저장
- 프로듀서는 별다른 문제를 발생시키지 않는 DuplicateSequenceException을 받게 됨
🦂 멱등적 프로듀서 기능 활성화 조건
1. max.in.flight.request.per.connection : 5 이하
2. retries : 1 이상
3. acks : all
조건 미충족시 ConfigException 발생
3.5. 시리얼라이저
String, Integer, Byte 등의 시리얼라이저를 기본으로 제공하지만, 이것만으로 모든 데이터를 직렬화할 수 없음.
3.5.1. 커스텀 시리얼라이저
카프카로 전송해야 하는 객체가 단순히 문자열이나 정숫값이 아닌 경우 두 가지 선택지
- 레코드를 생성하기 위해 에이브로, 스리프트, 프로토버프 등 범용 직렬화 라이브러리 사용
- 사용하고 있는 객체를 직렬화하기 위한 커스텀 직렬화 로직 작성
2번의 경우 기존 형식과 새로운 형식 사이의 호환성 유지에 어려움이 있기에 직렬화 라이브러리 권고.
3.5.2. 아파치 에이브로를 사용해서 직렬화하기
권장되는 방법.
3.5.3. 카프카에서 에이브로 레코드 사용하기
3.6. 파티션
접착성 stiky 처리
Q) 접착성 처리 교재 사진 잘못된 거겠죠? 접착성 처리 예시 사진에서 7→2?
기본 파티셔너 사용 중에 키값이 null인 레코드가 주어지는 경우, 레코드는 현재 사용 가능한 토픽의 파티션 중 하나에 랜덤 하게 저장됨(라운드 로빈 알고리즘).
프로듀서가 메시지 배치를 채울 때 다음 배치로 넘어가기 전, 이전 배치를 먼저 채우게 되어있음.
- 더 적은 요청으로 같은 수의 메시지를 전송하게 함
- 지연시간 감소, broker의 CPU사용량 감소
키값이 지정된 상황에서 기본 파티셔너를 사용할 경우, 카프카는 키값을 해시한 결과를 기준으로 메시지를 저장할 파티션을 정함.
- 특정 파티션에 장애가 발생한 상태에서 해당 파티션에 데이터를 쓰려하는 경우 장애 발생
RoundRobinPartitioner
키값을 포함하고 있는 경우에도 랜덤 파티션 할당
UniformSkickyPartitioner
키값을 포함하고 있는 경우에도 접착성 파티션 할당
- 전체 파티션에 대해서 균등한 분포를 가지도록 파티션이 할당됨
파티션 수가 변하지 않는 한, 기본 파티셔너가 사용될 때 특정 키값에 대응되는 파티션은 변화하지 않음
→ 파티션 수가 변하는 순간 해당 성질은 더 이상 유효하지 않음.
→ 따라서 토픽 생성 시 충분한 파티션을 생성하고 이후에는 추가하지 않는 것을 권고
3.6.1. 커스텀 파티셔너 구현하기
보통의 경우 파티션 결정은 키값의 해시처리에 의한 파티션 결정이 이루어짐.
3.7. 헤더
레코드의 키/밸류값을 건들지 않고 메타데이터를 심을 때 사용. 메시지의 전달 내역을 기록하는 것이 주목적. 메시지를 파싱 할 필요 없이 헤더에 심어진 정보만으로 메시지를 라우팅 하거나 출처를 추적할 수 있음.
3.8. 인터셉터
카프카 클라이언트의 코드를 고치지 않으면서 작동을 변경해야 하는 경우, ProducerInterceptor를 사용.
ProducerRecord <K, V> onSend(ProducerRecord <K, V> record)
프로듀서가 레코드를 브로커로 보내기 전, 직렬화되기 직전에 호출됨.
- 레코드에 담긴 정보를 볼 수 있음.
- 유효한 ProducerRecord를 리턴하게만 하면 됨.
void onAcknowledgement(RecordMetadata metadata, Exception e)
카프카 브로커가 보낸 응답을 수정할 수는 없지만, 그에 담긴 정보를 읽을 수 있음.
3.9. 쿼터, 스로틀링
카프카 브로커에는 쓰기/읽기 속도를 제한할 수 있는 기능이 있음. 한도(quota)를 설정해 주면 됨.
쿼터는 기본값을 설정하거나, 특정한 client.id값에 대해 설정하거나, 특정한 사용자에 대해 설정할 수 있음.
- 사용자에 대해 설정된 쿼터는 보안 기능과 클라이언트 인증 기능이 활성화되어있는 클라이언트만 작동한다.
- 렉을 줄이기 위한 커스터마이징으로 이해
'Web_Backend > Kafka' 카테고리의 다른 글
[Kafka] 8장 ‘정확히 한 번’의미구조(멱등성) (1) | 2023.12.26 |
---|---|
[Kafka] 6장 카프카 내부 매커니즘 (1) | 2023.12.26 |
[Kafka] 4장. 카프카 컨슈머 (0) | 2023.11.02 |
[Kafka] 1. 카프카 시작하기 (0) | 2023.11.02 |
야나의 코딩 일기장 :) #코딩블로그 #기술블로그 #코딩 #조금씩,꾸준히
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!