java多路復(fù)用器代碼 http多路復(fù)用原理

阻塞、非阻塞、多路復(fù)用、同步、異步、BIO、NIO、AIO 一文搞定

關(guān)于IO會涉及到阻塞、非阻塞、多路復(fù)用、同步、異步、BIO、NIO、AIO等幾個知識點。知識點雖然不難但平常經(jīng)常容易搞混,特此Mark下,與君共勉。

網(wǎng)站制作、成都網(wǎng)站建設(shè),成都做網(wǎng)站公司-創(chuàng)新互聯(lián)建站已向上1000家企業(yè)提供了,網(wǎng)站設(shè)計,網(wǎng)站制作,網(wǎng)絡(luò)營銷等服務(wù)!設(shè)計與技術(shù)結(jié)合,多年網(wǎng)站推廣經(jīng)驗,合理的價格為您打造企業(yè)品質(zhì)網(wǎng)站。

阻塞IO情況下,當用戶調(diào)用 read 后,用戶線程會被阻塞,等內(nèi)核數(shù)據(jù)準備好并且數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶態(tài)緩存區(qū)后 read 才會返回??梢钥吹绞亲枞膬蓚€部分。

非阻塞IO發(fā)出read請求后發(fā)現(xiàn)數(shù)據(jù)沒準備好,會繼續(xù)往下執(zhí)行,此時應(yīng)用程序會不斷輪詢polling內(nèi)核詢問數(shù)據(jù)是否準備好,當數(shù)據(jù)沒有準備好時,內(nèi)核立即返回EWOULDBLOCK錯誤。直到數(shù)據(jù)被拷貝到應(yīng)用程序緩沖區(qū),read請求才獲取到結(jié)果。并且你要注意!這里最后一次 read 調(diào)用獲取數(shù)據(jù)的過程,是一個同步的過程,是需要等待的過程。這里的同步指的是 內(nèi)核態(tài)的數(shù)據(jù)拷貝到用戶程序的緩存區(qū)這個過程 。

非阻塞情況下無可用數(shù)據(jù)時,應(yīng)用程序每次輪詢內(nèi)核看數(shù)據(jù)是否準備好了也耗費CPU,能否不讓它輪詢,當內(nèi)核緩沖區(qū)數(shù)據(jù)準備好了,以事件通知當機制告知應(yīng)用進程數(shù)據(jù)準備好了呢?應(yīng)用進程在沒有收到數(shù)據(jù)準備好的事件通知信號時可以忙寫其他的工作。此時 IO多路復(fù)用 就派上用場了。

IO多路復(fù)用中文比較讓人頭大,IO多路復(fù)用的原文叫 I/O multiplexing,這里的 multiplexing 指的其實是在單個線程通過記錄跟蹤每一個Sock(I/O流)的狀態(tài)來同時管理多個I/O流. 發(fā)明它的目的是盡量多的提高服務(wù)器的吞吐能力。實現(xiàn)一個線程監(jiān)控多個IO請求,哪個IO有請求就把數(shù)據(jù)從內(nèi)核拷貝到進程緩沖區(qū),拷貝期間是阻塞的!現(xiàn)在已經(jīng)可以通過采用mmap地址映射的方法,達到內(nèi)存共享效果,避免真復(fù)制,提高效率。

像 select、poll、epoll 都是I/O多路復(fù)用的具體的實現(xiàn)。

select是第一版IO復(fù)用,提出后暴漏了很多問題。

poll 修復(fù)了 select 的很多問題。

但是poll仍然不是線程安全的, 這就意味著不管服務(wù)器有多強悍,你也只能在一個線程里面處理一組 I/O 流。你當然可以拿多進程來配合了,不過然后你就有了多進程的各種問題。

epoll 可以說是 I/O 多路復(fù)用最新的一個實現(xiàn),epoll 修復(fù)了poll 和select絕大部分問題, 比如:

