If you have not read Flume-ng Source Code Analysis: The Startup Process yet, you may want to start there.
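The components are analyzed in the order in which they are started, as described in the previous article: first the Channel, then the Sink, and finally the Source. Before looking at the component source code, let's look at two important interfaces: LifecycleAware and NamedComponent.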
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {
  public void start();
  public void stop();
  public LifecycleState getLifecycleState();
}
It is very simple: just three methods, start(), stop() and getLifecycleState(). Many Flume classes implement this interface, including the PollingPropertiesFileConfigurationProvider mentioned in Flume-ng Source Code Analysis: The Startup Process. Anything that has a lifecycle implements it, and the components are no exception.
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface NamedComponent {
  public void setName(String name);
  public String getName();
}
There is not much to say about this one: it simply sets and returns the component's name.
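To make the two interfaces concrete, here is a minimal sketch of a component that implements both. The interfaces are re-declared locally so the example compiles without Flume on the classpath, and LifecycleState is assumed to have the values IDLE, START, STOP and ERROR. DemoComponent is a hypothetical class, not part of Flume.

```java
// Local stand-ins for Flume's interfaces so the sketch is self-contained.
enum LifecycleState { IDLE, START, STOP, ERROR }

interface LifecycleAware {
  void start();
  void stop();
  LifecycleState getLifecycleState();
}

interface NamedComponent {
  void setName(String name);
  String getName();
}

// Hypothetical component that only tracks its name and lifecycle state.
class DemoComponent implements LifecycleAware, NamedComponent {
  // volatile: the state may be read by a supervising thread.
  private volatile LifecycleState state = LifecycleState.IDLE;
  private String name;

  @Override public void start() { state = LifecycleState.START; }
  @Override public void stop() { state = LifecycleState.STOP; }
  @Override public LifecycleState getLifecycleState() { return state; }
  @Override public void setName(String name) { this.name = name; }
  @Override public String getName() { return name; }
}
```

A real component would also do its actual work in start() (open resources, start threads) and release it in stop(); the sketch only records the state transitions.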
The Channel is one of Flume's three core components, so it is worth looking at how it is built:
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}
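From the interface above we can see that the Channel's main operations are put() and take(), so let's look at a concrete implementation. We will use MemoryChannel as the example, but since MemoryChannel is quite long, we will only excerpt a small part of it: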
public class MemoryChannel extends BasicChannelSemantics {
  private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
  private static final Integer defaultCapacity = Integer.valueOf(100);
  private static final Integer defaultTransCapacity = Integer.valueOf(100);

  public MemoryChannel() {
  }
  ...
}
MemoryChannel extends BasicChannelSemantics, which, as the name suggests, provides the basic channel semantics. Let's continue with its implementation:
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class BasicChannelSemantics extends AbstractChannel {
  private ThreadLocal<BasicTransactionSemantics> currentTransaction
      = new ThreadLocal<BasicTransactionSemantics>();
  private boolean initialized = false;

  protected void initialize() {}

  protected abstract BasicTransactionSemantics createTransaction();

  @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

  @Override
  public Transaction getTransaction() {
    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }
    BasicTransactionSemantics transaction = currentTransaction.get();
    if (transaction == null || transaction.getState().equals(
        BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }
    return transaction;
  }
}
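The easy-to-miss part of getTransaction() is the ThreadLocal: each thread gets its own transaction, and a new one is created only when the thread has none or its previous one is CLOSED. put() and take() look up that same per-thread object, which is why they fail with "No transaction exists for this thread" if getTransaction() was not called first. The sketch below isolates that lookup. SketchTransaction and PerThreadTransactions are hypothetical names, and the initialized flag is declared volatile here (the excerpt above uses a plain boolean) so the double-checked locking is safe under the Java memory model.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical transaction carrying only the state the lookup needs.
class SketchTransaction {
  enum State { NEW, OPEN, COMPLETED, CLOSED }
  State state = State.NEW;
  void close() { state = State.CLOSED; }
}

class PerThreadTransactions {
  private final ThreadLocal<SketchTransaction> current = new ThreadLocal<>();
  private volatile boolean initialized = false;
  final AtomicInteger initCalls = new AtomicInteger();

  // Runs once per channel, lazily, on the first getTransaction() call.
  protected void initialize() { initCalls.incrementAndGet(); }

  public SketchTransaction getTransaction() {
    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }
    SketchTransaction tx = current.get();
    // Reuse this thread's transaction unless it has been closed.
    if (tx == null || tx.state == SketchTransaction.State.CLOSED) {
      tx = new SketchTransaction();
      current.set(tx);
    }
    return tx;
  }
}
```

Binding the transaction to the thread means a source or sink never has to pass the transaction object into put() or take(); the channel finds it on its own.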
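After some searching we finally find put() and take(), but a closer look shows that they only fetch the current thread's BasicTransactionSemantics from the ThreadLocal and call its put() and take(). A little disappointing, so let's keep going and look at BasicTransactionSemantics: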
public abstract class BasicTransactionSemantics implements Transaction {
  private State state;
  private long initialThreadId;

  protected void doBegin() throws InterruptedException {}
  protected abstract void doPut(Event event) throws InterruptedException;
  protected abstract Event doTake() throws InterruptedException;
  protected abstract void doCommit() throws InterruptedException;
  protected abstract void doRollback() throws InterruptedException;
  protected void doClose() {}

  protected BasicTransactionSemantics() {
    state = State.NEW;
    initialThreadId = Thread.currentThread().getId();
  }

  protected void put(Event event) {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "put() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "put() called when transaction is %s!", state);
    Preconditions.checkArgument(event != null,
        "put() called with null event!");
    try {
      doPut(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }

  protected Event take() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "take() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "take() called when transaction is %s!", state);
    try {
      return doTake();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  protected State getState() {
    return state;
  }

  ... // Only put() and take() are discussed here, so methods not needed yet have been removed; interested readers can read them in the source.

  protected static enum State {
    NEW, OPEN, COMPLETED, CLOSED
  }
}
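This is another abstract class, and its put() and take() in turn call the abstract methods doPut() and doTake(). Less patient readers may be ready to give up at this point, but we are only one step away. Since the class is abstract, the Channel must ultimately use a concrete subclass. Going back to MemoryChannel, where we started, to look for clues, we find that it hides an inner class: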
private class MemoryTransaction extends BasicTransactionSemantics {
  private LinkedBlockingDeque<Event> takeList;
  private LinkedBlockingDeque<Event> putList;
  private final ChannelCounter channelCounter;
  private int putByteCounter = 0;
  private int takeByteCounter = 0;

  public MemoryTransaction(int transCapacity, ChannelCounter counter) {
    putList = new LinkedBlockingDeque<Event>(transCapacity);
    takeList = new LinkedBlockingDeque<Event>(transCapacity);
    channelCounter = counter;
  }

  @Override
  protected void doPut(Event event) throws InterruptedException {
    channelCounter.incrementEventPutAttemptCount();
    int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    if (!putList.offer(event)) {
      throw new ChannelException(
          "Put queue for MemoryTransaction of capacity " +
          putList.size() + " full, consider committing more frequently, " +
          "increasing capacity or increasing thread count");
    }
    putByteCounter += eventByteSize;
  }

  @Override
  protected Event doTake() throws InterruptedException {
    channelCounter.incrementEventTakeAttemptCount();
    if (takeList.remainingCapacity() == 0) {
      throw new ChannelException("Take list for MemoryTransaction, capacity " +
          takeList.size() + " full, consider committing more frequently, " +
          "increasing capacity, or increasing thread count");
    }
    if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
      return null;
    }
    Event event;
    synchronized (queueLock) {
      event = queue.poll();
    }
    Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
        "signalling existence of entry");
    takeList.put(event);
    int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    takeByteCounter += eventByteSize;
    return event;
  }

  // ... other methods not needed here are again omitted
}
This class contains the implementations of doPut() and doTake(), which shows that MemoryChannel's put() and take() ultimately call MemoryTransaction's doPut() and doTake().
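The excerpts leave out doCommit() and doRollback(), but the putList/takeList staging only makes sense together with them. Below is a simplified, single-threaded sketch of the idea: put() stages an event in putList, commit() moves staged events into the shared queue, take() moves an event from the queue into takeList, and rollback() returns taken events to the head of the queue. It also mirrors the template-method split of BasicTransactionSemantics, where the base class checks the transaction state and the subclass supplies the do*() methods. Events are plain strings; byte accounting, semaphores, capacity limits and counters are omitted. TxnSketch and MemoryTxnSketch are hypothetical names, and this is not MemoryChannel's actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Template-method base: public operations check the state, subclasses do the work.
abstract class TxnSketch {
  enum State { NEW, OPEN, COMPLETED, CLOSED }
  protected State state = State.NEW;

  protected abstract void doPut(String event);
  protected abstract String doTake();
  protected abstract void doCommit();
  protected abstract void doRollback();

  void begin() { check(State.NEW, "begin"); state = State.OPEN; }
  void put(String e) { check(State.OPEN, "put"); doPut(e); }
  String take() { check(State.OPEN, "take"); return doTake(); }
  void commit() { check(State.OPEN, "commit"); doCommit(); state = State.COMPLETED; }
  void rollback() { check(State.OPEN, "rollback"); doRollback(); state = State.COMPLETED; }

  void close() {
    if (state != State.NEW && state != State.COMPLETED) {
      throw new IllegalStateException("close() called when transaction is " + state);
    }
    state = State.CLOSED;
  }

  private void check(State expected, String op) {
    if (state != expected) {
      throw new IllegalStateException(op + "() called when transaction is " + state);
    }
  }
}

// In-memory staging in the spirit of MemoryTransaction (simplified, one thread).
class MemoryTxnSketch extends TxnSketch {
  private final Deque<String> queue;                 // the channel's shared queue
  private final Deque<String> putList = new ArrayDeque<>();
  private final Deque<String> takeList = new ArrayDeque<>();

  MemoryTxnSketch(Deque<String> queue) { this.queue = queue; }

  @Override protected void doPut(String e) { putList.addLast(e); }

  @Override protected String doTake() {
    String e = queue.pollFirst();                    // null when the channel is empty
    if (e != null) takeList.addLast(e);              // remember it in case of rollback
    return e;
  }

  @Override protected void doCommit() {
    while (!putList.isEmpty()) queue.addLast(putList.removeFirst());
    takeList.clear();                                // taken events are now gone for good
  }

  @Override protected void doRollback() {
    // Return taken events to the head of the queue in their original order.
    while (!takeList.isEmpty()) queue.addFirst(takeList.removeLast());
    putList.clear();                                 // staged puts are discarded
  }
}
```

For example, a sink that takes two events and then rolls back leaves the queue exactly as it was, so a later transaction will deliver the same events again.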
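Some readers may think the analysis ends here, but the interesting part is still to come: on the Channel side there are two more important classes, ChannelProcessor and ChannelSelector. Let's go through them one at a time.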
ChannelProcessor performs the put operation, i.e. it writes events into channels. Each ChannelProcessor instance is paired with a ChannelSelector that decides which channel(s) each event is put into.
public class ChannelProcessor implements Configurable {
  private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class);
  private final ChannelSelector selector;
  private final InterceptorChain interceptorChain;

  public ChannelProcessor(ChannelSelector selector) {
    this.selector = selector;
    this.interceptorChain = new InterceptorChain();
  }

  public void initialize() {
    this.interceptorChain.initialize();
  }

  public void close() {
    this.interceptorChain.close();
  }

  public void configure(Context context) {
    this.configureInterceptors(context);
  }

  private void configureInterceptors(Context context) {
    // configure the interceptors
  }

  public ChannelSelector getSelector() {
    return this.selector;
  }

  public void processEventBatch(List<Event> events) {
    ...
    while(i$.hasNext()) {
      Event optChannel = (Event)i$.next();
      List tx = this.selector.getRequiredChannels(optChannel);
      ... // add the event to the queue of each required channel
      t1 = this.selector.getOptionalChannels(optChannel);
      Object eventQueue;
      ... // add the event to the queue of each optional channel
    }
    ... // dispatch the queued events to their channels
  }

  public void processEvent(Event event) {
    event = this.interceptorChain.intercept(event);
    if(event != null) {
      List requiredChannels = this.selector.getRequiredChannels(event);
      Iterator optionalChannels = requiredChannels.iterator();
      ... // put the event into each required channel
      List optionalChannels1 = this.selector.getOptionalChannels(event);
      Iterator i$1 = optionalChannels1.iterator();
      ... // put the event into each optional channel
    }
  }
}
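To keep the code short I removed a lot and kept only the parts that need explaining. In short, both write methods of ChannelProcessor, processEvent() and processEventBatch(), ask the selector passed to the constructor for the target channels (getRequiredChannels() for required ones, getOptionalChannels() for optional ones) and then put the event into each of those channels.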
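Stripped of the elided parts, processEvent() comes down to one rule: write the event to every required channel and let any failure propagate to the caller (the Source), then write it to every optional channel and only record failures there. The sketch below models just that rule. SimpleChannel and DispatchSketch are hypothetical; in Flume each write also runs inside its own channel transaction (begin, put, commit, or rollback on failure) and the event first passes through the interceptor chain, both of which are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical minimal channel: put() may throw to simulate a full or broken channel.
interface SimpleChannel {
  void put(String event);
}

class DispatchSketch {
  private final Function<String, List<SimpleChannel>> required;  // plays getRequiredChannels()
  private final Function<String, List<SimpleChannel>> optional;  // plays getOptionalChannels()
  final List<String> warnings = new ArrayList<>();

  DispatchSketch(Function<String, List<SimpleChannel>> required,
                 Function<String, List<SimpleChannel>> optional) {
    this.required = required;
    this.optional = optional;
  }

  void processEvent(String event) {
    // Required channels: any failure propagates back to the source.
    for (SimpleChannel ch : required.apply(event)) {
      ch.put(event);
    }
    // Optional channels: failures are recorded and otherwise ignored.
    for (SimpleChannel ch : optional.apply(event)) {
      try {
        ch.put(event);
      } catch (RuntimeException ex) {
        warnings.add("optional channel failed: " + ex.getMessage());
      }
    }
  }
}
```

Because a required-channel failure reaches the Source, the Source can decide to retry; an optional-channel failure never does, which is what makes those channels "optional".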
Now for ChannelSelector itself. ChannelSelector is an interface whose implementations are created through ChannelSelectorFactory. Flume ships two implementations: MultiplexingChannelSelector and ReplicatingChannelSelector.
public interface ChannelSelector extends NamedComponent, Configurable {
  void setChannels(List<Channel> var1);
  List<Channel> getRequiredChannels(Event var1);
  List<Channel> getOptionalChannels(Event var1);
  List<Channel> getAllChannels();
}
Selectors are created by ChannelSelectorFactory.create(), which calls getSelectorForType() to instantiate the subclass named by the type in the configuration file. If no type is configured, a ReplicatingChannelSelector is used.
public class ChannelSelectorFactory {
  private static final Logger LOGGER = LoggerFactory.getLogger(
      ChannelSelectorFactory.class);

  public static ChannelSelector create(List<Channel> channels,
      Map<String, String> config) {
    ...
  }

  public static ChannelSelector create(List<Channel> channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();
    if (conf != null) {
      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);
    return selector;
  }

  private static ChannelSelector getSelectorForType(String type) {
    if (type == null || type.trim().length() == 0) {
      return new ReplicatingChannelSelector();
    }
    String selectorClassName = type;
    ChannelSelectorType selectorType = ChannelSelectorType.OTHER;
    try {
      selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      LOGGER.debug("Selector type {} is a custom type", type);
    }
    if (!selectorType.equals(ChannelSelectorType.OTHER)) {
      selectorClassName = selectorType.getChannelSelectorClassName();
    }
    ChannelSelector selector = null;
    try {
      @SuppressWarnings("unchecked")
      Class<? extends ChannelSelector> selectorClass =
          (Class<? extends ChannelSelector>) Class.forName(selectorClassName);
      selector = selectorClass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to load selector type: " + type
          + ", class: " + selectorClassName, ex);
    }
    return selector;
  }
}
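getSelectorForType() follows a common resolution order: an empty type falls back to the default, a known alias is mapped through an enum, and anything else is treated as a fully qualified class name and instantiated by reflection. Here is a compact sketch of the same order. SelectorSketch, SelectorKind and the two selector classes are hypothetical stand-ins, and the enum maps aliases to constructors rather than to class-name strings so the example stays self-contained.

```java
import java.util.Locale;
import java.util.function.Supplier;

interface SelectorSketch {}
class ReplicatingSelectorSketch implements SelectorSketch {}
class MultiplexingSelectorSketch implements SelectorSketch {}

// Built-in aliases, playing the role of ChannelSelectorType.
enum SelectorKind {
  REPLICATING(ReplicatingSelectorSketch::new),
  MULTIPLEXING(MultiplexingSelectorSketch::new);

  final Supplier<SelectorSketch> factory;
  SelectorKind(Supplier<SelectorSketch> factory) { this.factory = factory; }
}

class SelectorFactorySketch {
  static SelectorSketch forType(String type) {
    // 1. Missing type: use the default (replicating).
    if (type == null || type.trim().isEmpty()) {
      return new ReplicatingSelectorSketch();
    }
    // 2. Built-in alias, matched case-insensitively.
    try {
      return SelectorKind.valueOf(type.toUpperCase(Locale.ENGLISH)).factory.get();
    } catch (IllegalArgumentException notAnAlias) {
      // Not an alias: fall through and treat it as a class name.
    }
    // 3. Custom selector: load the class reflectively.
    try {
      Class<?> cls = Class.forName(type);
      return (SelectorSketch) cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException ex) {
      throw new IllegalArgumentException("Unable to load selector type: " + type, ex);
    }
  }
}
```

With this design a custom selector needs only a public no-argument constructor and its class name in the type setting; no registration step is required.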
A few words on each of the two selectors:
1) MultiplexingChannelSelector
Here is an example channel selector configuration:
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
MultiplexingChannelSelector defines three fields that hold the different kinds of channels:
private Map<String, List<Channel>> channelMapping;
private Map<String, List<Channel>> optionalChannels;
private List<Channel> defaultChannels;
The routing rules are as follows:
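If the header value has a mapping, the event is always sent to the mapped channels; if optional channels are also configured for that value, it is sent to them as well.
If the value has no mapping but a default is configured, the event is sent to the default channels, and also to the optional channels if any are configured for that value.
If neither a mapping nor a default applies but optional channels are configured, the event is sent only to the optional channels. Note that writes to optional channels are not retried: a failure there is logged and ignored rather than reported back to the source.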
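The three rules reduce to two lookups keyed by the value of the configured header: the required channels come from channelMapping, falling back to defaultChannels when the value is missing or has no mapping, and the optional channels come from optionalChannels, or an empty list. The sketch below expresses that with channels represented by their names and events by their header maps. MultiplexingRules and its constructor are illustrative assumptions, not Flume's API, and the configure() step that parses the properties is omitted.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of multiplexing routing; channels are plain names.
class MultiplexingRules {
  private final String headerName;                          // selector.header
  private final Map<String, List<String>> channelMapping;   // selector.mapping.*
  private final Map<String, List<String>> optionalChannels; // selector.optional.*
  private final List<String> defaultChannels;               // selector.default

  MultiplexingRules(String headerName,
                    Map<String, List<String>> channelMapping,
                    Map<String, List<String>> optionalChannels,
                    List<String> defaultChannels) {
    this.headerName = headerName;
    this.channelMapping = channelMapping;
    this.optionalChannels = optionalChannels;
    this.defaultChannels = defaultChannels;
  }

  List<String> getRequiredChannels(Map<String, String> headers) {
    String value = headers.get(headerName);
    if (value == null || value.trim().isEmpty()) {
      return defaultChannels;                // header absent: use the default
    }
    // A value without a mapping also falls back to the default channels.
    return channelMapping.getOrDefault(value, defaultChannels);
  }

  List<String> getOptionalChannels(Map<String, String> headers) {
    String value = headers.get(headerName);
    if (value == null) {
      return Collections.emptyList();
    }
    return optionalChannels.getOrDefault(value, Collections.emptyList());
  }
}
```

With the configuration above, an event whose State header is NY goes to mem-channel-1 and file-channel-2, while an event with State TX has no mapping and falls back to the default, mem-channel-1.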
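2) ReplicatingChannelSelector
The rule here is simpler:
With replicating, if no optional channel is specified, every event is sent to all channels. If a channel is listed as optional, it is removed from the required channels and receives events only as an optional channel.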
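The replicating rule can be sketched the same way: start with every channel bound to the source as required, then move each channel named in the optional setting from the required list to the optional list. The resulting lists do not depend on the event at all. ReplicatingRules is a hypothetical class, channels are represented by name, and the optional setting is assumed to be a space-separated list of channel names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of replicating routing; channels are plain names.
class ReplicatingRules {
  private final List<String> required;
  private final List<String> optional = new ArrayList<>();

  // allChannels: every channel bound to the source.
  // optionalSpec: space-separated optional channel names (may be null).
  ReplicatingRules(List<String> allChannels, String optionalSpec) {
    required = new ArrayList<>(allChannels);
    if (optionalSpec != null && !optionalSpec.trim().isEmpty()) {
      for (String name : optionalSpec.trim().split("\\s+")) {
        // Only channels actually bound to this source can become optional.
        if (required.remove(name)) {
          optional.add(name);
        }
      }
    }
  }

  // Every event gets the same lists; the event itself is never inspected.
  List<String> getRequiredChannels() { return Collections.unmodifiableList(required); }
  List<String> getOptionalChannels() { return Collections.unmodifiableList(optional); }
}
```

For a source bound to c1, c2 and c3 with c3 listed as optional, every event is written to c1 and c2 as required channels and to c3 as an optional one.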
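As the component that connects the two ends of the pipeline, the Channel carries data from the Source through to the Sink. ChannelProcessor puts each event into the channels it is assigned to, and the assignment rule is decided by the selector; Flume provides two selectors, multiplexing and replicating. ChannelProcessor is therefore normally called from a Source, while the Channel's take() is called from a Sink.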
Title: Flume-ng Source Code Analysis: The Channel Component
Article source: http://muchs.cn/article34/pjjcpe.html