kafka保证消息不丢失

简述

在kafka的使用过程中,消息传递有下图三个步骤,消息的丢失也就在这三个步骤中:发送过程中丢失、同步过程中丢失、拉取过程中丢失。

image-20240331213103843

发送过程中丢失

发送方式

生产者的发送方式有三种:

1、简单发送,不关心发送结果,所以发送失败消息丢失也不知道。

1
2
3
4
5
6
7
8
ProducerRecord<String,String> record = new ProducerRecord<>("topicName","key","value");
try{
//这里只是把消息放进了一个缓冲区中,然后使用单独的线程将消息发送到服务端
producer.send(record);
}
catch(Exception){
e.printStackTrace();
}

2、同步发送,等待发送返回

1
2
3
4
5
6
7
8
9
ProducerRecord<String,String> record = new ProducerRecord<>("topicName","key","value");
try{
//send方法返回的是Future<RecordMetaData> 对象,然后我们可以调用get()方法等待响应
Future<RecordMetaData> future = producer.send(record);
future.get();
}
catch(Exception){
e.printStackTrace();
}

3、异步发送,执行回调方法

1
2
3
4
5
6
7
8
private class DemoProducerCallback implements Callback{
@override
public void onCompletion(RecordMetadata recordMetadata,Exception e){
//发生错误的回调方法,可以写入日志,或写入DB通过其它线程重重试,保证最终的数据送达
}
}
ProducerRecord<String,String> record = new ProducerRecord<>("topicName","key","value");
producer.send(record,new DemoProducerCallback()))

在这三种发送方法中,第一种方法,无返回,不感知,所以也无法保证消息不丢失。所以要保证消息不丢失,只能选择第二种或者第三种,一般情况下更推荐第三种。

acks参数设置

在生产者中有acks参数,该参数指定了kafka的多少个副本同步后才算消息发送成功。该参数取值范围:

1、acks=0,表示生产者在消息后不管有没有在leader磁盘上落盘,就认为消息发送成功。

2、acks=1,表示生产者在消息后,在leader磁盘上落盘,就认为消息发送成功,不管其他follower有没有同步。

3、acks=all,表示生产者在消息后,在leader磁盘上落盘,其他follower都同步落盘, 认为消息发送成功。

image-20240331222238322

小结

通过设置发送方式和acks参数,可以保证在发送方式中不丢失,甚至acks参数都可以保证后面的同步过程中消息不丢失。

发送方式一般设置为异步发送,acks参数默认设置为1。

同步过程中丢失

在同步过程中,除了上面说的答acks参数,还有其他副本机制保证消息不丢失。避免leader节点的崩溃导致消息的丢失。

broker中的配置项,unclean.leader.election.enable = false,表示不允许非ISR中的副本被选举为首领,以免数据丢失。

ISR:是指与leader保持一定程度(这种范围是可通过参数进行配置的)同步的副本和 leader 共同被称为ISR

OSR:与leader同步时,滞后很多的副本(不包括leader)被称为OSR

AR,分区中所有的副本统称为AR。AR = ISR + OSR

拉取过程中丢失

设置 enable.auto.commit = false,在consumer端消费消息操作完成以后再手动提交 offset,类似于下文中的代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void consumerMsg(){
while(true){
//这里的poll(100)指的是kafka server端没有消息时,连接等待的时间,超过该时间立即返回空给consumer
ConsumerRecords<String,String> records = consomer.poll(100);
for(ConsumerRecord<String,String> record : records){
// 这里是消费消息的逻辑(简单逻辑输入到控制台)
System.out.printIn(record.value));
//提交偏移量
try{
consumer.commitSync(); //同步提交 如果异步的话,可以使用 consumer.commitAsync();
}
catch(CommitFailedException ex){
log.error("commit fail");
}
}
}
}