橫軸 Dead connections 是鏈接數(shù)的意思,叫這個名字只是它的測試工具叫deadcon。縱軸是每秒處理請求的數(shù)量,可看到epoll每秒處理請求的數(shù)量基本不會隨著鏈接變多而下降的。poll 和/dev/poll 就很慘了。但 epoll 有個致命的缺點是只有 linux 支持。

比如平常Nginx為何可以支持4W的QPS是因為它會使用目標平臺上面最高效的I/O多路復(fù)用模型。

然后你會發(fā)現(xiàn)上面的提到過的操作都不是真正的異步,因為兩個階段總要等待會兒!而真正的異步 I/O 是內(nèi)核數(shù)據(jù)準備好和數(shù)據(jù)從內(nèi)核態(tài)拷貝到用戶態(tài)這兩個過程都不用等待。

很慶幸,Linux給我們準備了 aio_read 跟 aio_write 函數(shù)實現(xiàn)真實的異步,當用戶發(fā)起aio_read請求后就會自動返回。內(nèi)核會自動將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶進程空間,應(yīng)用進程啥都不用管。

我強力推薦C++后端開發(fā)免費學(xué)習(xí)地址:C/C++Linux服務(wù)器開發(fā)/后臺架構(gòu)師【零聲教育】-學(xué)習(xí)視頻教程-騰訊課堂

同步跟異步的區(qū)別在于 數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間是否由用戶線程完成 ,這里又分為同步阻塞跟同步非阻塞兩種。

我們以同步非阻塞為例,如下可看到,在將數(shù)據(jù)從內(nèi)核拷貝到用戶空間這一過程,是由用戶線程阻塞完成的。

可發(fā)現(xiàn),用戶在調(diào)用之后會立即返回,由內(nèi)核完成數(shù)據(jù)的拷貝工作,并通知用戶線程,進行回調(diào)。

在Java中,我們使用socket進行網(wǎng)絡(luò)通信,IO主要有三種模式,主要看 內(nèi)核支持 哪些。

同步阻塞IO ,每個客戶端的Socket連接請求,服務(wù)端都會對應(yīng)有個處理線程與之對應(yīng),對于沒有分配到處理線程的連接就會被阻塞或者拒絕。相當于是 一個連接一個線程 。

BIO特點 :

常量:

主類:

服務(wù)端監(jiān)聽線程:

服務(wù)端處理線程:

客戶端:

同步非阻塞IO之NIO :服務(wù)器端保存一個Socket連接列表,然后對這個列表進行輪詢,如果發(fā)現(xiàn)某個Socket端口上有數(shù)據(jù)可讀時說明讀就緒,則調(diào)用該socket連接的相應(yīng)讀操作。如果發(fā)現(xiàn)某個 Socket端口上有數(shù)據(jù)可寫時說明寫就緒,則調(diào)用該socket連接的相應(yīng)寫操作。如果某個端口的Socket連接已經(jīng)中斷,則調(diào)用相應(yīng)的析構(gòu)方法關(guān)閉該端口。這樣能充分利用服務(wù)器資源,效率得到了很大提高,在進行IO操作請求時候再用個線程去處理,是 一個請求一個線程 。Java中使用Selector、Channel、Buffer來實現(xiàn)上述效果。

每個線程中包含一個 Selector 對象,它相當于一個通道管理器,可以實現(xiàn)在一個線程中處理多個通道的目的,減少線程的創(chuàng)建數(shù)量。遠程連接對應(yīng)一個channel,數(shù)據(jù)的讀寫通過buffer均在同一個 channel 中完成,并且數(shù)據(jù)的讀寫是非阻塞的。通道創(chuàng)建后需要注冊在 selector 中,同時需要為該通道注冊感興趣事件(客戶端連接服務(wù)端事件、服務(wù)端接收客戶端連接事件、讀事件、寫事件), selector 線程需要采用 輪訓(xùn) 的方式調(diào)用 selector 的 select 函數(shù),直到所有注冊通道中有興趣的事件發(fā)生,則返回,否則一直阻塞。而后循環(huán)處理所有就緒的感興趣事件。以上步驟解決BIO的兩個瓶頸:

