本篇內(nèi)容介紹了“Storm MongoDB接口怎么使用”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比廣德網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式廣德網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋廣德地區(qū)。費(fèi)用合理售后完善,10多年實(shí)體公司更值得信賴。
整體的Storn接口分為以下的幾個(gè)class
1:MongoBolt.java
2 : MongoSpout.java
3 : MongoTailableCursorTopology.java
4 : SimpleMongoBolt.java
看代碼說話:
1
package storm.mongo; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import com.mongodb.DB; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoException; import com.mongodb.WriteConcern; /** * * 注意在這里,沒有實(shí)現(xiàn)批處理的調(diào)用,并且只是一個(gè)抽象類,對于Mongo的Storm交互做了一次封裝 * * @author Adrian Petrescu <apetresc@gmail.com> * */ public abstract class MongoBolt extends BaseRichBolt { private OutputCollector collector; // MOngDB的DB對象 private DB mongoDB; //記錄我們的主機(jī),端口,和MongoDB的數(shù)據(jù)DB民粹 private final String mongoHost; private final int mongoPort; private final String mongoDbName; /** * @param mongoHost The host on which Mongo is running. * @param mongoPort The port on which Mongo is running. * @param mongoDbName The Mongo database containing all collections being * written to. */ protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; } @Override public void prepare( @SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; try { //prepare方法目前在初始化的過程之中得到了一個(gè)Mongo的連接 this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName); } catch (Exception e) { throw new RuntimeException(e); } } @Override public void execute(Tuple input) { //注意我們在這里還有一個(gè)判斷,判斷當(dāng)前是否該發(fā)射 if (shouldActOnInput(input)) { String collectionName = getMongoCollectionForInput(input); DBObject dbObject = getDBObjectForInput(input); if (dbObject != null) { try { mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1)); collector.ack(input); } catch (MongoException me) { collector.fail(input); } } } else { collector.ack(input); } } /** * Decide whether or not this input tuple should trigger a Mongo write. * * @param input the input tuple under consideration * @return {@code true} iff this input tuple should trigger a Mongo write */ public abstract boolean shouldActOnInput(Tuple input); /** * Returns the Mongo collection which the input tuple should be written to. * * @param input the input tuple under consideration * @return the Mongo collection which the input tuple should be written to */ public abstract String getMongoCollectionForInput(Tuple input); /** * Returns the DBObject to store in Mongo for the specified input tuple. * 拿到DBObject的一個(gè)抽象類 * @param input the input tuple under consideration * @return the DBObject to be written to Mongo */ public abstract DBObject getDBObjectForInput(Tuple input); //注意這里隨著計(jì)算的終結(jié)被關(guān)閉了。 @Override public void cleanup() { this.mongoDB.getMongo().close(); } }
2 :
package storm.mongo; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.utils.Utils; import com.mongodb.BasicDBObject; import com.mongodb.Bytes; import com.mongodb.DB; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoException; /** * A Spout which consumes documents from a Mongodb tailable cursor. * * Subclasses should simply override two methods: * <ul> * <li>{@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields} * <li>{@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns * a Mongo document into a Storm tuple matching the declared output fields. * </ul> * ** <p> * <b>WARNING:</b> You can only use tailable cursors on capped collections. * * @author Dan Beaulieu <danjacob.beaulieu@gmail.com> * */ // 在這里,抽象的過程中,依舊保持了第一層的Spout為一個(gè)抽象類,MongoSpout為abstract的一個(gè)抽象類,子類在繼承這// 個(gè)類的過程之中實(shí)現(xiàn)特定的方法即可 // 這里還有一個(gè)類似Cursor的操作。 public abstract class MongoSpout extends BaseRichSpout { private SpoutOutputCollector collector; private LinkedBlockingQueue<DBObject> queue; private final AtomicBoolean opened = new AtomicBoolean(false); private DB mongoDB; private final DBObject query; private final String mongoHost; private final int mongoPort; private final String mongoDbName; private final String mongoCollectionName; public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) { this.mongoHost = mongoHost; this.mongoPort = mongoPort; this.mongoDbName = mongoDbName; this.mongoCollectionName = mongoCollectionName; this.query = query; } class TailableCursorThread extends Thread { // 內(nèi)部類 TailableCursorThread線程 //注意在其中我們使用了LinkedBlockingQueue的對象,有關(guān)java高并發(fā)的集合類,請參考本ID的【Java集合類型的博文】博文。 LinkedBlockingQueue<DBObject> queue; String mongoCollectionName; DB mongoDB; DBObject query; public TailableCursorThread(LinkedBlockingQueue<DBObject> queue, DB mongoDB, String mongoCollectionName, DBObject query) { this.queue = queue; this.mongoDB = mongoDB; this.mongoCollectionName = mongoCollectionName; this.query = query; } public void run() { while(opened.get()) { try { // create the cursor mongoDB.requestStart(); final DBCursor cursor = mongoDB.getCollection(mongoCollectionName) .find(query) .sort(new BasicDBObject("$natural", 1)) .addOption(Bytes.QUERYOPTION_TAILABLE) .addOption(Bytes.QUERYOPTION_AWAITDATA); try { while (opened.get() && cursor.hasNext()) { final DBObject doc = cursor.next(); if (doc == null) break; queue.put(doc); } } finally { try { if (cursor != null) cursor.close(); } catch (final Throwable t) { } try { mongoDB.requestDone(); } catch (final Throwable t) { } } Utils.sleep(500); } catch (final MongoException.CursorNotFound cnf) { // rethrow only if something went wrong while we expect the cursor to be open. if (opened.get()) { throw cnf; } } catch (InterruptedException e) { break; } } }; } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.queue = new LinkedBlockingQueue<DBObject>(1000); try { this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName); } catch (Exception e) { throw new RuntimeException(e); } TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query); this.opened.set(true); listener.start(); } @Override public void close() { this.opened.set(false); } @Override public void nextTuple() { DBObject dbo = this.queue.poll(); if(dbo == null) { Utils.sleep(50); } else { this.collector.emit(dbObjectToStormTuple(dbo)); } } @Override public void ack(Object msgId) { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub } public abstract List<Object> dbObjectToStormTuple(DBObject message); }
“Storm MongoDB接口怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
分享題目:StormMongoDB接口怎么使用
網(wǎng)站地址:http://muchs.cn/article26/gphccg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開發(fā)、App設(shè)計(jì)、靜態(tài)網(wǎng)站、關(guān)鍵詞優(yōu)化、服務(wù)器托管、網(wǎng)站策劃
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)