異步通道和異步運算結(jié)果

異步通道和異步運算結(jié)果

成都創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供綿陽網(wǎng)站建設、綿陽做網(wǎng)站、綿陽網(wǎng)站設計、綿陽網(wǎng)站制作等企業(yè)網(wǎng)站建設、網(wǎng)頁設計與制作、綿陽企業(yè)網(wǎng)站模板建站服務,10多年綿陽做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡服務。

以下內(nèi)容參考孫衛(wèi)琴所寫的《Java網(wǎng)絡編程核心技術(shù)詳解》一書的第12章。
源代碼下載地址為:http://lesson.javathinker.net/javanet/javanetsourcecode.rar

從JDK7開始,引入了表示異步通道的AsynchronousSocketChannel類和AsynchronousServerSocketChannel類,這兩個類的作用與SocketChannel類和ServerSocketChannel相似,區(qū)別在于異步通道的一些方法總是采用非阻塞工作模式,并且它們的非阻塞方法會立即返回一個Future對象,用來存放方法的異步運算結(jié)果。

AsynchronousSocketChannel類有以下非阻塞方法:

  • Future<Void> connect(SocketAddress remote):連接遠程主機。
  • Future<Integer> read(ByteBuffer dst):從通道中讀入數(shù)據(jù),存放到ByteBuffer中。Future對象中包含了實際從通道中讀到的字節(jié)數(shù)。
  • Future<Integer> write(ByteBuffer src):把ByteBuffer中的數(shù)據(jù)寫入到通道中。Future對象中包含了實際寫入通道的字節(jié)數(shù)。
  • AsynchronousServerSocketChannel類有以下非阻塞方法:

  • Future<AsynchronousSocketChannel> accept():接受客戶連接請求。Future對象中包含了連接建立成功后創(chuàng)建的AsynchronousSocketChannel對象。

使用異步通道,可以使程序并行執(zhí)行多個異步操作,例如:

SocketAddress socketAddress=……;
AsynchronousSocketChannel client= AsynchronousSocketChannel.open();
//請求建立連接
Future<Void > connected=client.connect(socketAddress);
ByteBuffer byteBuffer=ByteBuffer.allocate(128);

//執(zhí)行其他操作
//……

//等待連接完成
connected.get();  
//讀取數(shù)據(jù)
Future<Integer> future=client.read(byteBuffer);

//執(zhí)行其他操作
//……

//等待從通道讀取數(shù)據(jù)完成
future.get();

byteBuffer.flip();
WritableByteChannel out=Channels.newChannel(System.out);
out.write(byteBuffer);

以下PingClient類演示了異步通道的用法。它不斷接收用戶輸入的域名(即網(wǎng)絡上主機的名字),然后與這個主機上的80端口建立連接,最后打印建立連接所花費的時間。如果程序無法連接到指定的主機,就打印相關(guān)錯誤信息。如果用戶輸入“bye”,就結(jié)束程序。以下是運行PingClient類時用戶輸入的信息以及程序輸出的信息。其中采用非斜體字體的行表示用戶向控制臺輸入的信息,采用斜體字體的行表示程序的輸出結(jié)果:

C:\chapter04\classes>java nonblock.PingClient
www.abc888.com
www.javathinker.net
ping www.abc888.com的結(jié)果 : 連接失敗
ping www.javathinker.net的結(jié)果 : 20ms
bye

從以上打印結(jié)果可以看出,PingClient連接遠程主機www.javathinker.net用了20ms,而連接www.abc888.com主機失敗。從打印結(jié)果還可以看出,PingClient采用異步通信方式,當用戶輸入一個主機名后,不必等到程序輸出對這個主機名的處理結(jié)果,就可以繼續(xù)輸入下一個主機名。對每個主機名的處理結(jié)果要等到連接已經(jīng)成功或者失敗后才打印出來。

/* PingClient.java */
package nonblock;
import java.net.*;
……
class PingResult {  //表示連接一個主機的結(jié)果
  InetSocketAddress address;
  long connectStart;  //開始連接時的時間
  long connectFinish = 0;  //連接成功時的時間
  String failure;
  Future<Void> connectResult;  //連接操作的異步運算結(jié)果
  AsynchronousSocketChannel socketChannel;
  String host;
  final String ERROR="連接失敗";

  PingResult(String host) {
      try {
          this.host=host;
          address =
              new InetSocketAddress(InetAddress.getByName(host),80);
      } catch (IOException x) {
          failure = ERROR;
      }
  }  

  public void print() {  //打印連接一個主機的執(zhí)行結(jié)果
      String result;
      if (connectFinish != 0)
          result = Long.toString(connectFinish - connectStart) + "ms";
      else if (failure != null)
          result = failure;
      else
          result = "Timed out";
      System.out.println("ping "+ host+"的結(jié)果" + " : " + result);
  }
}