下面對以下三個概念做一個簡單介紹,Java NIO由以下三個核心部分組成:

channel和buffer有好幾種類型。下面是Java NIO中的一些主要channel的實現(xiàn):

正如你所看到的,這些通道涵蓋了UDP和TCP網(wǎng)絡(luò)IO,以及文件IO。以下是Java NIO里關(guān)鍵的buffer實現(xiàn):

在微服務(wù)階段,一個請求可能涉及到多個不同服務(wù)之間的跨服務(wù)器調(diào)用,如果你想實現(xiàn)高性能的PRC框架來進行數(shù)據(jù)傳輸,那就可以基于Java NIO做個支持長連接、自定義協(xié)議、高并發(fā)的框架,比如Netty。Netty本身就是一個基于NIO的網(wǎng)絡(luò)框架, 封裝了Java NIO那些復(fù)雜的底層細節(jié),給你提供簡單好用的抽象概念來編程。比如Dubbo底層就是用的Netty。

AIO是異步非阻塞IO,相比NIO更進一步,進程讀取數(shù)據(jù)時只負責(zé)發(fā)送跟接收指令,數(shù)據(jù)的準備工作完全由操作系統(tǒng)來處理。

推薦一個零聲教育C/C++后臺開發(fā)的免費公開課程,個人覺得老師講得不錯,分享給大家:C/C++后臺開發(fā)高級架構(gòu)師,內(nèi)容包括Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協(xié)程,DPDK等技術(shù)內(nèi)容,C/C++Linux服務(wù)器開發(fā)/后臺架構(gòu)師【零聲教育】-學(xué)習(xí)視頻教程-騰訊課堂 立即學(xué)習(xí)

原文:阻塞、非阻塞、多路復(fù)用、同步、異步、BIO、NIO、AIO 一鍋端

java nio多路復(fù)用是什么意思

就是NIO庫可以利用Selector多路復(fù)用各個Socket連接。

提高連接效率,降低連接的阻塞。

java網(wǎng)絡(luò)io模型有幾種

#BIO---Blocking IO

- 每個socket一個線程,讀寫時線程處于阻塞狀態(tài)。

優(yōu)點:實現(xiàn)簡單

缺點:無法滿足高并發(fā),高接入的需求

- 不使用線程池的BIO模型,除了無法滿足高并發(fā)需求外,由于需要為每個請求創(chuàng)建一個線程,還可能因為接入大量不活躍連接而耗盡服務(wù)器資源。

- 使用線程池的BIO模型,雖然控制了線程數(shù)量,但由于其本質(zhì)上讀寫仍是阻塞的,仍無法滿足高并發(fā)需求。

#NIO---Non-Blocking IO(非阻塞IO)

##非阻塞IO和多路復(fù)用

非阻塞IO和多路復(fù)用實際上是兩個不用的概念,由于兩者通常結(jié)合在一起使用,因此兩者往往被混為一談。下面我將試著分清這兩個概念:

###非阻塞IO

與BIO相對應(yīng),非阻塞IO的讀寫方法無論是否有數(shù)據(jù)都立即返回,因此可以通過輪詢方式來實現(xiàn),但輪詢方式的效率并不比BIO有顯著提高,因為每個連接仍然需要占用一個線程。下面是輪詢方式實現(xiàn)的IO模式圖:

###多路復(fù)用

- 多路復(fù)用結(jié)合非阻塞IO能夠明顯提高IO的效率,這也是Java1.4把非阻塞IO和多路復(fù)用同時發(fā)布的原因。

- 多路復(fù)用的核心是多路復(fù)用器(Selector),它是需要操作系統(tǒng)底層支持的,簡單的說,就是進程把多個socket和它們關(guān)心的事件(比如連接請求或數(shù)據(jù)已準備好)都注冊在多路復(fù)用器上,操作系統(tǒng)會在事件發(fā)生時通知多路復(fù)用器,這樣進程就可以通過多路復(fù)用器知道在那個socket上發(fā)生了什么時間,從而進行對應(yīng)的處理。

