宜人貸蜂巢API網(wǎng)關(guān)技術(shù)解密之Netty使用實(shí)踐

2021-02-08    分類: 網(wǎng)站建設(shè)

宜人貸蜂巢團(tuán)隊(duì),由Michael創(chuàng)立于2013年,通過使用互聯(lián)網(wǎng)科技手段助力金融生態(tài)和諧健康發(fā)展。自成立起一直致力于多維度數(shù)據(jù)閉環(huán)平臺(tái)建設(shè)。目前團(tuán)隊(duì)規(guī)模超過百人,涵蓋征信、

圖1 - API網(wǎng)關(guān)項(xiàng)目框架

圖中描繪了API網(wǎng)關(guān)系統(tǒng)的處理流程,以及與服務(wù)注冊(cè)發(fā)現(xiàn)、日志分析、報(bào)警系統(tǒng)、各類爬蟲的關(guān)系。其中API網(wǎng)關(guān)系統(tǒng)接收請(qǐng)求,對(duì)請(qǐng)求進(jìn)行編解碼、鑒權(quán)、限流、加解密,再基于Eureka服務(wù)注冊(cè)發(fā)現(xiàn)模塊,將請(qǐng)求發(fā)送到有效的服務(wù)節(jié)點(diǎn)上;網(wǎng)關(guān)及抓取系統(tǒng)的日志,會(huì)被收集到elk平臺(tái)中,做業(yè)務(wù)分析及報(bào)警處理。

二、BIO vs NIO

API網(wǎng)關(guān)承載數(shù)倍于爬蟲的流量,提升服務(wù)器的并發(fā)處理能力、縮短系統(tǒng)的響應(yīng)時(shí)間,通信模型的選擇是至關(guān)重要的,是選擇BIO,還是NIO?

1. Streamvs Buffer & 阻塞 vs 非阻塞

BIO是面向流的,io的讀寫,每次只能處理一個(gè)或者多個(gè)bytes,如果數(shù)據(jù)沒有讀寫完成,線程將一直等待于此,而不能暫時(shí)跳過io或者等待io讀寫完成異步通知,線程滯留在io讀寫上,不能充分利用機(jī)器有限的線程資源,造成server的吞吐量較低,見圖2。而NIO與此不同,面向Buffer,線程不需要滯留在io讀寫上,采用操作系統(tǒng)的epoll模式,在io數(shù)據(jù)準(zhǔn)備好了,才由線程來處理,見圖3。

