Kafka Notes (2): Using the Kafka Java API

[TOC]



All of the test code below uses the following topic:

$ kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: hadoop   Partition: 0    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102
        Topic: hadoop   Partition: 1    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103
        Topic: hadoop   Partition: 2    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101

Kafka Java API: Producer

For usage notes on the producer API, see the code comments in the org.apache.kafka.clients.producer.KafkaProducer class; they are very detailed. The program code and a test follow directly.

Program code

KafkaProducerOps.java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * KafkaProducerOps produces records into a Kafka topic
 * <p>
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /**
         * Load the configuration file.
         * File format:
         * key=value
         *
         * Avoid hard-coding in the program:
         * don't bake values into the code; keep them configurable
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
        properties.load(in);
        /**
         * Two generic type parameters:
         * the first is the type of a record's key,
         * the second is the type of a record's value
         */
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
        String key = "1";
        String value = "The girls are very beautiful today";
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>(topic, key, value);
        producer.send(producerRecord);
        producer.close();
    }
}
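
Note that send() is asynchronous and the code above discards the Future it returns, so a failed send would go unnoticed. Below is a minimal sketch of the callback variant of the same KafkaProducer API; the error handling shown is just one reasonable choice, not part of the original program:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Replaces the bare producer.send(producerRecord) call above
producer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The send failed; log it (or retry, depending on your requirements)
            exception.printStackTrace();
        } else {
            // metadata reports where the record actually landed
            System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
        }
    }
});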
Constants.java
package com.uplooking.bigdata.kafka.constants;

public interface Constants {
    /**
     * Constant for the key of the producer topic property
     */
    String KAFKA_PRODUCER_TOPIC = "producer.topic";
}
producer.properties
############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

##### our own key naming the target topic
producer.topic=hadoop

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

This configuration file is essentially the producer.properties that ships in Kafka's conf directory, with the modifications shown above. For the meaning of each field, see the code comments of the org.apache.kafka.clients.producer.KafkaProducer class.
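
If you would rather not ship a properties file at all, the same settings can be built in code. A minimal sketch using the constants in org.apache.kafka.clients.producer.ProducerConfig (each constant is just the string key shown in the file above; producer.topic is our own application key, not a Kafka setting):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "uplooking01:9092,uplooking02:9092,uplooking03:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// Our application-level key for the target topic (read via Constants.KAFKA_PRODUCER_TOPIC)
props.put("producer.topic", "hadoop");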

Test

Start a console consumer in a terminal to listen for messages on the topic:

[uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181

Then run the producer program and check the terminal output:

[uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181 
The girls are very beautiful today

Kafka Java API: Consumer

Program code

KafkaConsumerOps.java
package com.uplooking.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class KafkaConsumerOps {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties");
        properties.load(in);
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        Collection<String> topics = Arrays.asList("hadoop");
        // Subscribe the consumer to the topic
        consumer.subscribe(topics);
        while (true) {
            // Poll the topic for new records
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            // Iterate over every record in the batch
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                int partition = consumerRecord.partition();
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
            }

        }
    }
}
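
The consumer above relies on automatic offset commits (enable.auto.commit defaults to true). For at-least-once processing you can commit offsets manually instead; here is a minimal sketch of the same poll loop with a manual commit, assuming enable.auto.commit=false is set in consumer.properties and process() stands in for your own handling logic (hypothetical helper):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical helper: your per-record processing
    }
    // Commit offsets only after the whole batch has been processed
    consumer.commitSync();
}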
consumer.properties
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
# NOTE: the new Java consumer used above ignores the zookeeper.* settings;
# they only matter for legacy consumers such as the old console consumer
zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181

bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Test

Run the consumer code first, then the producer code; the consumer terminal prints output like the following:

2   0   1   The girls are very beautiful today
(columns: offset, partition, key, value)

Kafka Java API: Partitioner

You can control which partition each message is written to by supplying a custom partitioner: simply implement the Partitioner interface in your code.

Program code

MyKafkaPartitioner.java
package com.uplooking.bigdata.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * A custom partitioner that assigns a partition based on the record's key
 * <p>
 * You could hash the key or the value,
 * or spread records across partitions according to your own business rules.
 * The requirement here:
 * take the hashCode of the user-supplied key modulo the number of partitions
 */
public class MyKafkaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {

    }

    /**
     * Choose a partition for the given record
     *
     * @param topic      topic name
     * @param key        the record key
     * @param keyBytes   the serialized key
     * @param value      the record value
     * @param valueBytes the serialized value
     * @param cluster    metadata for the current cluster
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition;
        if (key == null || keyBytes == null) {
            // No key: pick a partition at random
            targetPartition = new Random().nextInt(partitionNums);
        } else {
            int hashCode = key.hashCode();
            // Mask the sign bit so a negative hashCode cannot yield a negative partition
            targetPartition = (hashCode & Integer.MAX_VALUE) % partitionNums;
            System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }

    public void close() {
    }
}
KafkaProducerOps.java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

/**
 * KafkaProducerOps produces records into a Kafka topic
 * <p>
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /**
         * Load the configuration file.
         * File format:
         * key=value
         *
         * Avoid hard-coding in the program:
         * don't bake values into the code; keep them configurable
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
        properties.load(in);
        /**
         * Two generic type parameters:
         * the first is the type of a record's key,
         * the second is the type of a record's value
         */
        String[] girls = new String[]{"姚慧瑩", "劉向前", "周  新", "楊柳"};
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        Random random = new Random();
        int start = 1;
        for (int i = start; i <= start + 9; i++) {
            String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
            String key = i + "";
            String value = "Today, <--" + girls[random.nextInt(girls.length)] + "--> is very beautiful~";
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, key, value);
            producer.send(producerRecord);
        }
        producer.close();
    }
}

Keep using the consumer code from earlier, and register our partitioner in producer.properties as follows:

partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner
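
Equivalently, if you build the producer configuration in code (as in the earlier sketch), the same setting can be added via the ProducerConfig constant, which is just the string "partitioner.class":

// Register the custom partitioner programmatically
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());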

Test

Run the consumer code first, then the producer code, and check the terminal output.

Producer terminal output (mainly the println calls inside our custom partitioner):

key: 1, value: Today, <--劉向前--> is very beautiful~, hashCode: 49, partition: 1
key: 2, value: Today, <--楊柳--> is very beautiful~, hashCode: 50, partition: 2
key: 3, value: Today, <--姚慧瑩--> is very beautiful~, hashCode: 51, partition: 0
key: 4, value: Today, <--周  新--> is very beautiful~, hashCode: 52, partition: 1
key: 5, value: Today, <--劉向前--> is very beautiful~, hashCode: 53, partition: 2
key: 6, value: Today, <--周  新--> is very beautiful~, hashCode: 54, partition: 0
key: 7, value: Today, <--周  新--> is very beautiful~, hashCode: 55, partition: 1
key: 8, value: Today, <--劉向前--> is very beautiful~, hashCode: 56, partition: 2
key: 9, value: Today, <--楊柳--> is very beautiful~, hashCode: 57, partition: 0
key: 10, value: Today, <--姚慧瑩--> is very beautiful~, hashCode: 1567, partition: 1

Consumer terminal output:

3   0   3   Today, <--姚慧瑩--> is very beautiful~
4   0   6   Today, <--周  新--> is very beautiful~
5   0   9   Today, <--楊柳--> is very beautiful~
0   2   2   Today, <--楊柳--> is very beautiful~
1   2   5   Today, <--劉向前--> is very beautiful~
2   2   8   Today, <--劉向前--> is very beautiful~
1   1   1   Today, <--劉向前--> is very beautiful~
2   1   4   Today, <--周  新--> is very beautiful~
3   1   7   Today, <--周  新--> is very beautiful~
4   1   10  Today, <--姚慧瑩--> is very beautiful~
(columns: offset, partition, key, value)
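
The partition column above can be checked by hand: for the single-character keys the hashCode is the character's ASCII code ('1' is 49, '9' is 57; "10" hashes to 1567), and the partitioner takes that modulo 3. A self-contained sketch that recomputes the mapping (the sign-bit mask in MyKafkaPartitioner is a no-op here because all of these hashCodes are positive):

public class PartitionCheck {
    public static void main(String[] args) {
        int partitions = 3; // the hadoop topic has 3 partitions
        for (int i = 1; i <= 10; i++) {
            String key = String.valueOf(i);
            // Same arithmetic as MyKafkaPartitioner.partition()
            int partition = (key.hashCode() & Integer.MAX_VALUE) % partitions;
            System.out.println("key " + key + " -> hashCode " + key.hashCode() + " -> partition " + partition);
        }
    }
}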