- 多路復(fù)用的優(yōu)點在于只需要一個線程監(jiān)測(阻塞或輪詢方式均可)多路選擇器的狀態(tài),只有在有事件需要發(fā)生時才會真正的創(chuàng)建線程進行處理,因此更適合高并發(fā)多接入的應(yīng)用環(huán)境。

- 在Linux系統(tǒng)下,多路復(fù)用的底層實現(xiàn)是epoll方法,與select/poll的順序掃描不同,epoll采用效率更高的事件驅(qū)動方式,而且epoll方式并沒有socket個數(shù)限制。

##BIO和NIO的比較

- BIO適用于連接長期保持的應(yīng)用,比如一個復(fù)雜系統(tǒng)中模塊之間通過長連接來進行通信。

- NIO加多路復(fù)用的模式更適合短連接、高并發(fā)、多接入的情形,比如網(wǎng)絡(luò)服務(wù)器。

##NIO網(wǎng)絡(luò)編程的常用接口

##Reactor模式

Reactor模式用于解決事件分發(fā)處理的問題,Handler把自己的channel和關(guān)注的事件注冊到Selector中,當對應(yīng)的事件發(fā)生在自己的channel上時,對應(yīng)的handler就會得到通知并進行處理。

- 單線程的Reactor

消息的分發(fā)、讀寫、處理都在一個線程中處理,是Reactor最簡單的實現(xiàn)方式,如果消息的處理需要較長時間,會影響效率。

```java

//Reactor類,負責(zé)分發(fā)事件并調(diào)用對應(yīng)的handler

class Reactor implements Runnable {

final Selector selector;

final ServerSocketChannel serverSocket;

//Reactor初始化

Reactor(int port) throws IOException {

selector = Selector.open();

serverSocket = ServerSocketChannel.open();

serverSocket.socket().bind(new InetSocketAddress(port));

serverSocket.configureBlocking(false); //必須配置為非阻塞

//Acceptor會在Reactor初始化時就注冊到Selector中,用于接受connect請求

SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

sk.attach(new Acceptor()); //attach callback object, Acceptor

}

//分發(fā)消息并調(diào)用對應(yīng)的handler

public void run() {

try {

while (!Thread.interrupted()) {

selector.select();

Set selected = selector.selectedKeys();

Iterator it = selected.iterator();

while (it.hasNext())

dispatch((SelectionKey)(it.next()); //Reactor負責(zé)dispatch收到的事件

selected.clear();

}

} catch (IOException ex) { /* ... */ }

}

void dispatch(SelectionKey k) {

Runnable r = (Runnable)(k.attachment()); //調(diào)用之前注冊的callback對象

if (r != null)

r.run();

}

//Acceptor也是一個handler,負責(zé)創(chuàng)建socket并把新建的socket也注冊到selector中

class Acceptor implements Runnable { // inner

public void run() {

try {

SocketChannel c = serverSocket.accept();

if (c != null)

new Handler(selector, c);

}

catch(IOException ex) { /* ... */ }

}

}

}

//Concrete Handler:用于收發(fā)和處理消息。

//在當前的實現(xiàn)中,使用Runnable接口作為每個具體Handler的統(tǒng)一接口

//如果在處理時需要參數(shù)和返回值,也可以為Handler另外聲明一個統(tǒng)一接口來代替Runnable接口

final class Handler implements Runnable {

final SocketChannel socket;

final SelectionKey sk;

ByteBuffer input = ByteBuffer.allocate(MAXIN);

ByteBuffer output = ByteBuffer.allocate(MAXOUT);

static final int READING = 0, SENDING = 1;

int state = READING;

Handler(Selector sel, SocketChannel c) throws IOException {

socket = c; c.configureBlocking(false);

// Optionally try first read now

sk = socket.register(sel, 0);

sk.attach(this); //將Handler作為callback對象

sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件

sel.wakeup();

}

boolean inputIsComplete() { /* ... */ }

boolean outputIsComplete() { /* ... */ }

void process() { /* ... */ }

public void run() {

try {

if (state == READING) read();

else if (state == SENDING) send();

} catch (IOException ex) { /* ... */ }

}

void read() throws IOException {

socket.read(input);

if (inputIsComplete()) {

process();

state = SENDING;

// Normally also do first write now

sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件

}

}

void send() throws IOException {

socket.write(output);

if (outputIsComplete()) sk.cancel(); //write完就結(jié)束了, 關(guān)閉select key

}

}

//上面 的實現(xiàn)用Handler來同時處理Read和Write事件, 所以里面出現(xiàn)狀態(tài)判斷

//我們可以用State-Object pattern來更優(yōu)雅的實現(xiàn)

class Handler { // ...

public void run() { // initial state is reader

socket.read(input);

if (inputIsComplete()) {

process();

sk.attach(new Sender()); //狀態(tài)遷移, Read后變成write, 用Sender作為新的callback對象

sk.interest(SelectionKey.OP_WRITE);

sk.selector().wakeup();

}

}

class Sender implements Runnable {

public void run(){ // ...

socket.write(output);

if (outputIsComplete()) sk.cancel();

}

}

}

```

