生产消息时的基本步骤
配置 Producer 属性
Java Kafka Producer 需要配置的属性信息
- boostrap.servers: 集群地址
- key.serializer
- value.servializer
- interceptor.classes: 拦截器
- batch.size: 数据批次字节大小。此大小会和数据最大估计值进行比较,取大值
- retries
- request.timeout.ms
- linger.ms: 数据批次在缓冲区中停留时间
- acks: 请求应答类型:all(-1), 0, 1
- retry.backoff.ms: 两次重试之间的时间间隔
- buffer.memory: 数据收集器缓冲区内存大小
- max.in.flight.requests.per.connection: 每个节点连接的最大同时处理请求的数量
- enable.idempotnece: 幂等性
- partitioner.ignore.keys: 是否放弃使用数据 key 选择分区
- partitioner.class: 分区器类名
Python 中对于 Kafka 有两个库可以使用,分别是 kafka-python 和 confluent-kafka-python。
kafka-python
- 易用性:
kafka-python是一个简单易用的库,适合用于快速构建与 Kafka 交互的应用程序。 - 特性: 它提供了基本的 Kafka 生产者和消费者,适用于一般的 Kafka 使用场景。
- 社区支持: 由 Python 社区维护,受到广泛关注和使用。
confluent-kafka-python
- 性能与扩展性:
confluent-kafka-python是librdkafka的 Python 封装,性能更优,并且提供了更多的高级特性和配置选项,适合于在大规模或对性能有较高要求的生产环境中使用。 - 特性: 支持更丰富的配置选项、拦截器、事务等 Kafka 的高级特性。
- 商业支持: 由 Confluent 公司开发和维护,提供商业支持,适用于企业级应用场景。
创建待发送数据
生成 ProducerRecord 对象。
从创建到发送的深层流程
Kafka Producer:拦截器 → KV 序列化 → 计算分区 → 追加到数据收集器
Record Accumulator: 基于 batch.size 进行数据缓冲,批次和分区是绑定的。若当前批次能容纳数据,那么数据将追加到批次中,如果不能则旧批次关闭接受、等待发送,新的批次放入到当前分区的批次队列 Deque 中。
Sender:线程对象,从 RecordAccumulator 获取数据,向服务节点发送。启动后不断轮询获取已关闭的批次数据,对批次进行整合后再发送到 Broker 节点中。
基本代码
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configMap.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test", "key1", "value1"
);
producer.send(record);
producer.close();发送消息时的更多配置
拦截器
回调方法
producer.send(record, new Callback() { // TODO 回调对象
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("数据发送成功:" + recordMetadata.timestamp());
}
});异步发送
默认的是异步发送。
同步发送
同步发送就是调用 get 方法,因为 send 函数返回的是 Future 对象,通过 get 来进行同步。
producer.send(record, new Callback() { // TODO 回调对象
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("数据发送成功:" + recordMetadata.timestamp());
}
}).get();