【Flume】HDFSSink源碼理解-創(chuàng)新互聯(lián)

HDFSSink組件中,主要由HDFSEventSink,BucketWriter,HDFSWriter幾個(gè)類(lèi)構(gòu)成。

創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:網(wǎng)站建設(shè)、成都網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的都昌網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

其中HDFSEventSink主要功能呢是判定Sink的配置條件是否合法,并負(fù)責(zé)從Channel中獲取events,通過(guò)解析event的header信息決定event對(duì)應(yīng)的BucketWriter。

BucketWriter負(fù)責(zé)按照rollCount,rollSize等條件在HDFS端生成(roll)文件,通過(guò)配置文件配置的文件數(shù)據(jù)格式以及序列化的方式,在每個(gè)BucetWriter同一處理。

HDFSWriter作為接口,其具體實(shí)現(xiàn)有HDFSSequenceFile,HDFSDataStream,HDFSCompressedDataStream這三種

HDFSSink功能中關(guān)鍵類(lèi)類(lèi)圖

【Flume】HDFSSink源碼理解

HDFSEventSink類(lèi)

走通HDFSEventSink之前,肯定要對(duì)其中配置參數(shù)有了解(Flume-HDFSSink配置參數(shù)說(shuō)明)

1、configure()方法中,從配置文件中獲取filePath,fileName等信息,具體參數(shù)含義可以參考(Flume-HDFSSink配置參數(shù)說(shuō)明)

2、start()方法,初始化固定大小線程池callTimeoutPool, 周期執(zhí)行線程池timedRollerPool,以及sfWriters,并啟動(dòng)sinkCounter

  1. callTimeoutPool

  2. timedRollerPool,周期執(zhí)行線程池中主要有HDFS文件重命名的線程(根據(jù)retryInterval),達(dá)到生成文件要求進(jìn)行roll操作的線程(根據(jù)idleTimeout),關(guān)閉閑置文件的線程等(rollInterval)

  3. sfWriters  sfWriters實(shí)際是一個(gè)LinkedHashMap的實(shí)現(xiàn)類(lèi),通過(guò)重寫(xiě)removeEldestEntry方法,將最久未使用的writer移除,保證sfWriters中能夠維護(hù)一個(gè)固定大?。╩axOpenFiles)的大打開(kāi)文件數(shù)

  4. sinkCounter sink組件監(jiān)控指標(biāo)的計(jì)數(shù)器


3、process() 方法是HDFSEventSink中最主要的邏輯(部分關(guān)鍵節(jié)點(diǎn)通過(guò)注釋寫(xiě)到代碼中),

  1. process()方法中獲取到Channel,

  2. 并按照batchSize大小循環(huán)從Channel中獲取event,通過(guò)解析event得到event的header等信息,確定該event的HDFS目的路徑以及目的文件名

  3. 每個(gè)event可能對(duì)應(yīng)不同的bucketWriter和hdfswriter,將每個(gè)event添加到相應(yīng)的writer中

  4. 當(dāng)event個(gè)數(shù)達(dá)到batchSize個(gè)數(shù)后,writer進(jìn)行flush,同時(shí)提交事務(wù)

其中bucketWriter負(fù)責(zé)生成(roll)文件的方式,處理文件格式以及序列化等邏輯

其中hdfsWriter具體實(shí)現(xiàn)有"SequenceFile","DataStream","CompressedStream";三種,用戶根據(jù)hdfs.fileType參數(shù)確定具體hdfsWriter的實(shí)現(xiàn)

public Status process() throws EventDeliveryException {
Channel channel = getChannel(); //調(diào)用父類(lèi)getChannel方法,建立Channel與Sink之間的連接
Transaction transaction = channel.getTransaction();//每次batch提交都建立在一個(gè)事務(wù)上
transaction.begin();
try {
Set<BucketWriter> writers = new LinkedHashSet<>();
int txnEventCount = 0;
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
Event event = channel.take();//從Channel中取出event
if (event == null) {//沒(méi)有新的event的時(shí)候,則不需要按照batchSize循環(huán)取
break;
}
// reconstruct the path name by substituting place holders
// 在配置文件中會(huì)有“a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S”這樣的%表示的變量
// 解析配置文件中的變量構(gòu)造realPath 和 realName
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String realName = BucketPath.escapeString(fileName, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
BucketWriter bucketWriter;
HDFSWriter hdfsWriter = null;
WriterCallback closeCallback = new WriterCallback() {
@Override
public void run(String bucketPath) {
LOG.info("Writer callback called.");
synchronized (sfWritersLock) {
sfWriters.remove(bucketPath);//sfWriters以LRU方式維護(hù)了一個(gè)maxOpenFiles大小的map.始終保持最多打開(kāi)文件個(gè)數(shù)
}
}
};
synchronized (sfWritersLock) {
bucketWriter = sfWriters.get(lookupPath);
// we haven't seen this file yet, so open it and cache the handle
if (bucketWriter == null) {
hdfsWriter = writerFactory.getWriter(fileType);//通過(guò)工廠獲取文件類(lèi)型,其中包括"SequenceFile","DataStream","CompressedStream";
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
sfWriters.put(lookupPath, bucketWriter);
}
}
// Write the data to HDFS
try {
bucketWriter.append(event);
} catch (BucketClosedException ex) {
LOG.info("Bucket was closed while trying to append, " +
"reinitializing bucket and writing event.");
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
synchronized (sfWritersLock) {
sfWriters.put(lookupPath, bucketWriter);
}
bucketWriter.append(event);
}
// track the buckets getting written in this transaction
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}
}
if (txnEventCount == 0) {
sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
sinkCounter.incrementBatchCompleteCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}
// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
transaction.commit();
if (txnEventCount < 1) {
return Status.BACKOFF;
} else {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return Status.READY;
}
} catch (IOException eIO) {
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;
} catch (Throwable th) {
transaction.rollback();
LOG.error("process failed", th);
if (th instanceof Error) {
throw (Error) th;
} else {
throw new EventDeliveryException(th);
}
} finally {
transaction.close();
}
}