- 多線程Reacotr

處理消息過程放在其他線程中執(zhí)行

```java

class Handler implements Runnable {

// uses util.concurrent thread pool

static PooledExecutor pool = new PooledExecutor(...);

static final int PROCESSING = 3;

// ...

synchronized void read() { // ...

socket.read(input);

if (inputIsComplete()) {

state = PROCESSING;

pool.execute(new Processer()); //使用線程pool異步執(zhí)行

}

}

synchronized void processAndHandOff() {

process();

state = SENDING; // or rebind attachment

sk.interest(SelectionKey.OP_WRITE); //process完,開始等待write事件

}

class Processer implements Runnable {

public void run() { processAndHandOff(); }

}

}

```

- 使用多個selector

mainReactor只負責(zé)處理accept并創(chuàng)建socket,多個subReactor負責(zé)處理讀寫請求

```java

Selector[] selectors; //subReactors集合, 一個selector代表一個subReactor

int next = 0;

class Acceptor { // ...

public synchronized void run() { ...

Socket connection = serverSocket.accept(); //主selector負責(zé)accept

if (connection != null)

new Handler(selectors[next], connection); //選個subReactor去負責(zé)接收到的connection

if (++next == selectors.length) next = 0;

}

}

```

#AIO

AIO是真正的異步IO,它于JDK1.7時引入,它和NIO的區(qū)別在于:

- NIO仍然需要一個線程阻塞在select方法上,AIO則不需要

- NIO得到數(shù)據(jù)準備好的消息以后,仍然需要自己把消息復(fù)制到用戶空間,AIO則是通過操作系統(tǒng)的支持把數(shù)據(jù)異步復(fù)制到用戶空間以后再給應(yīng)用進程發(fā)出信號。

IO多路復(fù)用

阻塞IO只能阻塞一個IO操作,IO復(fù)用模型能阻塞多個IO操作,所以才叫多路復(fù)用

讀數(shù)據(jù) :

直到數(shù)據(jù)全拷貝至User Space后才返回

不斷去Kernel做Polling,詢問比如讀操作是否完成,沒完成則read()操作會返回EWOUDBLOCK,需要過一會再嘗試執(zhí)行一次read()。該模式會消耗大量CPU

之前等待時間主要消耗在等數(shù)據(jù)到達上。IO Multiplexing則是將等待數(shù)據(jù)到來和讀取實際數(shù)據(jù)兩個事情分開,好處是通過select()等IO Multiplexing的接口一次可以等待在多個Socket上。select()返回后,處于Ready狀態(tài)的Socket執(zhí)行讀操作時候也會阻塞,只是只阻塞將數(shù)據(jù)從Kernel拷貝到User的時間

