如何解析Kafka 1.0.0 多消費(fèi)者示例,相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專(zhuān)注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、小程序定制開(kāi)發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶(hù)創(chuàng)新互聯(lián)還提供了西寧免費(fèi)建站歡迎大家使用!
package kafka.demo; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; /** * * <p>Description: kafka 1.0.0</p> * @author guangshihao * @date 2018年9月19日 * */ public class KafkaProduderDemo { public static void main(String[] args) { Map<String,Object> props = new HashMap<>(); /* * acks,設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個(gè)值0,1,-1 * 0,意味著producer永遠(yuǎn)不會(huì)等待一個(gè)來(lái)自broker的ack,這就是0.7版本的行為。 * 這個(gè)選項(xiàng)提供了最低的延遲,但是持久化的保證是最弱的,當(dāng)server掛掉的時(shí)候會(huì)丟失一些數(shù)據(jù)。 * 1,意味著在leader replica已經(jīng)接收到數(shù)據(jù)后,producer會(huì)得到一個(gè)ack。 * 這個(gè)選項(xiàng)提供了更好的持久性,因?yàn)樵趕erver確認(rèn)請(qǐng)求成功處理后,client才會(huì)返回。 * 如果剛寫(xiě)到leader上,還沒(méi)來(lái)得及復(fù)制leader就掛了,那么消息才可能會(huì)丟失。 * -1,意味著在所有的ISR都接收到數(shù)據(jù)后,producer才得到一個(gè)ack。 * 這個(gè)選項(xiàng)提供了最好的持久性,只要還有一個(gè)replica存活,那么數(shù)據(jù)就不會(huì)丟失 */ props.put("acks", "1"); //配置默認(rèn)的分區(qū)方式 props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); //配置topic的序列化類(lèi) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //配置value的序列化類(lèi) props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* * kafka broker對(duì)應(yīng)的主機(jī),格式為host1:port1,host2:port2 */ props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); //topic String topic = "test7"; KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props); for(int i = 1 ;i <= 100 ; i++) { String line = i+" this is a test "; ProducerRecord<String,String> record = new ProducerRecord<String,String>(topic,line ); producer.send(record); } producer.close(); } }
package kafka.demo; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; public class MutilConsumerThread implements Runnable{ private AtomicBoolean closed = new AtomicBoolean(false); KafkaConsumer<String, String> consumer = null; String topic = null; public MutilConsumerThread(KafkaConsumer<String, String> consumer,List<String> topic) { this.consumer=consumer; consumer.subscribe(topic); } public void run() { try{ while(!closed.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for(ConsumerRecord<String, String> record: records) { //一組consumer的時(shí)候每個(gè)partition對(duì)應(yīng)的線程是固定的 System.out.println("Thread-Name:"+Thread.currentThread().getName()+" "+"partition:"+record.partition()+" "+record.value()); } } }catch(WakeupException e ) { if(!closed.get()) throw e; }finally { consumer.close(); } } public void shutdown() { closed.set(true); consumer.wakeup(); } }
package kafka.demo; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.kafka.clients.consumer.KafkaConsumer; public class MutiConsumerTest { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); props.put("group.id", "group_test7"); //配置topic的序列化類(lèi) props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置value的序列化類(lèi) props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //自動(dòng)同步offset props.put("enable.auto.commit","true"); //自動(dòng)同步offset的時(shí)間間隔 props.put("auto.commit.intervals.ms", "2000"); //當(dāng)在zookeeper中發(fā)現(xiàn)要消費(fèi)的topic沒(méi)有或者topic的offset不合法時(shí)自動(dòng)設(shè)置為最小值,可以設(shè)的值為 latest, earliest, none,默認(rèn)為largest props.put("auto.offset.reset", "earliest "); String topic = "test7"; List<MutilConsumerThread> consumers = new ArrayList<>(); ExecutorService es = Executors.newFixedThreadPool(3); for(int i = 0 ;i<=2;i++) { KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); MutilConsumerThread cThread = new MutilConsumerThread(consumer,Arrays.asList(topic)); consumers.add(cThread); es.submit(cThread); } //Thread.sleep(1000L); /* 這個(gè)方法的意思就是在JVM中增加一個(gè)關(guān)閉的鉤子,當(dāng)JVM關(guān)閉的時(shí)候, 會(huì)執(zhí)行系統(tǒng)中已經(jīng)設(shè)置的所有通過(guò)方法addShutdownHook添加的鉤子,當(dāng)系統(tǒng)執(zhí)行完這些鉤子后, JVM才會(huì)關(guān)閉。所以這些鉤子可以在JVM關(guān)閉的時(shí)候進(jìn)行內(nèi)存清理、對(duì)象銷(xiāo)毀等操作。*/ Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for(MutilConsumerThread consumer :consumers ) { consumer.shutdown(); } } }); } }
看完上述內(nèi)容,你們掌握如何解析Kafka 1.0.0 多消費(fèi)者示例的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
分享標(biāo)題:如何解析Kafka1.0.0多消費(fèi)者示例
URL標(biāo)題:http://muchs.cn/article26/ghoicg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App設(shè)計(jì)、商城網(wǎng)站、網(wǎng)站策劃、外貿(mào)網(wǎng)站建設(shè)、ChatGPT、網(wǎng)站導(dǎo)航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)