KafkaProducer攔截器-創(chuàng)新互聯(lián)

Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個(gè)功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來(lái)對(duì)消息進(jìn)行攔截或者修改,也可以用于Producer的Callback回調(diào)之前進(jìn)行相應(yīng)的預(yù)處理。

成都創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括溫嶺網(wǎng)站建設(shè)、溫嶺網(wǎng)站制作、溫嶺網(wǎng)頁(yè)制作以及溫嶺網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,溫嶺網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到溫嶺省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

使用Kafka Producer端的攔截器非常簡(jiǎn)單,主要是實(shí)現(xiàn)ProducerInterceptor接口,此接口包含4個(gè)方法:

    1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在將消息序列化和分配分區(qū)之前會(huì)調(diào)用攔截器的這個(gè)方法來(lái)對(duì)消息進(jìn)行相應(yīng)的操作。一般來(lái)說(shuō)最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對(duì)其有準(zhǔn)確的判斷,否則會(huì)與預(yù)想的效果出現(xiàn)偏差。比如修改key不僅會(huì)影響分區(qū)的計(jì)算,同樣也會(huì)影響B(tài)roker端日志壓縮(Log Compaction)的功能。
    1. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應(yīng)答(Acknowledgement)之前或者消息發(fā)送失敗時(shí)調(diào)用,優(yōu)先于用戶設(shè)定的Callback之前執(zhí)行。這個(gè)方法運(yùn)行在Producer的IO線程中,所以這個(gè)方法里實(shí)現(xiàn)的代碼邏輯越簡(jiǎn)單越好,否則會(huì)影響消息的發(fā)送速率。
    1. void close():關(guān)閉當(dāng)前的攔截器,此方法主要用于執(zhí)行一些資源的清理工作。
    1. configure(Map<String, ?> configs):用來(lái)初始化此類的方法,這個(gè)是ProducerInterceptor接口的父接口Configurable中的方法。

一般情況下只需要關(guān)注并實(shí)現(xiàn)onSend或者onAcknowledgement方法即可。下面我們來(lái)舉個(gè)案例,通過(guò)onSend方法來(lái)過(guò)濾消息體為空的消息以及通過(guò)onAcknowledgement方法來(lái)計(jì)算發(fā)送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        if(record.value().length()<=0)
            return null;
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        double succe***atio = (double)sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 發(fā)送成功率="+String.format("%f", succe***atio * 100)+"%");
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}

自定義的ProducerInterceptorDemo類實(shí)現(xiàn)之后就可以在Kafka Producer的主程序中指定,示例代碼如下:

public class ProducerMain {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        for(int i=0;i<100;i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);
            producer.send(producerRecord).get();
        }
        producer.close();
    }
}

Kafka Producer不僅可以指定一個(gè)攔截器,還可以指定多個(gè)攔截器以形成攔截鏈,這個(gè)攔截鏈會(huì)按照其中的攔截器的加入順序一一執(zhí)行。比如上面的程序多添加一個(gè)攔截器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1

這樣Kafka Producer會(huì)先執(zhí)行攔截器ProducerInterceptorDemo,之后再執(zhí)行ProducerInterceptorDemoPlus。

有關(guān)interceptor.classes參數(shù),在kafka 1.0.0版本中的定義如下:

NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESIMPORTANCE
interceptor.calsssesA list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors.listnulllow

本文的重點(diǎn)是你有沒(méi)有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過(guò)多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助

需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器

創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國(guó)云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開(kāi)啟,新人活動(dòng)云服務(wù)器買多久送多久。

名稱欄目:KafkaProducer攔截器-創(chuàng)新互聯(lián)
本文地址:http://muchs.cn/article4/ceegoe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、網(wǎng)站維護(hù)網(wǎng)站內(nèi)鏈、網(wǎng)站收錄、網(wǎng)站改版、網(wǎng)站設(shè)計(jì)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作