首先注冊處理函數(shù)到SIGIO信號上,在等待數(shù)據(jù)到來過程結(jié)束后,系統(tǒng)觸發(fā)SGIO信號,之后可以在信號處理函數(shù)中執(zhí)行讀數(shù)據(jù)操作,再喚醒Main Thread或直接喚醒Main Thread讓它完成數(shù)據(jù)讀取。整個過程沒有一次阻塞。

問題:TCP下,連接斷開/可讀/可寫等都會產(chǎn)生Signal,并且Signal沒有提供好的方法去區(qū)分這些Signal到底為什么被觸發(fā)

AIO是注冊一個讀任務(wù),直到讀任務(wù)完全完成后才會通知應(yīng)用層。AIO是由內(nèi)核通知IO操作什么時候完成,信號驅(qū)動IO是由內(nèi)核告知何時啟動IO操作

也存在挺多問題,比如如何去cancel一個讀任務(wù)

除了AIO是異步IO,其他全是同步IO

fd_set: 一個long類型的數(shù)組,每一位可以表示一個文件描述符

問題 :

返回條件與select一樣。

fds還是關(guān)注的描述符列表。poll將events和reevents分開了,所以如果關(guān)注的events沒有發(fā)生變化就可以重用fds,poll只修改rents不會動events。fds是個數(shù)組,不是fds_set,沒有了上限。

相對于select,poll解決了fds長度上限問題,解決了監(jiān)聽描述符無法復(fù)用問題,但仍需在poll返回后遍歷fds去找ready的描述符,也要清理ready描述符對應(yīng)的revents,Kernel也同樣是每次poll調(diào)用需要去遍歷fds注冊監(jiān)聽,poll返回時拆除監(jiān)聽,也仍有驚群問題,無法動態(tài)修改描述符的問題。

使用步驟:

優(yōu)點 :

缺點 :

changelist用于傳遞關(guān)心的event

nchanges用于傳遞changelist的大小

eventlist用于當有事件產(chǎn)生后,將產(chǎn)生的事件放在這里

nevents用于傳遞eventlist大小

timeout 超時時間

kqueue高級的地方在于,它監(jiān)聽的不一定非要是Socket,不一定非要是文件,可以是一系列事件,所以struct kevent內(nèi)參數(shù)叫filter,用于過濾出關(guān)心的事件。

kqueue有epoll所有優(yōu)點,還能通過changelist一次注冊多個關(guān)心的event,不需要像epoll那樣每次調(diào)用epoll_ctl去配置

當我們執(zhí)行epoll_ctl時,除了把socket放到epoll文件系統(tǒng)里file對象對應(yīng)的紅黑樹上之外,還會給內(nèi)核中斷處理程序注冊一個回調(diào)函數(shù),告訴內(nèi)核,如果這個句柄的中斷到了,就把它放到準備就緒list鏈表里。所以,當一個socket上有數(shù)據(jù)到了,內(nèi)核在把網(wǎng)卡上的數(shù)據(jù)copy到內(nèi)核中后就來把socket插入到準備就緒鏈表里。

如此,一棵紅黑樹,一張準備就緒句柄鏈表,少量的內(nèi)核cache,就幫我們解決了大并發(fā)下的socket處理問題。執(zhí)行epoll_create時,創(chuàng)建了紅黑樹和就緒鏈表,執(zhí)行epoll_ctl時,如果增加socket句柄,則檢查在紅黑樹中是否存在,存在立即返回,不存在則添加到樹干上,然后向內(nèi)核注冊回調(diào)函數(shù),用于當中斷事件來臨時向準備就緒鏈表中插入數(shù)據(jù)。執(zhí)行epoll_wait時立刻返回準備就緒鏈表里的數(shù)據(jù)即可。

Epoll有兩種觸發(fā)模式,一種Edge Trigger簡稱ET,一種Level Trigger簡稱LT。每個使用epoll_ctl注冊在epoll描述符上的被監(jiān)聽的描述符都能單獨配置自己的觸發(fā)模式。

