Storm排序怎么實(shí)現(xiàn)

這篇文章主要講解了“Storm排序怎么實(shí)現(xiàn)”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Storm排序怎么實(shí)現(xiàn)”吧!

創(chuàng)新互聯(lián)公司2013年開(kāi)創(chuàng)至今,是專(zhuān)業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站制作、網(wǎng)站設(shè)計(jì)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元潤(rùn)州做網(wǎng)站,已為上家服務(wù),為潤(rùn)州各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:18980820575

閱讀背景:

   1 : 您需要對(duì)滑動(dòng)窗口要初步了解

   2  :   您需要了解滑動(dòng)窗口在滑動(dòng)的過(guò)程之中,滑動(dòng)chunk的計(jì)算過(guò)程,尤其是每發(fā)射一次,就需要清空一次。

package com.cc.storm.bolt;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * 1 在這里我們需要去實(shí)現(xiàn)一個(gè)滑動(dòng)窗口,請(qǐng)注意,在我們實(shí)現(xiàn)滑動(dòng)窗口的過(guò)程之中清空的是當(dāng)前滑動(dòng)窗口的下一個(gè)
 * 
 * 
 * 
 * @author Yin Shuai
 * 
 */
public class RollingCountBolt implements IRichBolt {

	private static final long serialVersionUID = 1765379339552134320L;

	private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>();
	private int _numBuckets;
	private transient Thread cleaner;
	private OutputCollector _collector;

	/**
	 * _trackMinute
	 * 是我們整個(gè)滑動(dòng)窗口的大小,滑動(dòng)窗口的大小,本質(zhì)上決定了我們的時(shí)間區(qū)間,也就是說(shuō),假設(shè)我們目前滑動(dòng)窗口的總體大小為15分鐘。
	 * 那我們的商品點(diǎn)擊的實(shí)時(shí)排序的指標(biāo)值,好比商品瀏覽量的計(jì)算值,也就是15分鐘
	 * 
	 * 而單個(gè)窗口的大小也就是我,我們這個(gè)三十分鐘在隨著時(shí)間不斷的在推移
	 * 
	 * 舉例說(shuō)明:在最初的構(gòu)造過(guò)程之中,如果我們的桶的數(shù)目為10,那么單個(gè)窗口的時(shí)間長(zhǎng)度為3.
	 * 
	 * [0,30],[3,33],[6,36],[9,39],[12,42] 統(tǒng)計(jì)的數(shù)值處在不斷的變化之中
	 * 
	 */
	private int _trackMinutes;

	public RollingCountBolt(int numBuckets, int trackMinutes) {
		this._numBuckets = numBuckets;
		this._trackMinutes = trackMinutes;
	}

	public long totalObjects(Object obj) {
		long[] curr = _objectCounts.get(obj);
		long total = 0;
		for (long l : curr) {
			total += l;
		}
		return total;
	}

	public int currentBucket(int buckets) {
		return currentSecond() / secondsPerBucket(buckets) % buckets;
	}

	public int currentSecond() {
		return (int) (System.currentTimeMillis() / 1000);
	}

	/**
	 * 
	 * @param buckets
	 *            你設(shè)定的桶的數(shù)量
	 * @return 依據(jù)我們默認(rèn)的_trackMinutes / buckets 得到每一個(gè)桶的數(shù)量
	 */
	public int secondsPerBucket(int buckets) {
		return _trackMinutes * 60 / buckets;
	}

