Netty需要的運行環(huán)境很簡單,只有2個。
JDK 1.8+Apache Maven 3.3.9+二、Netty 客戶端/服務器概覽如圖,展示了一個我們將要編寫的 Echo 客戶端和服務器應用程序。該圖展示是多個客戶端同時連接到一臺服務器。所能夠支持的客戶端數(shù)量,在理論上,僅受限于系統(tǒng)的可用資源(以及所使用的 JDK 版本可能會施加的限制)。
(資料圖)
Echo 客戶端和服務器之間的交互是非常簡單的;在客戶端建立一個連接之后,它會向服務器發(fā)送一個或多個消息,反過來服務器又會將每個消息回送給客戶端。雖然它本身看起來好像用處不大,但它充分地體現(xiàn)了客戶端/服務器系統(tǒng)中典型的請求-響應交互模式。
三、編寫 Echo 服務器所有的 Netty 服務器都需要以下兩部分。
至少一個 ChannelHandler—該組件實現(xiàn)了服務器對從客戶端接收的數(shù)據(jù)的處理,即它的業(yè)務邏輯。引導—這是配置服務器的啟動代碼。至少,它會將服務器綁定到它要監(jiān)聽連接請求的端口上。3.1 ChannelHandler 和業(yè)務邏輯上一篇博文我們介紹了 Future 和回調(diào),并且闡述了它們在事件驅(qū)動設(shè)計中的應用。我們還討論了 ChannelHandler,它是一個接口族的父接口,它的實現(xiàn)負責接收并響應事件通知。
在 Netty 應用程序中,所有的數(shù)據(jù)處理邏輯都包含在這些核心抽象的實現(xiàn)中。因為你的 Echo 服務器會響應傳入的消息,所以它需要實現(xiàn)ChannelInboundHandler 接口,用來定義響應入站事件的方法。簡單的應用程序只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的默認實現(xiàn)。
我們將要用到的方法是:
channelRead() :對于每個傳入的消息都要調(diào)用;channelReadComplete() : 通知ChannelInboundHandler最后一次對channelRead()的調(diào)用是當前批量讀取中的最后一條消息;exceptionCaught() :在讀取操作期間,有異常拋出時會調(diào)用。該 Echo 服務器的 ChannelHandler 實現(xiàn)是 EchoServerHandler,如代碼:
package com.example.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * @author lhd * @date 2023/05/16 15:05 * @notes Netty Echo服務端簡單邏輯 *///表示channel可以并多個實例共享,它是線程安全的@ChannelHandler.Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; //將消息打印到控制臺 System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); //將收到的消息寫給發(fā)送者,而不沖刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //將未決消息沖刷到遠程節(jié)點,并且關(guān)閉該 Channe ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //打印異常堆棧跟蹤 cause.printStackTrace(); //關(guān)閉該channel ctx.close(); }}
ChannelInboundHandlerAdapter 有一個直觀的 API,并且它的每個方法都可以被重寫以掛鉤到事件生命周期的恰當點上。
因為需要處理所有接收到的數(shù)據(jù),所以我們重寫了 channelRead()方法。在這個服務器應用程序中,我們將數(shù)據(jù)簡單地回送給了遠程節(jié)點。
重寫 exceptionCaught()方法允許我們對 Throwable 的任何子類型做出反應,在這里你記錄了異常并關(guān)閉了連接。
雖然一個更加完善的應用程序也許會嘗試從異常中恢復,但在這個場景下,只是通過簡單地關(guān)閉連接來通知遠程節(jié)點發(fā)生了錯誤。
ps:如果不捕獲異常,會發(fā)生什么呢?
每個 Channel 都擁有一個與之相關(guān)聯(lián)的 ChannelPipeline,其持有一個 ChannelHandler 的實例鏈。在默認的情況下,ChannelHandler 會把對它的方法的調(diào)用轉(zhuǎn)發(fā)給鏈中的下一個 ChannelHandler。因此,如果 exceptionCaught()方法沒有被該鏈中的某處實現(xiàn),那么所接收的異常將會被傳遞到 ChannelPipeline 的尾端并被記錄。為此,你的應用程序應該提供至少有一個實現(xiàn)exceptionCaught()方法的 ChannelHandler。
除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子類型和實現(xiàn)。這些之后會一一說明,目前,我們只關(guān)注:
針對不同類型的事件來調(diào)用 ChannelHandler;應用程序通過實現(xiàn)或者擴展 ChannelHandler 來掛鉤到事件的生命周期,并且提供自定義的應用程序邏輯;在架構(gòu)上,ChannelHandler 有助于保持業(yè)務邏輯與網(wǎng)絡處理代碼的分離。這簡化了開發(fā)過程,因為代碼必須不斷地演化以響應不斷變化的需求。3.2 引導服務器下面我們準備開始構(gòu)建服務器。構(gòu)建服務器涉及到兩個內(nèi)容:
綁定到服務器將在其上監(jiān)聽并接受傳入連接請求的端口;配置 Channel,以將有關(guān)的入站消息通知給 EchoServerHandler 實例。package com.example.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import java.net.InetSocketAddress;/** * @author lhd * @date 2023/05/16 15:21 * @notes Netty引導服務器 */public class EchoServer { public static void main(String[] args) throws Exception { //調(diào)用服務器的 start()方法 new EchoServer().start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); //創(chuàng)建EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { //創(chuàng)建ServerBootstra ServerBootstrap b = new ServerBootstrap(); //指定服務器監(jiān)視端口 int port = 8080; b.group(group) //指定所使用的 NIO 傳輸 Channel //因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連接, // 并且將 Channel 的類型指定為 NioServerSocketChannel 。 .channel(NioServerSocketChannel.class) //使用指定的端口設(shè)置套接字地址 //將本地地址設(shè)置為一個具有選定端口的 InetSocketAddress 。服務器將綁定到這個地址以監(jiān)聽新的連接請求 .localAddress(new InetSocketAddress(port)) //添加一個EchoServerHandler 到子Channel的 ChannelPipeline //這里使用了一個特殊的類——ChannelInitializer。這是關(guān)鍵。 // 當一個新的連接被接受時,一個新的子 Channel 將會被創(chuàng)建,而 ChannelInitializer 將會把一個你的 //EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的, // 這個 ChannelHandler 將會收到有關(guān)入站消息的通知。 .childHandler(new ChannelInitializer(){ @Override public void initChannel(SocketChannel ch) throws Exception { //EchoServerHandler 被標注為 @Shareable,所以我們可以總是使用同樣的實例 //實際上所有客戶端都是使用的同一個EchoServerHandler ch.pipeline().addLast(serverHandler); } }); //異步地綁定服務器,調(diào)用 sync()方法阻塞等待直到綁定完成 //sync()方法的調(diào)用將導致當前 Thread阻塞,一直到綁定操作完成為止 ChannelFuture f = b.bind().sync(); //獲取 Channel 的CloseFuture,并且阻塞當前線 //該應用程序?qū)枞却钡椒掌鞯?Channel關(guān)閉(因為你在 Channel 的 CloseFuture 上調(diào)用了 sync()方法) f.channel().closeFuture().sync(); } finally { //關(guān)閉 EventLoopGroup,釋放所有的資源,包括所有被創(chuàng)建的線程 group.shutdownGracefully().sync(); } }}
我們總結(jié)一下服務器實現(xiàn)中的重要步驟。下面這些是服務器的主要代碼組件:
EchoServerHandler 實現(xiàn)了業(yè)務邏輯;main()方法引導了服務器;引導過程中所需要的步驟如下:創(chuàng)建一個 ServerBootstrap 的實例以引導和綁定服務器;創(chuàng)建并分配一個 NioEventLoopGroup 實例以進行事件的處理,如接受新連接以及讀/寫數(shù)據(jù);指定服務器綁定的本地的 InetSocketAddress;使用一個 EchoServerHandler 的實例初始化每一個新的 Channel;調(diào)用 ServerBootstrap.bind()方法以綁定服務器。到此我們的引導服務器已經(jīng)完成。
四、編寫 Echo 客戶端Echo 客戶端將會:(1)連接到服務器;(2)發(fā)送一個或者多個消息;(3)對于每個消息,等待并接收從服務器發(fā)回的相同的消息;(4)關(guān)閉連接。編寫客戶端所涉及的兩個主要代碼部分也是業(yè)務邏輯和引導,和你在服務器中看到的一樣。
4.1 通過 ChannelHandler 實現(xiàn)客戶端邏輯如同服務器,客戶端將擁有一個用來處理數(shù)據(jù)的 ChannelInboundHandler。在這個場景下,我們將擴展 SimpleChannelInboundHandler 類以處理所有必須的任務。這要求重寫下面的方法:
channelActive() : 在到服務器的連接已經(jīng)建立之后將被調(diào)用;channelRead0() : 當從服務器接收到一條消息時被調(diào)用;exceptionCaught() :在處理過程中引發(fā)異常時被調(diào)用。具體代碼可以參考如下:
package com.example.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;/** * @author lhd * @date 2023/05/16 15:45 * @notes Netty 簡單的客戶端邏輯 *///標記該類的實例可以被多個 Channel 共享@ChannelHandler.Sharablepublic class EchoClientHandler extends SimpleChannelInboundHandler { //當被通知 Channel是活躍的時候,發(fā)送一條消息 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //記錄已接收消息的轉(zhuǎn)儲 @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); } //在發(fā)生異常時,記錄錯誤并關(guān)閉Channel @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}
首先,我們重寫了 channelActive()方法,其將在一個連接建立時被調(diào)用。這確保了數(shù)據(jù)將會被盡可能快地寫入服務器,其在這個場景下是一個編碼了字符串"Netty rocks!"的字節(jié)緩沖區(qū)。
接下來,我們重寫了 channelRead0()方法。每當接收數(shù)據(jù)時,都會調(diào)用這個方法。由服務器發(fā)送的消息可能會被分塊接收。也就是說,如果服務器發(fā)送了 5 字節(jié),那么不能保證這 5 字節(jié)會被一次性接收。即使是對于這么少量的數(shù)據(jù),channelRead0()方法也可能會被調(diào)用兩次,第一次使用一個持有 3 字節(jié)的 ByteBuf(Netty 的字節(jié)容器),第二次使用一個持有 2 字節(jié)的 ByteBuf。作為一個面向流的協(xié)議,TCP 保證了字節(jié)數(shù)組將會按照服務器發(fā)送它們的順序被接收。
ps:所以channelRead0()的調(diào)用次數(shù)不一定等于服務器發(fā)布消息的次數(shù)
重寫的第三個方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的代碼示例)中所示,記錄 Throwable,關(guān)閉 Channel,在這個場景下,終止到服務器的連接。
ps:為什么客戶端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?
在客戶端,當 channelRead0()方法完成時,我們已經(jīng)有了傳入消息,并且已經(jīng)處理完它了。當該方法返回時,SimpleChannelInboundHandler 負責釋放指向保存該消息的 ByteBuf 的內(nèi)存引用。
在 EchoServerHandler 中,我們?nèi)匀恍枰獙魅胂⒒厮徒o發(fā)送者,而 write()操作是異步的,直到 channelRead()方法返回后可能仍然沒有完成。為此,EchoServerHandler擴展了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被調(diào)用時被釋放。
4.2 引導客戶端引導客戶端類似于引導服務器,不同的是,客戶端是使用主機和端口參數(shù)來連接遠程地址,也就是這里的 Echo 服務器的地址,而不是綁定到一個一直被監(jiān)聽的端口。
package com.example.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import java.net.InetSocketAddress;/** * @author lhd * @date 2023/05/16 15:59 * @notes 引導客戶端 */public class EchoClient { public void start() throws Exception { //指定 EventLoopGroup 以處理客戶端事件;需要適用于 NIO 的實現(xiàn) EventLoopGroup group = new NioEventLoopGroup(); try { //創(chuàng)建 Bootstrap Bootstrap b = new Bootstrap(); b.group(group) //適用于 NIO 傳輸?shù)?Channel 類型 .channel(NioSocketChannel.class) //設(shè)置服務器的InetSocketAddress .remoteAddress(new InetSocketAddress("127.0.0.1", 8080)) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { //在創(chuàng)建Channel時,向 ChannelPipeline中添加一個 EchoClientHandler 實例 ch.pipeline().addLast(new EchoClientHandler());} }); //連接到遠程節(jié)點,阻塞等待直到連接完成 ChannelFuture f = b.connect().sync(); //阻塞,直到Channel 關(guān)閉 f.channel().closeFuture().sync(); } finally { //關(guān)閉線程池并且釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient().start(); }}
總結(jié)一下要點:
為初始化客戶端,創(chuàng)建了一個 Bootstrap 實例;為進行事件處理分配了一個 NioEventLoopGroup 實例,其中事件處理包括創(chuàng)建新的連接以及處理入站和出站數(shù)據(jù);為服務器連接創(chuàng)建了一個 InetSocketAddress 實例;當連接被建立時,一個 EchoClientHandler 實例會被安裝到(該 Channel 的)ChannelPipeline 中;在一切都設(shè)置完成后,調(diào)用 Bootstrap.connect()方法連接到遠程節(jié)點;綜上客戶端的構(gòu)建已經(jīng)完成。
五、構(gòu)建和運行 Echo 服務器和客戶端將我們上面的代碼復制到IDEA中運行,先啟動服務端在啟動客戶端會得到以下預期效果:
服務端控制臺打印:客戶端控制臺打印:我們關(guān)閉服務端后,客戶端控制臺打印:因為服務端關(guān)閉,觸發(fā)了客戶端 EchoClientHandler 中的exceptionCaught()方法,打印出了異常堆棧并關(guān)閉了連接。
這只是一個簡單的應用程序,但是它可以伸縮到支持數(shù)千個并發(fā)連接——每秒可以比普通的基于套接字的 Java 應用程序處理多得多的消息。
推薦閱讀
關(guān)于我們| 聯(lián)系方式| 版權(quán)聲明| 供稿服務| 友情鏈接
咕嚕網(wǎng) www.fyuntv.cn 版權(quán)所有,未經(jīng)書面授權(quán)禁止使用
Copyright©2008-2023 By All Rights Reserved 皖I(lǐng)CP備2022009963號-10
聯(lián)系我們: 39 60 29 14 2@qq.com