事务型 Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。

设置事务型 Producer 的方法也很简单,满足两个要求即可:

  • 开启 enable.idempotence = true
  • 设置 Producer 端参数 transactional.id,最好为其设置一个有意义的名字。

此外,你还需要在 Producer 代码中做一些调整,如这段代码所示:

producer.initTransactions();
try {
	producer.beginTransaction();
	producer.send(record1);
	producer.send(record2);
	producer.commitTransaction();
} catch (KafkaException e) {
	producer.abortTransaction();
}

和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransactionbeginTransactioncommitTransactionabortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。

实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:

  1. read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  2. read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。