public class PingClient{
  //存放所有PingResult結(jié)果的隊列
  private LinkedList<PingResult> pingResults=
               new LinkedList<PingResult>();
  boolean shutdown=false;
  ExecutorService executorService;

  public PingClient()throws IOException{
    executorService= Executors.newFixedThreadPool(4);
    executorService.execute(new Printer());
    receivePingAddress();
  }

  public static void main(String args[])throws IOException{
    new PingClient();
  }

  /** 接收用戶輸入的主機地址,由線程池執(zhí)行PingHandler任務 */  
  public void receivePingAddress(){
    try{
      BufferedReader localReader=new BufferedReader(
                    new InputStreamReader(System.in));
      String msg=null;
      //接收用戶輸入的主機地址
      while((msg=localReader.readLine())!=null){
        if(msg.equals("bye")){
          shutdown=true;
          executorService.shutdown();
          break;
        }
        executorService.execute(new PingHandler(msg));
      }
    }catch(IOException e){ }
  }

  /** 嘗試連接特定主機,并且把運算結(jié)果加入到PingResults結(jié)果隊列中 */
  public void addPingResult(PingResult pingResult) {
     AsynchronousSocketChannel socketChannel = null;
     try {
       socketChannel = AsynchronousSocketChannel.open();

       pingResult.socketChannel=socketChannel;
       pingResult.connectStart = System.currentTimeMillis();

       synchronized (pingResults) {
         //向pingResults隊列中加入一個PingResult對象
         pingResults.add(pingResult);
         pingResults.notify();
       }

       Future<Void> connectResult=
           socketChannel.connect(pingResult.address);
       pingResult.connectResult = connectResult;
    }catch (Exception x) {
      if (socketChannel != null) {
        try {socketChannel.close();} catch (IOException e) {}
      }
      pingResult.failure = pingResult.ERROR;
    }
  }

  /** 打印PingResults結(jié)果隊列中已經(jīng)執(zhí)行完畢的任務的結(jié)果 */
  public void printPingResults() {
    PingResult pingResult = null;
    while(!shutdown ){
      synchronized (pingResults) {
        while (!shutdown && pingResults.size() == 0 ){
          try{
            pingResults.wait(100);
          }catch(InterruptedException e){e.printStackTrace();}
        }

        if(shutdown  && pingResults.size() == 0 )break;
        pingResult=pingResults.getFirst();

        try{
          if(pingResult.connectResult!=null)
            pingResult.connectResult.get(500,TimeUnit.MILLISECONDS);
        }catch(Exception e){
            pingResult.failure= pingResult.ERROR;
        }

        if(pingResult.connectResult!=null
           && pingResult.connectResult.isDone()){

          pingResult.connectFinish = System.currentTimeMillis();
        }

        if(pingResult.connectResult!=null
           && pingResult.connectResult.isDone()
           || pingResult.failure!=null){

           pingResult.print();
           pingResults.removeFirst();
           try {
              pingResult.socketChannel.close();
            } catch (IOException e) { }
         }
      }
    }
  }

  /** 嘗試連接特定主機,生成一個PingResult對象,
     把它加入到PingResults結(jié)果隊列中 */
  public class PingHandler implements Runnable{
    String msg;
    public PingHandler(String msg){
        this.msg=msg;  
    }
    public void run(){
        if(!msg.equals("bye")){
          PingResult pingResult=new PingResult(msg);
          addPingResult(pingResult);
        }
    }
  }

  /** 打印PingResults結(jié)果隊列中已經(jīng)執(zhí)行完畢的任務的結(jié)果 */
  public class Printer implements Runnable{
    public void run(){
        printPingResults();
    }
  }
}

以上PingResult類表示連接一個主機的執(zhí)行結(jié)果。PingClient類的PingResults隊列存放所有的PingResult對象。
PingClient類還定義了兩個表示特定任務的內(nèi)部類:

  • PingHandler任務類:負責通過異步通道去嘗試連接客戶端輸入的主機地址,并且創(chuàng)建一個PingResult對象,它包含了連接操作的異步運算結(jié)果。再把PingResult對象加入到PingResults結(jié)果隊列中。
  • Printer任務類:負責打印PingResults結(jié)果隊列中已經(jīng)執(zhí)行完畢的任務結(jié)果。打印完畢的PingResult對象會從PingResults隊列中刪除。

PingClient類的main主線程完成以下操作:

  • 創(chuàng)建線程池。
  • 向線程池提交Printer任務。
  • 不斷讀取客戶端輸入的主機地址,向線程池提交PingHandler任務。如果客戶端輸入“bye”,就結(jié)束程序。

PingClient類的線程池完成以下操作:

  • 執(zhí)行Printer任務。
  • 執(zhí)行PingHander任務。

分享標題:異步通道和異步運算結(jié)果
文章路徑:http://muchs.cn/article6/gppjog.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站建設、網(wǎng)頁設計公司、電子商務、標簽優(yōu)化、企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設計公司