NioEvenrLoopGroup的創(chuàng)建,具體執(zhí)行過程是執(zhí)行類MultithreadEventExecutorGroup的構(gòu)造方法:

  1. /**  
  2.  * Create a new instance.  
  3.  *  
  4.  * @param nThreads          the number of threads that will be used by this instance.  
  5.  * @param executor          the Executor to use, or {@code null} if the default should be used.  
  6.  * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.  
  7.  * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call  
  8.  */  
  9. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,  
  10.                                         EventExecutorChooserFactory chooserFactory, Object... args) {  
  11.     if (nThreads <= 0) {  
  12.         throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));  
  13.     }  
  14.     if (executor == null) {  
  15.         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());  
  16.     }  
  17.     children = new EventExecutor[nThreads];  
  18.     for (int i = 0; i < nThreads; i ++) {  
  19.         boolean success = false;  
  20.         try {  
  21.             children[i] = newChild(executor, args);  
  22.             success = true;  
  23.         } catch (Exception e) {   
  24.             throw new IllegalStateException("failed to create a child event loop", e);  
  25.         } finally {  
  26.             if (!success) {  
  27.                 for (int j = 0; j < i; j ++) {  
  28.                     children[j].shutdownGracefully();  
  29.                 }  
  30.                 for (int j = 0; j < i; j ++) { 
  31.                      EventExecutor e = children[j]; 
  32.                      try { 
  33.                          while (!e.isTerminated()) {  
  34.                             e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);  
  35.                         }  
  36.                     } catch (InterruptedException interrupted) {  
  37.                         // Let the caller handle the interruption.  
  38.                         Thread.currentThread().interrupt();  
  39.                         break;  
  40.                     }  
  41.                 }  
  42.             }  
  43.         }  
  44.     }  
  45.     chooser = chooserFactory.newChooser(children);  
  46.     final FutureListener terminationListener = new FutureListener() {  
  47.         @Override  
  48.         public void operationComplete(Future future) throws Exception {  
  49.             if (terminatedChildren.incrementAndGet() == children.length) {  
  50.                 terminationFuture.setSuccess(null);  
  51.             }  
  52.         }  
  53.     };  
  54.     for (EventExecutor e: children) {  
  55.         e.terminationFuture().addListener(terminationListener);  
  56.     }  
  57.     Set childrenSet = new LinkedHashSet(children.length);  
  58.     Collections.addAll(childrenSet, children);  
  59.     readonlyChildren = Collections.unmodifiableSet(childrenSet);  
  60. 其中,創(chuàng)建細(xì)節(jié)見下:

    • 線程池中的線程數(shù)nThreads必須大于0;
    • 如果executor為null,創(chuàng)建默認(rèn)executor,executor用于創(chuàng)建線程(newChild方法使用executor對(duì)象);
    • 依次創(chuàng)建線程池中的每一個(gè)線程即NioEventLoop,如果其中有一個(gè)創(chuàng)建失敗,將關(guān)閉之前創(chuàng)建的所有線程;
    • chooser為線程池選擇器,用來選擇下一個(gè)EventExecutor,可以理解為,用來選擇一個(gè)線程來執(zhí)行task。

    chooser的創(chuàng)建細(xì)節(jié),見下:

    DefaultEventExecutorChooserFactory根據(jù)線程數(shù)創(chuàng)建具體的EventExecutorChooser,線程數(shù)如果等于2^n,可使用按位與替代取模運(yùn)算,節(jié)省cpu的計(jì)算資源,見源碼:

    1. @SuppressWarnings("unchecked")  
    2. @Override  
    3. public EventExecutorChooser newChooser(EventExecutor[] executors) {  
    4.     if (isPowerOfTwo(executors.length)) {  
    5.         return new PowerOfTowEventExecutorChooser(executors);  
    6.     } else {  
    7.         return new GenericEventExecutorChooser(executors);  
    8.     }  
    9. }   
    10.     private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {  
    11.         private final AtomicInteger idx = new AtomicInteger();  
    12.         private final EventExecutor[] executors;   
    13.  
    14.         PowerOfTowEventExecutorChooser(EventExecutor[] executors) {  
    15.             this.executors = executors;  
    16.         }   
    17.  
    18.         @Override  
    19.         public EventExecutor next() {  
    20.             return executors[idx.getAndIncrement() & executors.length - 1];  
    21.         }  
    22.     }   
    23.  
    24.     private static final class GenericEventExecutorChooser implements EventExecutorChooser {  
    25.         private final AtomicInteger idx = new AtomicInteger();  
    26.         private final EventExecutor[] executors;   
    27.  
    28.         GenericEventExecutorChooser(EventExecutor[] executors) {  
    29.             this.executors = executors;  
    30.         }   
    31.  
    32.         @Override  
    33.         public EventExecutor next() {  
    34.             return executors[Math.abs(idx.getAndIncrement() % executors.length)];  
    35.         }  
    36.     } 

    newChild(executor, args)的創(chuàng)建細(xì)節(jié),見下:

    MultithreadEventExecutorGroup的newChild方法是一個(gè)抽象方法,故使用NioEventLoopGroup的newChild方法,即調(diào)用NioEventLoop的構(gòu)造函數(shù):

    1. @Override  
    2.     protected EventLoop newChild(Executor executor, Object... args) throws Exception {  
    3.         return new NioEventLoop(this, executor, (SelectorProvider) args[0], 
    4.             ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);  
    5.     } 

    在這里先看下NioEventLoop的類層次關(guān)系:

    創(chuàng)建任務(wù)隊(duì)列tailTasks(內(nèi)部為有界的LinkedBlockingQueue):

    創(chuàng)建線程的任務(wù)隊(duì)列taskQueue(內(nèi)部為有界的LinkedBlockingQueue),以及任務(wù)過多防止系統(tǒng)宕機(jī)的拒絕策略rejectedHandler。

    其中tailTasks和taskQueue均是任務(wù)隊(duì)列,而優(yōu)先級(jí)不同,taskQueue的優(yōu)先級(jí)高于tailTasks,定時(shí)任務(wù)的優(yōu)先級(jí)高于taskQueue。

    五、ServerBootstrap初始化及啟動(dòng)

    了解了Netty線程池NioEvenrLoopGroup的創(chuàng)建過程后,下面看下API網(wǎng)關(guān)服務(wù)ServerBootstrap的是如何使用線程池引入服務(wù)中,為高并發(fā)訪問服務(wù)的。

    API網(wǎng)關(guān)ServerBootstrap初始化及啟動(dòng)代碼,見下:

    1. serverBootstrap = new ServerBootstrap();  
    2. bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());  
    3. workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());   
    4.  
    5. serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  
    6.         .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())  
    7.         .option(ChannelOption.SO_BACKLOG, config.getBacklogSize())  
    8.         .option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())  
    9.         // Memory pooled  
    10.         .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
    11.         .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
    12.         .childHandler(channelInitializer);    
    13.  
    14. ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();  
    15. log.info("API-gateway started on port: {}", config.getPort());  
    16. future.channel().closeFuture().sync(); 

    API網(wǎng)關(guān)系統(tǒng)使用netty自帶的線程池,共有三組線程池,分別為bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暫不作介紹)。其中,bossGroup用于接收客戶端的TCP連接,workerGroup用于處理I/O、執(zhí)行系統(tǒng)task和定時(shí)任務(wù),executorGroup用于處理網(wǎng)關(guān)業(yè)務(wù)加解密、限流、路由,及將請(qǐng)求轉(zhuǎn)發(fā)給后端的抓取服務(wù)等業(yè)務(wù)操作。

    六、Channel與線程池的綁定

    ServerBootstrap初始化后,通過調(diào)用bind(port)方法啟動(dòng)Server,bind的調(diào)用鏈如下:

    1. AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister 

    其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化過程指定channel為NioServerSocketChannel.class,至此將NioServerSocketChannel與bossGroup綁定到一起,bossGroup負(fù)責(zé)客戶端連接的建立。那么NioSocketChannel是如何與workerGroup綁定到一起的?

    調(diào)用鏈AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead:

    1. public void channelRead(ChannelHandlerContext ctx, Object msg) {  
    2.     final Channel child = (Channel) msg;  
    3.     child.pipeline().addLast(childHandler);  
    4.     for (Entry, Object> e: childOptions) {  
    5.         try {  
    6.             if (!child.config().setOption((ChannelOption) e.getKey(), e.getValue())) {  
    7.                 logger.warn("Unknown channel option: " + e);  
    8.             }  
    9.         } catch (Throwable t) {  
    10.             logger.warn("Failed to set a channel option: " + child, t); 
    11.         }  
    12.     }  
    13.     for (Entry, Object> e: childAttrs) {  
    14.         child.attr((AttributeKey) e.getKey()).set(e.getValue());  
    15.     } 
    16.  
    17.     try {  
    18.         childGroup.register(child).addListener(new ChannelFutureListener() {  
    19.             @Override  
    20.             public void operationComplete(ChannelFuture future) throws Exception {  
    21.                 if (!future.isSuccess()) { 
    22.                      forceClose(child, future.cause());  
    23.                 }  
    24.             }  
    25.         });  
    26.     } catch (Throwable t) {  
    27.         forceClose(child, t);  
    28.     }  
    29. 其中,childGroup.register(child)就是將NioSocketChannel與workderGroup綁定到一起,那又是什么觸發(fā)了ServerBootstrapAcceptor的channelRead方法?

      其實(shí)當(dāng)一個(gè) client 連接到 server 時(shí),Java 底層的 NIO ServerSocketChannel 會(huì)有一個(gè)SelectionKey.OP_ACCEPT 就緒事件,接著就會(huì)調(diào)用到 NioServerSocketChannel.doReadMessages方法。

      1. @Override  
      2. protected int doReadMessages(List buf) throws Exception {  
      3.     SocketChannel ch = javaChannel().accept();  
      4.     try {  
      5.         if (ch != null) {  
      6.             buf.add(new NioSocketChannel(this, ch));  
      7.             return 1;  
      8.         }  
      9.     } catch (Throwable t) {          … 
      10.  
      11.     }  
      12.     return 0;  
      13. javaChannel().accept() 會(huì)獲取到客戶端新連接的SocketChannel,實(shí)例化為一個(gè) NioSocketChannel, 并且傳入 NioServerSocketChannel 對(duì)象(即 this),由此可知, 我們創(chuàng)建的這個(gè)NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實(shí)例 。

        接下來就經(jīng)由 Netty 的 ChannelPipeline 機(jī)制,將讀取事件逐級(jí)發(fā)送到各個(gè) handler 中,于是就會(huì)觸發(fā)前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦。

        至此,分析了Netty線程池的初始化、ServerBootstrap的啟動(dòng)及channel與線程池的綁定過程,能夠看出Netty中線程池的優(yōu)雅設(shè)計(jì),使用不同的線程池負(fù)責(zé)連接的建立、IO讀寫等,為API網(wǎng)關(guān)項(xiàng)目的高并發(fā)訪問提供了技術(shù)基礎(chǔ)。

        七、總結(jié)

        文章題目:宜人貸蜂巢API網(wǎng)關(guān)技術(shù)解密之Netty使用實(shí)踐
        文章起源:http://www.muchs.cn/news/99846.html

        成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站網(wǎng)站改版、網(wǎng)站導(dǎo)航、微信小程序、云服務(wù)器、企業(yè)網(wǎng)站制作

        廣告

        聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)