從使用角度的區(qū)別:ET模式下當一個文件描述符Ready后,需要以Non-Blocking方式一直操作這個FD直到操作返回EAGAIN錯誤位置,期間Ready這個事件只會觸發(fā)epoll_wait一次返回。LT模式,如果FD上的事件一直處在Ready狀態(tài)沒處理完,則每次調(diào)用epoll_wait都會立即返回

場景:

Java的NIO提供了Selector類,用于跨平臺的實現(xiàn)Socket Polling,即IO多路復(fù)用。BSD系統(tǒng)上對應(yīng)的是Kqueue,Window上對應(yīng)的是Select,Linux上對應(yīng)的是LT的Epoll(為了跨平臺統(tǒng)一,Windows上背后是Select,是LT的)

Selector的使用:

如何構(gòu)建一個基于netty的后端服務(wù)器

直接上干貨,這個是前奏,比較山寨的實現(xiàn),大家可先自行看下

下面將分析手頭上一個項目,運用的技術(shù)很全,值得學(xué)習(xí),先做一個簡單介紹,當然業(yè)務(wù)部分代碼就不講了。

整個工程采用maven來管理,主要的技術(shù)是spring+jedis+netty+disruptor.看這個組合,這個服務(wù)器端性能應(yīng)該很不錯。

這個工程又引發(fā)我對技術(shù)無限熱愛 ,哈哈。

個工程,目前主要是針對一些基于json/xml/text格式的請求,同時也是支持標準手機請求的,當然,可以自定義一些其他格式或者pc端的請求,而

且針對不同URI,后面掛了不同的handler,這些可能都是一些web處理的基本思想,只是脫離了常規(guī)的web容器或者應(yīng)用服務(wù)器。

xml工具采用xstram來處理,兩個字,方便。

json工具采用jackson\不知道和業(yè)界出名的fastjson\gson\sf.json有何區(qū)別,待鑒定。

戶端的請求,統(tǒng)一繼承ClientRequestModel,經(jīng)過編碼統(tǒng)一轉(zhuǎn)化為domainMessage,交由disruptor來處理,其實oop

里什么繼承,實現(xiàn),封裝思想,大部分都在圍繞一個東西在走,一句話,把看似各有棱角的東西如何轉(zhuǎn)化為共同的東西,求同存異?。ū热?,水,石頭,空氣等,如

果在這一層,我們沒法統(tǒng)一用一個特征來表示,我們可以先把它轉(zhuǎn)化為分子,那是不是可以用同一個東西來表示呢?如何高度抽象封裝,這真是一門藝術(shù))。

看這個工程對客戶端請求,是如何一步步處理的,message-request-event 交由disruptor來處理,很美妙的思想。在了解這些之前,我們有必要深入學(xué)習(xí)一下disruptor,很特別的一個框架,宣言很牛逼,中文文檔在這里(),E文好的同學(xué)請移步到這里()

了解disruptor之前,先學(xué)習(xí)下ringbuffer是如何實現(xiàn)的?

1、ringbuffer的特別之處:

只有一個指針,沒有尾指針,基于數(shù)組,且不會刪除元素,元素會覆蓋,充分利用緩存行,減少垃圾回收。

2、如何從ringbuffer讀取數(shù)據(jù):

------------------------------------------2013-9-9 補充-----------------------------------------------------

下面主要講一下請求如何處理這塊架構(gòu)吧,其實架構(gòu)這個東西,說簡單一點,就是一種簡單可擴展的實現(xiàn)方式,在某些程度上,不要太在意性能。

底層通信建立在netty之上,基本沒做任何改動

Java代碼

public class HttpServerPipelineFactory implements ChannelPipelineFactory {

private ChannelUpstreamHandler channelUpstreamHandler;

public ChannelPipeline getPipeline() throws Exception {

// Create a default pipeline implementation.

ChannelPipeline pipeline = pipeline();

// Uncomment the following line if you want HTTPS

//SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();

//engine.setUseClientMode(false);

//pipeline.addLast("ssl", new SslHandler(engine));

pipeline.addLast("decoder", new HttpRequestDecoder());

// Uncomment the following line if you don't want to handle HttpChunks.

pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));

