KafkaProducer攔截器

Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來對消息進行攔截或者修改,也可以用于Producer的Callback回調(diào)之前進行相應(yīng)的預(yù)處理。

網(wǎng)站的建設(shè)成都創(chuàng)新互聯(lián)專注網(wǎng)站定制,經(jīng)驗豐富,不做模板,主營網(wǎng)站定制開發(fā).小程序定制開發(fā),H5頁面制作!給你煥然一新的設(shè)計體驗!已為混凝土攪拌罐等企業(yè)提供專業(yè)服務(wù)。

使用Kafka Producer端的攔截器非常簡單,主要是實現(xiàn)ProducerInterceptor接口,此接口包含4個方法:

    1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在將消息序列化和分配分區(qū)之前會調(diào)用攔截器的這個方法來對消息進行相應(yīng)的操作。一般來說最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對其有準(zhǔn)確的判斷,否則會與預(yù)想的效果出現(xiàn)偏差。比如修改key不僅會影響分區(qū)的計算,同樣也會影響B(tài)roker端日志壓縮(Log Compaction)的功能。
    1. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應(yīng)答(Acknowledgement)之前或者消息發(fā)送失敗時調(diào)用,優(yōu)先于用戶設(shè)定的Callback之前執(zhí)行。這個方法運行在Producer的IO線程中,所以這個方法里實現(xiàn)的代碼邏輯越簡單越好,否則會影響消息的發(fā)送速率。
    1. void close():關(guān)閉當(dāng)前的攔截器,此方法主要用于執(zhí)行一些資源的清理工作。
    1. configure(Map<String, ?> configs):用來初始化此類的方法,這個是ProducerInterceptor接口的父接口Configurable中的方法。

一般情況下只需要關(guān)注并實現(xiàn)onSend或者onAcknowledgement方法即可。下面我們來舉個案例,通過onSend方法來過濾消息體為空的消息以及通過onAcknowledgement方法來計算發(fā)送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        if(record.value().length()<=0)
            return null;
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        double succe***atio = (double)sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 發(fā)送成功率="+String.format("%f", succe***atio * 100)+"%");
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}

自定義的ProducerInterceptorDemo類實現(xiàn)之后就可以在Kafka Producer的主程序中指定,示例代碼如下:

public class ProducerMain {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        for(int i=0;i<100;i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);
            producer.send(producerRecord).get();
        }
        producer.close();
    }
}

Kafka Producer不僅可以指定一個攔截器,還可以指定多個攔截器以形成攔截鏈,這個攔截鏈會按照其中的攔截器的加入順序一一執(zhí)行。比如上面的程序多添加一個攔截器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1

這樣Kafka Producer會先執(zhí)行攔截器ProducerInterceptorDemo,之后再執(zhí)行ProducerInterceptorDemoPlus。

有關(guān)interceptor.classes參數(shù),在kafka 1.0.0版本中的定義如下:

NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESIMPORTANCE
interceptor.calssses A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. list null low

本文的重點是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹(jǐn)記這一點。同時我經(jīng)過多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個知識點高級進階干貨,希望對想成為架構(gòu)師的朋友有一定的參考和幫助

需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費獲取

Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器

當(dāng)前題目:KafkaProducer攔截器
標(biāo)題鏈接:http://muchs.cn/article0/ihjeoo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、軟件開發(fā)、網(wǎng)站策劃、虛擬主機、網(wǎng)站收錄、電子商務(wù)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設(shè)計公司