实现 exactly one 的逻辑

幂等性生产者(Idempotent Producer)

Kafka 引入了幂等性生产者,确保每个消息在 Kafka 中只会被写入一次。幂等性生产者通过以下方式实现,参考:Kafka 生产者幂等性机制

  • 序列号(Sequence Number):每个消息都有一个唯一的序列号,Kafka 会检查这个序列号,确保相同序列号的消息不会被重复写入。
  • 事务 ID(Transactional ID):生产者可以配置一个事务 ID,Kafka 会根据事务 ID 来管理消息的幂等性。

事务(Transactions)

Kafka 支持事务,允许生产者将一组消息作为一个事务进行提交。事务机制确保了这组消息要么全部成功写入,要么全部失败,从而避免了部分消息成功写入的情况。

消费者偏移量管理

消费者在读取消息时,会记录自己的偏移量(Offset)。为了实现 Exactly Once 语义,消费者需要将偏移量的提交与消息处理的结果绑定在一起。通常有两种方式:

  • 手动提交偏移量:消费者手动提交偏移量,确保只有在消息处理成功后才提交偏移量。
  • 事务性提交:消费者可以使用 Kafka 的事务机制,将偏移量的提交与消息处理的结果作为一个事务进行提交。Kafka 消费者事务性提交

生产者配置

为了实现 Exactly Once 语义,生产者需要进行以下配置:

  • enable.idempotence=true:启用幂等性生产者。
  • acks=all:确保所有副本都确认接收到消息后才算写入成功。
  • retries=Integer.MAX_VALUE:设置重试次数为最大值,确保消息不会因为临时故障而丢失。

消费者配置

为了实现 Exactly Once 语义,消费者需要进行以下配置:

  • enable.auto.commit=false:禁用自动提交偏移量,改为手动提交。
  • isolation.level=read_committed:设置隔离级别为 read_committed,确保消费者只能读取到已提交的消息。

实现 exactly one 的示例代码

以下是一个简单的示例代码,展示了如何配置 Kafka 生产者和消费者以实现 Exactly Once 语义:

生产者配置

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("enable.idempotence", "true");
producerProps.put("acks", "all");
producerProps.put("retries", Integer.MAX_VALUE);
 
Producer<String, String> producer = new KafkaProducer<>(producerProps);

消费者配置

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList("my-topic"));