BucketWriter

flush() 方法:

BucketWriter中維護(hù)了一個(gè)batchCounter,在這個(gè)batchCounter大小不為0的時(shí)候會(huì)進(jìn)行doFlush(), doFlush()主要就是對(duì)batch中的event進(jìn)行序列化和輸出流flush操作,最終結(jié)果就是將events寫(xiě)入HDFS中。

如果用戶設(shè)置了idleTimeout參數(shù)不為0,在doFlush()操作之后,會(huì)往定時(shí)執(zhí)行線程池中添加一個(gè)任務(wù),該關(guān)閉當(dāng)前連接HDFS的輸出對(duì)象HDFSWriter,執(zhí)行時(shí)間間隔為idleTimeout,并將這個(gè)延遲調(diào)度的任務(wù)賦值給idleFuture變量。

append()方法:

在介紹flush()方法中,會(huì)介紹一個(gè)idleFuture變量對(duì)應(yīng)的功能,在append()方法執(zhí)行前首先會(huì)檢查idleFuture任務(wù)是否執(zhí)行完畢,如果沒(méi)有執(zhí)行完成會(huì)設(shè)置一個(gè)超時(shí)時(shí)間callTimeout等待該進(jìn)程完成,然后再進(jìn)行append之后的操作。這樣做主要是為了防止關(guān)閉HdfsWriter的過(guò)程中還在往HDFS中append數(shù)據(jù),在append一半時(shí)候,HdfsWriter關(guān)閉了。

之后,在正是append()之前,又要首先檢查當(dāng)前是否存在HDFSWirter可用于append操作,如果沒(méi)有調(diào)用open()方法。

每次將event往hdfs中append的時(shí)候都需要對(duì)rollCount,rollSize兩個(gè)參數(shù)進(jìn)行檢查,在滿足這兩個(gè)參數(shù)條件的情況下,就需要將臨時(shí)文件重命名為(roll)正式的HDFS文件。之后,重新再open一個(gè)hdfswriter,往這個(gè)hdfswriter中append每個(gè)event,當(dāng)event個(gè)數(shù)達(dá)到batchSize時(shí),進(jìn)行flush操作。

public synchronized void append(final Event event) throws IOException, InterruptedException {
checkAndThrowInterruptedException();
// idleFuture是ScheduledFuture實(shí)例,主要功能關(guān)閉當(dāng)前HDFSWriter,在append event之前需要判斷
// idleFuture是否已經(jīng)執(zhí)行完成,否則會(huì)造成在append一半的時(shí)候 hdfswriter被關(guān)閉
if (idleFuture != null) {
idleFuture.cancel(false);
// There is still a small race condition - if the idleFuture is already
// running, interrupting it can cause HDFS close operation to throw -
// so we cannot interrupt it while running. If the future could not be
// cancelled, it is already running - wait for it to finish before
// attempting to write.
if (!idleFuture.isDone()) {
try {
idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
" file close may have failed", ex);
} catch (Exception ex) {
LOG.warn("Error while trying to cancel closing of idle file. ", ex);
}
}
idleFuture = null;
}
// If the bucket writer was closed due to roll timeout or idle timeout,
// force a new bucket writer to be created. Roll count and roll size will
// just reuse this one
if (!isOpen) {
if (closed) {
throw new BucketClosedException("This bucket writer was closed and " +
"this handle is thus no longer valid");
}
open();
}
// 檢查rollCount,rollSize兩個(gè)roll文件的參數(shù),判斷是否roll出新文件
if (shouldRotate()) {
boolean doRotate = true;
if (isUnderReplicated) {
if (maxConsecUnderReplRotations > 0 &&
consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
doRotate = false;
if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
LOG.error("Hit max consecutive under-replication rotations ({}); " +
"will not continue rolling files under this path due to " +
"under-replication", maxConsecUnderReplRotations);
}
} else {
LOG.warn("Block Under-replication detected. Rotating file.");
}
consecutiveUnderReplRotateCount++;
} else {
consecutiveUnderReplRotateCount = 0;
}
if (doRotate) {
close();
open();
}
}
// write the event
try {
sinkCounter.incrementEventDrainAttemptCount();// sinkCounter統(tǒng)計(jì)metrix
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
writer.append(event); //writer是通過(guò)配置參數(shù)hdfs.fileType創(chuàng)建的HDFSWriter實(shí)現(xiàn)
return null;
}
});
} catch (IOException e) {
LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
bucketPath + ") and rethrowing exception.",
e.getMessage());
try {
close(true);
} catch (IOException e2) {
LOG.warn("Caught IOException while closing file (" +
bucketPath + "). Exception follows.", e2);
}
throw e;
}
// update statistics
processSize += event.getBody().length;
eventCounter++;
batchCounter++;
if (batchCounter == batchSize) {
flush();
}
}

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專(zhuān)為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

新聞名稱(chēng):【Flume】HDFSSink源碼理解-創(chuàng)新互聯(lián)
當(dāng)前網(wǎng)址:http://muchs.cn/article4/cspsoe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、動(dòng)態(tài)網(wǎng)站商城網(wǎng)站、電子商務(wù)企業(yè)建站、小程序開(kāi)發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁(yè)設(shè)計(jì)公司