生产消息时的基本步骤

配置 Producer 属性

Java Kafka Producer 需要配置的属性信息

  • boostrap.servers: 集群地址
  • key.serializer
  • value.servializer
  • interceptor.classes: 拦截器
  • batch.size: 数据批次字节大小。此大小会和数据最大估计值进行比较,取大值
  • retries
  • request.timeout.ms
  • linger.ms: 数据批次在缓冲区中停留时间
  • acks: 请求应答类型:all(-1), 0, 1
  • retry.backoff.ms: 两次重试之间的时间间隔
  • buffer.memory: 数据收集器缓冲区内存大小
  • max.in.flight.requests.per.connection: 每个节点连接的最大同时处理请求的数量
  • enable.idempotnece: 幂等性
  • partitioner.ignore.keys: 是否放弃使用数据 key 选择分区
  • partitioner.class: 分区器类名

Python 中对于 Kafka 有两个库可以使用,分别是 kafka-python 和 confluent-kafka-python。

kafka-python

  • 易用性kafka-python  是一个简单易用的库,适合用于快速构建与 Kafka 交互的应用程序。
  • 特性: 它提供了基本的 Kafka 生产者和消费者,适用于一般的 Kafka 使用场景。
  • 社区支持: 由 Python 社区维护,受到广泛关注和使用。

confluent-kafka-python

  • 性能与扩展性confluent-kafka-python  是  librdkafka  的 Python 封装,性能更优,并且提供了更多的高级特性和配置选项,适合于在大规模或对性能有较高要求的生产环境中使用。
  • 特性: 支持更丰富的配置选项、拦截器、事务等 Kafka 的高级特性。
  • 商业支持: 由 Confluent 公司开发和维护,提供商业支持,适用于企业级应用场景。

创建待发送数据

生成 ProducerRecord 对象。

从创建到发送的深层流程

Kafka Producer:拦截器 KV 序列化 计算分区 追加到数据收集器

Record Accumulator: 基于 batch.size 进行数据缓冲,批次和分区是绑定的。若当前批次能容纳数据,那么数据将追加到批次中,如果不能则旧批次关闭接受、等待发送,新的批次放入到当前分区的批次队列 Deque 中。

Sender:线程对象,从 RecordAccumulator 获取数据,向服务节点发送。启动后不断轮询获取已关闭的批次数据,对批次进行整合后再发送到 Broker 节点中。

基本代码

Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
        "test", "key1", "value1"
);
producer.send(record);
producer.close();

发送消息时的更多配置

拦截器

怎么使用 Kafka 拦截器?应用场景?

回调方法

producer.send(record, new Callback() { // TODO 回调对象
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        System.out.println("数据发送成功:" + recordMetadata.timestamp());
    }
});

异步发送

默认的是异步发送。

同步发送

同步发送就是调用 get 方法,因为 send 函数返回的是 Future 对象,通过 get 来进行同步。

producer.send(record, new Callback() { // TODO 回调对象
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        System.out.println("数据发送成功:" + recordMetadata.timestamp());
    }
}).get();