pipeline.addLast("encoder", new HttpResponseEncoder());

// Remove the following line if you don't want automatic content compression.

pipeline.addLast("deflater", new HttpContentCompressor());

//pipeline.addLast("handler", new HttpRequestHandler());

pipeline.addLast("handler", channelUpstreamHandler);

return pipeline;

}

public void setChannelUpstreamHandler(ChannelUpstreamHandler channelUpstreamHandler) {

this.channelUpstreamHandler = channelUpstreamHandler;

}

}

相關(guān)spring配置

Java代碼

bean id="httpServerPipelineFactory" class="com.yunchao.cm.network.http.HttpServerPipelineFactory"

property name="channelUpstreamHandler" ref="httpRequestHandler"/

/bean

Java代碼

bean id="httpRequestHandler" class="com.yunchao.cm.network.http.HttpRequestHandler"

property name="urlMaps"

map

entry key="/payorder"

ref bean="payOrderCodecFactory"/

/entry

entry key="/question"

ref bean="questionCodecFactory"/

/entry

entry key="/sms"

ref bean="smsCodecFactory"/

/entry

代碼太多,不全部貼出來,后面整理一下放到我的github上去。

基如此,我們還是得定義一個handler,繼承simpleChannelUpstreamHander,并重寫了messageReceied方法,具體在這里。

Java代碼

QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri());

String url = queryStringDecoder.getPath();

CodecFactory codecFactory = urlMaps.get(url);

if (null == codecFactory) {

logger.error("unsupported url:{} request.", url);

//sendError(ctx, BAD_REQUEST);

e.getChannel().close();

return;

}

//獲取cmwap網(wǎng)絡(luò)中的手機號碼

String phone = PhoneUtils.getPhone(request.getHeader("x-up-calling-line-id"));

if (request.getMethod().equals(HttpMethod.POST)) {

ChannelBuffer content = request.getContent();

String postParams = content.toString(CharsetUtil.UTF_8);

logger.debug("request content:{}", postParams);

ClientRequestModel model = (ClientRequestModel) codecFactory.decode(postParams);

model.setProperty(model.MESSAGE_EVENT_KEY, e);

model.setProperty(model.HTTP_REQUEST_KEY, request);

model.setProperty(model.HTTP_PHONE_KEY, phone);

InetSocketAddress remoteAddress = (InetSocketAddress) e.getRemoteAddress();

model.setProperty(model.IP_KEY, remoteAddress.getAddress().getHostAddress());

logger.info("user request model:{}", model);

model.fireSelf();

Java代碼

@Override

public DomainMessage fireSelf() {

DomainMessage em = new DomainMessage(this);

EventUtils.fireEvent(em, "alipayNotifyState");

return em;

}

看到這里基本上能夠清楚了,是如何把客戶端請求包裝成ClientRequestModel了,且后面涉及到處理的對象,全部繼承它,在整個架構(gòu)之

中,has a 優(yōu)于 is

a,對于客戶端netty的一些對象,也是存儲在ClientRequestModel中,codec無非也是采用了xml/json/kv,如斯,實現(xiàn)

了字節(jié)與對象之間的轉(zhuǎn)換。

此之外,突然想到剛來杭州工作的第一家公司,基于此,采用的架構(gòu)師servlet充當服務(wù)器,因為這是一個公司內(nèi)部的server,而不是一個平臺,采用

的數(shù)據(jù)格式也比較單一,就是xml,但是采用的外部類庫也是xstream來處理的,但是整個系統(tǒng)維持的日調(diào)用量也是在百萬級別,運用的client則是

采用httpclient,對于不同請求后面掛的handler,是在容器啟動時加載到內(nèi)存中,其余也沒有什么亮點了。

分享文章:java多路復(fù)用器代碼 http多路復(fù)用原理
文章網(wǎng)址:http://muchs.cn/article32/doscpsc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營銷推廣、網(wǎng)站導(dǎo)航、企業(yè)網(wǎng)站制作軟件開發(fā)、定制網(wǎng)站微信小程序

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司