springbootkafka配置與使用-創(chuàng)新互聯(lián)

springboot kafka配置與使用 引入spring-kafka依賴
org.springframework.kafkaspring-kafka
application配置

可以根據(jù)情況只配置生產(chǎn)著或消費者

成都創(chuàng)新互聯(lián)公司主營淮北網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app軟件定制開發(fā),淮北h5微信小程序搭建,淮北網(wǎng)站營銷推廣歡迎淮北等地區(qū)企業(yè)咨詢
spring:
  kafka:
    # 以逗號分隔的地址列表,用于建立與 Kafka 集群的初始連接 (kafka 默認(rèn)的端口號為 9092)
    bootstrap-servers: ip:port,ip:port,ip:port
    # 生產(chǎn)者配置
    producer:
      # 消息重發(fā)的次數(shù)
      retries: 0
      # 一個批次可以使用的內(nèi)存大小
      batch-size: 16384
      # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小
      buffer-memory: 33444432
      # 鍵的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      acks: all

    # 消費者配置
    consumer:
      max:
        poll:
          records: 1
      broker-id: 0
      # 自動提交的時間間隔 在 spring boot 2.X 版本中這里采用的是值的類型為 Duration 需要符合特定的格式,如 1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
      # latest(默認(rèn)值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)
      # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄
      auto-offset-reset: earliest
      # 是否自動提交偏移量,默認(rèn)值是 true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為 false,然后手動提交偏移量
      enable-auto-commit: false
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      group-id: capb_group,可以消費的時候再指定
    listener:
      # 在偵聽器容器中運行的線程數(shù)。
      concurrency: 3
      # 手工ack,調(diào)用ack后立刻提交offset
      ack-mode: manual_immediate
向topic發(fā)送數(shù)據(jù)
private static final String TOPIC_NAME = "my_topic_name";

@Test
public void testSendKafka() {kafkaTemplate.send(TOPIC_NAME, "{\"json\":\"ok\"}").addCallback(success ->{String topic = success.getRecordMetadata().topic();
        int partition = success.getRecordMetadata().partition();
        long offset = success.getRecordMetadata().offset();
        System.out.println("發(fā)送成功:topic=" + topic + ", partition=" + partition + ", offset=" + offset);
    }, failue ->{System.out.println("發(fā)送消息失敗:" + failue.getMessage());
    });
}
KafkaListener消費topic數(shù)據(jù)

每個groupId都可以完整消費指定topic的所有數(shù)據(jù),要想重新消費所有數(shù)據(jù)可以更換groupid組

@KafkaListener(topics = {"my_topic_name"}, groupId = "my_groupId")
public void onMessage(ConsumerRecordrecord, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info("消費消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    System.out.println(JSON.parseObject(record.value().toString(), KafkaEvent.class));
    // 消費完后手動通知已消費,在上面有配置
    ack.acknowledge();
}

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

分享題目:springbootkafka配置與使用-創(chuàng)新互聯(lián)
文章源于:http://muchs.cn/article8/cddiop.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站企業(yè)建站、企業(yè)網(wǎng)站制作全網(wǎng)營銷推廣、營銷型網(wǎng)站建設(shè)商城網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司