	public long millisPerBucket(int buckets) {
		return (long) 1000 * secondsPerBucket(buckets);
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		// TODO Auto-generated method stub
		_collector = collector;
		cleaner = new Thread(new Runnable() {

			@SuppressWarnings("unchecked")
			@Override
			public void run() {
				// TODO Auto-generated method stub
				int lastBucket = currentBucket(_numBuckets);

				while (true) {

					int currBucket = currentBucket(_numBuckets);
					p("線程while循環(huán): 當(dāng)前的桶為:" + currBucket);

					if (currBucket != lastBucket) {
						p("線程while循環(huán):之前的桶數(shù)為:" + lastBucket);

						int bucketToWipe = (currBucket + 1) % _numBuckets;
						p("線程while循環(huán):要擦除掉的桶為:" + bucketToWipe);

						synchronized (_objectCounts) {
							Set objs = new HashSet(_objectCounts.keySet());

							for (Object obj : objs) {

								long[] counts = _objectCounts.get(obj);
								long currBucketVal = counts[bucketToWipe];
								p("線程while循環(huán):擦除掉的值為:" + currBucketVal);
								counts[bucketToWipe] = 0;
								long total = totalObjects(obj);
								if (currBucketVal != 0) {
									p("線程while循環(huán):擦除掉的值為不為0:那就發(fā)射數(shù)據(jù):obj total"
											+ obj + ":" + total);
									_collector.emit(new Values(obj, total));

								}
								if (total == 0) {

									p("線程while循環(huán): 總數(shù)為0以后,將obj對(duì)象刪除");
									_objectCounts.remove(obj);

								}
							}
						}
						lastBucket = currBucket;
					}

					long delta = millisPerBucket(_numBuckets)
							- (System.currentTimeMillis() % millisPerBucket(_numBuckets));
					Utils.sleep(delta);

					p("\n");
				}
			}
		});
		cleaner.start();
	}

	@Override
	public void execute(Tuple input) {

		Object obj1 = input.getValue(0);
		Object obj = input.getValue(1);

		int currentBucket = currentBucket(_numBuckets);

		p("execute方法:當(dāng)前桶:bucket: " + currentBucket);

		synchronized (_objectCounts) {

			long[] curr = _objectCounts.get(obj);

			if (curr == null) {
				curr = new long[_numBuckets];
				_objectCounts.put(obj, curr);
			}

			curr[currentBucket]++;

			System.err
					.print(("execute方法:接受到的merchandiseIDS:" + obj.toString() + ",long數(shù)組:"));

			for (long number : curr) {
				System.err.print(number + ":");
			}

			p("execute方法:發(fā)射的數(shù)據(jù): " + obj + ":" + totalObjects(obj));

			/**
			 * 我們不斷的發(fā)射的也就是我們某一個(gè)商品id,在當(dāng)前滑動(dòng)窗口,也就是我們的時(shí)間周期內(nèi)的指標(biāo)計(jì)算值
			 * 要注意,在排序的過(guò)程之中,我們只針對(duì)key, 也就是我們的商品id,由此發(fā)射給后續(xù)的排序bolt依據(jù)包含了時(shí)間區(qū)間的信息
			 */

			// 每來(lái)一條數(shù)據(jù),就會(huì)發(fā)射一次
			_collector.emit(new Values(obj, totalObjects(obj)));
			_collector.ack(input);
		}

		p("\n");
	}

	@Override
	public void cleanup() {
		// TODO Auto-generated method stub

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		declarer.declare(new Fields("merchandiseID", "count"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void p(Object o) {
		System.err.println(o.toString());
	}
}

  在這里,最需要我們關(guān)注的地方是,滑動(dòng)窗口每滑動(dòng)一次,將情況一組數(shù)據(jù)。 而發(fā)射數(shù)據(jù)的過(guò)程之中將統(tǒng)計(jì)這一組數(shù)

據(jù)。

感謝各位的閱讀,以上就是“Storm排序怎么實(shí)現(xiàn)”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Storm排序怎么實(shí)現(xiàn)這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

網(wǎng)頁(yè)標(biāo)題:Storm排序怎么實(shí)現(xiàn)
本文來(lái)源:http://muchs.cn/article42/gesshc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、定制網(wǎng)站、自適應(yīng)網(wǎng)站、Google、電子商務(wù)響應(yīng)式網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護(hù)公司