Disruptor分析

什么是 Disruptor?

Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現(xiàn),或者事件監(jiān)聽模式的實現(xiàn)

創(chuàng)新互聯(lián)專注于獲嘉企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,購物商城網(wǎng)站建設(shè)。獲嘉網(wǎng)站建設(shè)公司,為獲嘉等地區(qū)提供建站服務(wù)。全流程按需定制網(wǎng)站,專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)

性能遠遠高于傳統(tǒng)的BlockingQueue容器

Disruptor使用觀察者模式,主動將消息發(fā)送給消費者,而不是等消費者從隊列中取,在無鎖的情況下, 實現(xiàn)queue(環(huán)形, RingBuffer)的并發(fā)操作, 性能遠高于BlockingQueue

Disruptor 的設(shè)計思想

環(huán)形數(shù)組結(jié)構(gòu)

為了避免垃圾回收,使用數(shù)組,數(shù)組對處理器的緩存機制更加友好

數(shù)組長度為 2^n,通過位運算,加快定位速度,下標采用遞增的方式,不用擔心索引溢出

無鎖設(shè)計

每個生產(chǎn)者或者消費者線程,會先申請可以操作的元素在數(shù)組中的位置,申請到之后,直接在該位置寫入或者讀取數(shù)據(jù)

Disruptor 實現(xiàn)生產(chǎn)消費模型

pom

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.2.1</version>
</dependency>

LongEvent

// 聲明一個Event來包含需要傳遞的數(shù)據(jù)
public class LongEvent {
    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}

LongEventFactory

// Event工廠
public class LongEventFactory implements EventFactory<LongEvent> {

    public LongEvent newInstance() {
        return new LongEvent();
    }
}

LongEventHandler

// 事件消費者
public class LongEventHandler implements EventHandler<LongEvent> {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("消費者:"+event.getValue());
    }
}

LongEventProducer

public class LongEventProducer {
    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 獲取事件隊列下標位置
        long sequence = ringBuffer.next();
        try {
            // 取出空隊列
            LongEvent longEvent = ringBuffer.get(sequence);
            // 賦值
            longEvent.setValue(byteBuffer.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("生產(chǎn)者發(fā)送數(shù)據(jù)。。。");
            // 發(fā)送數(shù)據(jù)
            ringBuffer.publish(sequence);
        }
    }
}

Main

public class Main {
    public static void main(String[] args) {
        // 創(chuàng)建可緩存線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 創(chuàng)建工廠
        EventFactory eventFactory = new LongEventFactory();
        // 創(chuàng)建ringBufferSize
        int ringBufferSize = 1024 * 1024;
        // 創(chuàng)建disruptor
        // MULTI表示可以多個生產(chǎn)者
        Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executorService, ProducerType.MULTI, new YieldingWaitStrategy());
        // 注冊消費者
        longEventDisruptor.handleEventsWith(new LongEventHandler());
        // 啟動
        longEventDisruptor.start();
        // 創(chuàng)建RingBuffer容器
        RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();
        // 創(chuàng)建生產(chǎn)者
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
        // 指定緩沖區(qū)大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 0; i < 100; i++) {
            byteBuffer.putLong(0, i);
            longEventProducer.onData(byteBuffer);
        }
        executorService.shutdown();
        longEventDisruptor.shutdown();
    }
}

什么是 RingBuffer

它是一個環(huán)(首尾相接的環(huán)),作用是存儲數(shù)據(jù),實現(xiàn)不同線程之間的數(shù)據(jù)傳輸

Disruptor 分析

如果圓環(huán)滿了,它會將金數(shù)據(jù)覆蓋,如上圖:現(xiàn)在12的區(qū)域的下個區(qū)域目前是3,如果有新的數(shù)據(jù)到來,那么指針往下移的時候就會把區(qū)域3的數(shù)據(jù)給覆蓋變成13,框架提供了一系列幫助我們平行消費的監(jiān)控,會很好的控制生產(chǎn)者和消費者之間的速度,從而達到生產(chǎn)和消費之間的平衡

RingBuffer 為什么效率高?

采用數(shù)組,數(shù)組支持索引訪問

數(shù)組的內(nèi)存分配是預(yù)先加載的,一但指定大小創(chuàng)建后,就一直存在,這也意味著不需要花大量的時間做垃圾回收,而阻塞隊列采用鏈表實現(xiàn),需要不斷的刪除、創(chuàng)建節(jié)點

Disruptor的核心概念

  • RingBuffer:Disruptor 底層數(shù)據(jù)結(jié)構(gòu)實現(xiàn),核心類,是線程間交換數(shù)據(jù)的中轉(zhuǎn)地
  • Sequence:序號,聲明一個序號,用于跟蹤 ringbuffer 中任務(wù)的變化和消費者的消費情況

  • Sequencer:生產(chǎn)者與緩存 RingBuffer 之間的橋梁,單生產(chǎn)者與多生產(chǎn)者分別對應(yīng)于兩個實現(xiàn)SingleProducerSequencer 與 MultiProducerSequencer,Sequencer 用于向 RingBuffer 申請空間,使用 publish 方法通過 waitStrategy 通知所有在等待可消費事件的 SequenceBarrie
  • SequenceBarrier:序號柵欄,管理和協(xié)調(diào)生產(chǎn)者的游標序號和各個消費者的序號,確保生產(chǎn)者不會覆蓋消費者未來得及處理的消息,確保存在依賴的消費者之間能夠按照正確的順序處理
  • WaitStrategy:有多種實現(xiàn),用以表示當無可消費事件時,消費者的等待策略

  • Event:消費事件

  • EventProcessor:事件處理器,監(jiān)聽 RingBuffer 的事件,并消費可用事件,從 RingBuffer 讀取的事件會交由實際的生產(chǎn)者實現(xiàn)類來消費,它會一直偵聽下一個可用的序號,直到該序號對應(yīng)的事件已經(jīng)準備好

  • EventHandler:業(yè)務(wù)處理器,是實際消費者的接口,完成具體的業(yè)務(wù)邏輯實現(xiàn),第三方實現(xiàn)該接口,代表著消費者

  • Producer:生產(chǎn)者接口,第三方線程充當該角色,producer 向 RingBuffer 寫入事件

當前標題:Disruptor分析
網(wǎng)站URL:http://muchs.cn/article32/piecsc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、虛擬主機微信小程序、云服務(wù)器、營銷型網(wǎng)站建設(shè)網(wǎng)頁設(shè)計公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

綿陽服務(wù)器托管