Kafka 拦截器分为生产者拦截器和消费者拦截器。
生产者拦截器:onSend 和 onAcknowledgement 方法。前者生产前调用;两者不同线程,线程安全;后者在关键路径,不能太重;后者早于 callback。
消费者拦截器:onConsume 和 onCommit 方法。前者消费前调用,后者提交后调用。
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景
生产者拦截器
public class KafkaInterceptorMock implements ProducerInterceptor<String, String> {
// 发送前的预处理
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
// 发送完,获取应答后的处理
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
// 关闭时执行
@Override
public void close() {
}
// 创建对象时执行
@Override
public void configure(Map<String, ?> configs) {
}
}
// KafkaProducer configMap
configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());如果 onSend 返回一个 null,会出现什么?
之前版本的逻辑是:Producer 发送的是被 ProducerInterceptor 修改后的消息,返回 null 也是一种修改的行为,所以 kafka 不应该对这种情况特殊对待。不过将 null 发送到服务端没有意义,实际执行会出现 NPE,不过异常最终会被捕获传递给 ProducerInterceptor 的 onAcknowledgement 方法。
不清楚现在版本的情况。
onSend:该方法会在消息发送之前被调用。
onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。
消费者拦截器
指定消费者拦截器也是同样的方法,只是具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口。这里面也有两个核心方法。
onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
使用场景
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。