這篇文章主要講解了“Storm排序怎么實(shí)現(xiàn)”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Storm排序怎么實(shí)現(xiàn)”吧!
創(chuàng)新互聯(lián)公司2013年開(kāi)創(chuàng)至今,是專(zhuān)業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站制作、網(wǎng)站設(shè)計(jì)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元潤(rùn)州做網(wǎng)站,已為上家服務(wù),為潤(rùn)州各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:18980820575
閱讀背景:
1 : 您需要對(duì)滑動(dòng)窗口要初步了解
2 : 您需要了解滑動(dòng)窗口在滑動(dòng)的過(guò)程之中,滑動(dòng)chunk的計(jì)算過(guò)程,尤其是每發(fā)射一次,就需要清空一次。
package com.cc.storm.bolt; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; /** * 1 在這里我們需要去實(shí)現(xiàn)一個(gè)滑動(dòng)窗口,請(qǐng)注意,在我們實(shí)現(xiàn)滑動(dòng)窗口的過(guò)程之中清空的是當(dāng)前滑動(dòng)窗口的下一個(gè) * * * * @author Yin Shuai * */ public class RollingCountBolt implements IRichBolt { private static final long serialVersionUID = 1765379339552134320L; private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>(); private int _numBuckets; private transient Thread cleaner; private OutputCollector _collector; /** * _trackMinute * 是我們整個(gè)滑動(dòng)窗口的大小,滑動(dòng)窗口的大小,本質(zhì)上決定了我們的時(shí)間區(qū)間,也就是說(shuō),假設(shè)我們目前滑動(dòng)窗口的總體大小為15分鐘。 * 那我們的商品點(diǎn)擊的實(shí)時(shí)排序的指標(biāo)值,好比商品瀏覽量的計(jì)算值,也就是15分鐘 * * 而單個(gè)窗口的大小也就是我,我們這個(gè)三十分鐘在隨著時(shí)間不斷的在推移 * * 舉例說(shuō)明:在最初的構(gòu)造過(guò)程之中,如果我們的桶的數(shù)目為10,那么單個(gè)窗口的時(shí)間長(zhǎng)度為3. * * [0,30],[3,33],[6,36],[9,39],[12,42] 統(tǒng)計(jì)的數(shù)值處在不斷的變化之中 * */ private int _trackMinutes; public RollingCountBolt(int numBuckets, int trackMinutes) { this._numBuckets = numBuckets; this._trackMinutes = trackMinutes; } public long totalObjects(Object obj) { long[] curr = _objectCounts.get(obj); long total = 0; for (long l : curr) { total += l; } return total; } public int currentBucket(int buckets) { return currentSecond() / secondsPerBucket(buckets) % buckets; } public int currentSecond() { return (int) (System.currentTimeMillis() / 1000); } /** * * @param buckets * 你設(shè)定的桶的數(shù)量 * @return 依據(jù)我們默認(rèn)的_trackMinutes / buckets 得到每一個(gè)桶的數(shù)量 */ public int secondsPerBucket(int buckets) { return _trackMinutes * 60 / buckets; } public long millisPerBucket(int buckets) { return (long) 1000 * secondsPerBucket(buckets); } @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub _collector = collector; cleaner = new Thread(new Runnable() { @SuppressWarnings("unchecked") @Override public void run() { // TODO Auto-generated method stub int lastBucket = currentBucket(_numBuckets); while (true) { int currBucket = currentBucket(_numBuckets); p("線程while循環(huán): 當(dāng)前的桶為:" + currBucket); if (currBucket != lastBucket) { p("線程while循環(huán):之前的桶數(shù)為:" + lastBucket); int bucketToWipe = (currBucket + 1) % _numBuckets; p("線程while循環(huán):要擦除掉的桶為:" + bucketToWipe); synchronized (_objectCounts) { Set objs = new HashSet(_objectCounts.keySet()); for (Object obj : objs) { long[] counts = _objectCounts.get(obj); long currBucketVal = counts[bucketToWipe]; p("線程while循環(huán):擦除掉的值為:" + currBucketVal); counts[bucketToWipe] = 0; long total = totalObjects(obj); if (currBucketVal != 0) { p("線程while循環(huán):擦除掉的值為不為0:那就發(fā)射數(shù)據(jù):obj total" + obj + ":" + total); _collector.emit(new Values(obj, total)); } if (total == 0) { p("線程while循環(huán): 總數(shù)為0以后,將obj對(duì)象刪除"); _objectCounts.remove(obj); } } } lastBucket = currBucket; } long delta = millisPerBucket(_numBuckets) - (System.currentTimeMillis() % millisPerBucket(_numBuckets)); Utils.sleep(delta); p("\n"); } } }); cleaner.start(); } @Override public void execute(Tuple input) { Object obj1 = input.getValue(0); Object obj = input.getValue(1); int currentBucket = currentBucket(_numBuckets); p("execute方法:當(dāng)前桶:bucket: " + currentBucket); synchronized (_objectCounts) { long[] curr = _objectCounts.get(obj); if (curr == null) { curr = new long[_numBuckets]; _objectCounts.put(obj, curr); } curr[currentBucket]++; System.err .print(("execute方法:接受到的merchandiseIDS:" + obj.toString() + ",long數(shù)組:")); for (long number : curr) { System.err.print(number + ":"); } p("execute方法:發(fā)射的數(shù)據(jù): " + obj + ":" + totalObjects(obj)); /** * 我們不斷的發(fā)射的也就是我們某一個(gè)商品id,在當(dāng)前滑動(dòng)窗口,也就是我們的時(shí)間周期內(nèi)的指標(biāo)計(jì)算值 * 要注意,在排序的過(guò)程之中,我們只針對(duì)key, 也就是我們的商品id,由此發(fā)射給后續(xù)的排序bolt依據(jù)包含了時(shí)間區(qū)間的信息 */ // 每來(lái)一條數(shù)據(jù),就會(huì)發(fā)射一次 _collector.emit(new Values(obj, totalObjects(obj))); _collector.ack(input); } p("\n"); } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("merchandiseID", "count")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void p(Object o) { System.err.println(o.toString()); } }
在這里,最需要我們關(guān)注的地方是,滑動(dòng)窗口每滑動(dòng)一次,將情況一組數(shù)據(jù)。 而發(fā)射數(shù)據(jù)的過(guò)程之中將統(tǒng)計(jì)這一組數(shù)
據(jù)。
感謝各位的閱讀,以上就是“Storm排序怎么實(shí)現(xiàn)”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Storm排序怎么實(shí)現(xiàn)這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
網(wǎng)頁(yè)標(biāo)題:Storm排序怎么實(shí)現(xiàn)
本文來(lái)源:http://muchs.cn/article42/gesshc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、定制網(wǎng)站、自適應(yīng)網(wǎng)站、Google、電子商務(wù)、響應(yīng)式網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)