消费消息时的基本步骤

配置 Consumer 属性

Java Kafka Consumer 需要配置的属性信息

  • boostrap.servers: 集群地址
  • key.deserializer
  • value.deservializer
  • group.id: 消费组 ID
  • auto.offset.reset
  • group.instance.id: 消费者实例 ID,如果指定,那么在消费者组中使用此 ID 作为 memberId 前缀
  • partition.assignment.strategy: 分区分配策略
  • enable.auto.commit: 默认 true
  • auto.commit.interval.ms: 默认 5000ms
  • fetch.max.bytes: 消费者获取服务器端一批消息最大的字节数。如果服务端一批次的数据大于该值,仍然可以拉去回来这批数据。即不是一个绝对最大值,受到 message.max.bytes (broker config) or max.message.bytes (topic) 影响。默认值 50m。
  • offset.topic.num.partitions: 偏移量消费主题分区数,默认 50.

问题:比如说我的模式是自动提交,自动提交间隔是 20 秒一次,那我消费了 10 个消息,很快一秒内就结束。但是这时候我自动提交时间还没到(那是不是意味着不会提交 offer),然后这时候我又去 poll 获取消息,会不会导致一直获取上一批的消息? 还是说如果 consumer 消费完了,自动提交时间还没到,如果你去 poll,这时候会自动提交,就不会出现重复消费的情况。

作者回复: 不会的。consumer 内部维护了一个指针,能够探测到下一条要消费的数据。
那到底 poll 的时候,获得了哪些数据?broker offset 后的数据?然后 consumer 再进行处理?

消费者事务

无论是自动提交还是手动提交,都可能出现消费完成,但是提交出现故障,从而导致重复消费的情况。

想要完成 kafka 消费者端的事务处理,需要将数据消费过程和偏移量提交过程进行原子性绑定,也就
是说数据处理完了,必须要保证偏移量正确提交,才可以做下一步的操作,如果偏移量提交失败,那么数据就恢复成处理之前的效果。

对于生产者事务而言,消费者消费的数据也会受到限制。默认情况下,消费者只能消费到生产者提交的数据,也就是未提交完成的数据,消费者是看不到的。如果想要消费到未提交的数据,需要更高消费事务隔离级别。默认使用 read_committed,也可以选择 read_uncommitted。

消费数据

05.Kafka Consumer-2.png