Netty需要的運(yùn)行環(huán)境很簡(jiǎn)單,只有2個(gè)。
JDK 1.8+Apache Maven 3.3.9+二、Netty 客戶端/服務(wù)器概覽如圖,展示了一個(gè)我們將要編寫(xiě)的 Echo 客戶端和服務(wù)器應(yīng)用程序。該圖展示是多個(gè)客戶端同時(shí)連接到一臺(tái)服務(wù)器。所能夠支持的客戶端數(shù)量,在理論上,僅受限于系統(tǒng)的可用資源(以及所使用的 JDK 版本可能會(huì)施加的限制)。
(資料圖)
Echo 客戶端和服務(wù)器之間的交互是非常簡(jiǎn)單的;在客戶端建立一個(gè)連接之后,它會(huì)向服務(wù)器發(fā)送一個(gè)或多個(gè)消息,反過(guò)來(lái)服務(wù)器又會(huì)將每個(gè)消息回送給客戶端。雖然它本身看起來(lái)好像用處不大,但它充分地體現(xiàn)了客戶端/服務(wù)器系統(tǒng)中典型的請(qǐng)求-響應(yīng)交互模式。
三、編寫(xiě) Echo 服務(wù)器所有的 Netty 服務(wù)器都需要以下兩部分。
至少一個(gè) ChannelHandler—該組件實(shí)現(xiàn)了服務(wù)器對(duì)從客戶端接收的數(shù)據(jù)的處理,即它的業(yè)務(wù)邏輯。引導(dǎo)—這是配置服務(wù)器的啟動(dòng)代碼。至少,它會(huì)將服務(wù)器綁定到它要監(jiān)聽(tīng)連接請(qǐng)求的端口上。3.1 ChannelHandler 和業(yè)務(wù)邏輯上一篇博文我們介紹了 Future 和回調(diào),并且闡述了它們?cè)谑录?qū)動(dòng)設(shè)計(jì)中的應(yīng)用。我們還討論了 ChannelHandler,它是一個(gè)接口族的父接口,它的實(shí)現(xiàn)負(fù)責(zé)接收并響應(yīng)事件通知。
在 Netty 應(yīng)用程序中,所有的數(shù)據(jù)處理邏輯都包含在這些核心抽象的實(shí)現(xiàn)中。因?yàn)槟愕?Echo 服務(wù)器會(huì)響應(yīng)傳入的消息,所以它需要實(shí)現(xiàn)ChannelInboundHandler 接口,用來(lái)定義響應(yīng)入站事件的方法。簡(jiǎn)單的應(yīng)用程序只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的默認(rèn)實(shí)現(xiàn)。
我們將要用到的方法是:
channelRead() :對(duì)于每個(gè)傳入的消息都要調(diào)用;channelReadComplete() : 通知ChannelInboundHandler最后一次對(duì)channelRead()的調(diào)用是當(dāng)前批量讀取中的最后一條消息;exceptionCaught() :在讀取操作期間,有異常拋出時(shí)會(huì)調(diào)用。該 Echo 服務(wù)器的 ChannelHandler 實(shí)現(xiàn)是 EchoServerHandler,如代碼:
package com.example.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * @author lhd * @date 2023/05/16 15:05 * @notes Netty Echo服務(wù)端簡(jiǎn)單邏輯 *///表示channel可以并多個(gè)實(shí)例共享,它是線程安全的@ChannelHandler.Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; //將消息打印到控制臺(tái) System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); //將收到的消息寫(xiě)給發(fā)送者,而不沖刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //將未決消息沖刷到遠(yuǎn)程節(jié)點(diǎn),并且關(guān)閉該 Channe ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //打印異常堆棧跟蹤 cause.printStackTrace(); //關(guān)閉該channel ctx.close(); }}
ChannelInboundHandlerAdapter 有一個(gè)直觀的 API,并且它的每個(gè)方法都可以被重寫(xiě)以掛鉤到事件生命周期的恰當(dāng)點(diǎn)上。
因?yàn)樾枰幚硭薪邮盏降臄?shù)據(jù),所以我們重寫(xiě)了 channelRead()方法。在這個(gè)服務(wù)器應(yīng)用程序中,我們將數(shù)據(jù)簡(jiǎn)單地回送給了遠(yuǎn)程節(jié)點(diǎn)。
重寫(xiě) exceptionCaught()方法允許我們對(duì) Throwable 的任何子類型做出反應(yīng),在這里你記錄了異常并關(guān)閉了連接。
雖然一個(gè)更加完善的應(yīng)用程序也許會(huì)嘗試從異常中恢復(fù),但在這個(gè)場(chǎng)景下,只是通過(guò)簡(jiǎn)單地關(guān)閉連接來(lái)通知遠(yuǎn)程節(jié)點(diǎn)發(fā)生了錯(cuò)誤。
ps:如果不捕獲異常,會(huì)發(fā)生什么呢?
每個(gè) Channel 都擁有一個(gè)與之相關(guān)聯(lián)的 ChannelPipeline,其持有一個(gè) ChannelHandler 的實(shí)例鏈。在默認(rèn)的情況下,ChannelHandler 會(huì)把對(duì)它的方法的調(diào)用轉(zhuǎn)發(fā)給鏈中的下一個(gè) ChannelHandler。因此,如果 exceptionCaught()方法沒(méi)有被該鏈中的某處實(shí)現(xiàn),那么所接收的異常將會(huì)被傳遞到 ChannelPipeline 的尾端并被記錄。為此,你的應(yīng)用程序應(yīng)該提供至少有一個(gè)實(shí)現(xiàn)exceptionCaught()方法的 ChannelHandler。
除了 ChannelInboundHandlerAdapter 之外,還有很多需要學(xué)習(xí)ChannelHandler的子類型和實(shí)現(xiàn)。這些之后會(huì)一一說(shuō)明,目前,我們只關(guān)注:
針對(duì)不同類型的事件來(lái)調(diào)用 ChannelHandler;應(yīng)用程序通過(guò)實(shí)現(xiàn)或者擴(kuò)展 ChannelHandler 來(lái)掛鉤到事件的生命周期,并且提供自定義的應(yīng)用程序邏輯;在架構(gòu)上,ChannelHandler 有助于保持業(yè)務(wù)邏輯與網(wǎng)絡(luò)處理代碼的分離。這簡(jiǎn)化了開(kāi)發(fā)過(guò)程,因?yàn)榇a必須不斷地演化以響應(yīng)不斷變化的需求。3.2 引導(dǎo)服務(wù)器下面我們準(zhǔn)備開(kāi)始構(gòu)建服務(wù)器。構(gòu)建服務(wù)器涉及到兩個(gè)內(nèi)容:
綁定到服務(wù)器將在其上監(jiān)聽(tīng)并接受傳入連接請(qǐng)求的端口;配置 Channel,以將有關(guān)的入站消息通知給 EchoServerHandler 實(shí)例。package com.example.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import java.net.InetSocketAddress;/** * @author lhd * @date 2023/05/16 15:21 * @notes Netty引導(dǎo)服務(wù)器 */public class EchoServer { public static void main(String[] args) throws Exception { //調(diào)用服務(wù)器的 start()方法 new EchoServer().start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); //創(chuàng)建EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { //創(chuàng)建ServerBootstra ServerBootstrap b = new ServerBootstrap(); //指定服務(wù)器監(jiān)視端口 int port = 8080; b.group(group) //指定所使用的 NIO 傳輸 Channel //因?yàn)槲覀冋谑褂玫氖?NIO 傳輸,所以你指定了 NioEventLoopGroup 來(lái)接受和處理新的連接, // 并且將 Channel 的類型指定為 NioServerSocketChannel 。 .channel(NioServerSocketChannel.class) //使用指定的端口設(shè)置套接字地址 //將本地地址設(shè)置為一個(gè)具有選定端口的 InetSocketAddress 。服務(wù)器將綁定到這個(gè)地址以監(jiān)聽(tīng)新的連接請(qǐng)求 .localAddress(new InetSocketAddress(port)) //添加一個(gè)EchoServerHandler 到子Channel的 ChannelPipeline //這里使用了一個(gè)特殊的類——ChannelInitializer。這是關(guān)鍵。 // 當(dāng)一個(gè)新的連接被接受時(shí),一個(gè)新的子 Channel 將會(huì)被創(chuàng)建,而 ChannelInitializer 將會(huì)把一個(gè)你的 //EchoServerHandler 的實(shí)例添加到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的, // 這個(gè) ChannelHandler 將會(huì)收到有關(guān)入站消息的通知。 .childHandler(new ChannelInitializer(){ @Override public void initChannel(SocketChannel ch) throws Exception { //EchoServerHandler 被標(biāo)注為 @Shareable,所以我們可以總是使用同樣的實(shí)例 //實(shí)際上所有客戶端都是使用的同一個(gè)EchoServerHandler ch.pipeline().addLast(serverHandler); } }); //異步地綁定服務(wù)器,調(diào)用 sync()方法阻塞等待直到綁定完成 //sync()方法的調(diào)用將導(dǎo)致當(dāng)前 Thread阻塞,一直到綁定操作完成為止 ChannelFuture f = b.bind().sync(); //獲取 Channel 的CloseFuture,并且阻塞當(dāng)前線 //該應(yīng)用程序?qū)?huì)阻塞等待直到服務(wù)器的 Channel關(guān)閉(因?yàn)槟阍?Channel 的 CloseFuture 上調(diào)用了 sync()方法) f.channel().closeFuture().sync(); } finally { //關(guān)閉 EventLoopGroup,釋放所有的資源,包括所有被創(chuàng)建的線程 group.shutdownGracefully().sync(); } }}
我們總結(jié)一下服務(wù)器實(shí)現(xiàn)中的重要步驟。下面這些是服務(wù)器的主要代碼組件:
EchoServerHandler 實(shí)現(xiàn)了業(yè)務(wù)邏輯;main()方法引導(dǎo)了服務(wù)器;引導(dǎo)過(guò)程中所需要的步驟如下:創(chuàng)建一個(gè) ServerBootstrap 的實(shí)例以引導(dǎo)和綁定服務(wù)器;創(chuàng)建并分配一個(gè) NioEventLoopGroup 實(shí)例以進(jìn)行事件的處理,如接受新連接以及讀/寫(xiě)數(shù)據(jù);指定服務(wù)器綁定的本地的 InetSocketAddress;使用一個(gè) EchoServerHandler 的實(shí)例初始化每一個(gè)新的 Channel;調(diào)用 ServerBootstrap.bind()方法以綁定服務(wù)器。到此我們的引導(dǎo)服務(wù)器已經(jīng)完成。
四、編寫(xiě) Echo 客戶端Echo 客戶端將會(huì):(1)連接到服務(wù)器;(2)發(fā)送一個(gè)或者多個(gè)消息;(3)對(duì)于每個(gè)消息,等待并接收從服務(wù)器發(fā)回的相同的消息;(4)關(guān)閉連接。編寫(xiě)客戶端所涉及的兩個(gè)主要代碼部分也是業(yè)務(wù)邏輯和引導(dǎo),和你在服務(wù)器中看到的一樣。
4.1 通過(guò) ChannelHandler 實(shí)現(xiàn)客戶端邏輯如同服務(wù)器,客戶端將擁有一個(gè)用來(lái)處理數(shù)據(jù)的 ChannelInboundHandler。在這個(gè)場(chǎng)景下,我們將擴(kuò)展 SimpleChannelInboundHandler 類以處理所有必須的任務(wù)。這要求重寫(xiě)下面的方法:
channelActive() : 在到服務(wù)器的連接已經(jīng)建立之后將被調(diào)用;channelRead0() : 當(dāng)從服務(wù)器接收到一條消息時(shí)被調(diào)用;exceptionCaught() :在處理過(guò)程中引發(fā)異常時(shí)被調(diào)用。具體代碼可以參考如下:
package com.example.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;/** * @author lhd * @date 2023/05/16 15:45 * @notes Netty 簡(jiǎn)單的客戶端邏輯 *///標(biāo)記該類的實(shí)例可以被多個(gè) Channel 共享@ChannelHandler.Sharablepublic class EchoClientHandler extends SimpleChannelInboundHandler { //當(dāng)被通知 Channel是活躍的時(shí)候,發(fā)送一條消息 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //記錄已接收消息的轉(zhuǎn)儲(chǔ) @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); } //在發(fā)生異常時(shí),記錄錯(cuò)誤并關(guān)閉Channel @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}
首先,我們重寫(xiě)了 channelActive()方法,其將在一個(gè)連接建立時(shí)被調(diào)用。這確保了數(shù)據(jù)將會(huì)被盡可能快地寫(xiě)入服務(wù)器,其在這個(gè)場(chǎng)景下是一個(gè)編碼了字符串"Netty rocks!"的字節(jié)緩沖區(qū)。
接下來(lái),我們重寫(xiě)了 channelRead0()方法。每當(dāng)接收數(shù)據(jù)時(shí),都會(huì)調(diào)用這個(gè)方法。由服務(wù)器發(fā)送的消息可能會(huì)被分塊接收。也就是說(shuō),如果服務(wù)器發(fā)送了 5 字節(jié),那么不能保證這 5 字節(jié)會(huì)被一次性接收。即使是對(duì)于這么少量的數(shù)據(jù),channelRead0()方法也可能會(huì)被調(diào)用兩次,第一次使用一個(gè)持有 3 字節(jié)的 ByteBuf(Netty 的字節(jié)容器),第二次使用一個(gè)持有 2 字節(jié)的 ByteBuf。作為一個(gè)面向流的協(xié)議,TCP 保證了字節(jié)數(shù)組將會(huì)按照服務(wù)器發(fā)送它們的順序被接收。
ps:所以channelRead0()的調(diào)用次數(shù)不一定等于服務(wù)器發(fā)布消息的次數(shù)
重寫(xiě)的第三個(gè)方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的代碼示例)中所示,記錄 Throwable,關(guān)閉 Channel,在這個(gè)場(chǎng)景下,終止到服務(wù)器的連接。
ps:為什么客戶端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?
在客戶端,當(dāng) channelRead0()方法完成時(shí),我們已經(jīng)有了傳入消息,并且已經(jīng)處理完它了。當(dāng)該方法返回時(shí),SimpleChannelInboundHandler 負(fù)責(zé)釋放指向保存該消息的 ByteBuf 的內(nèi)存引用。
在 EchoServerHandler 中,我們?nèi)匀恍枰獙魅胂⒒厮徒o發(fā)送者,而 write()操作是異步的,直到 channelRead()方法返回后可能仍然沒(méi)有完成。為此,EchoServerHandler擴(kuò)展了 ChannelInboundHandlerAdapter,其在這個(gè)時(shí)間點(diǎn)上不會(huì)釋放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,當(dāng) writeAndFlush()方法被調(diào)用時(shí)被釋放。
4.2 引導(dǎo)客戶端引導(dǎo)客戶端類似于引導(dǎo)服務(wù)器,不同的是,客戶端是使用主機(jī)和端口參數(shù)來(lái)連接遠(yuǎn)程地址,也就是這里的 Echo 服務(wù)器的地址,而不是綁定到一個(gè)一直被監(jiān)聽(tīng)的端口。
package com.example.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import java.net.InetSocketAddress;/** * @author lhd * @date 2023/05/16 15:59 * @notes 引導(dǎo)客戶端 */public class EchoClient { public void start() throws Exception { //指定 EventLoopGroup 以處理客戶端事件;需要適用于 NIO 的實(shí)現(xiàn) EventLoopGroup group = new NioEventLoopGroup(); try { //創(chuàng)建 Bootstrap Bootstrap b = new Bootstrap(); b.group(group) //適用于 NIO 傳輸?shù)?Channel 類型 .channel(NioSocketChannel.class) //設(shè)置服務(wù)器的InetSocketAddress .remoteAddress(new InetSocketAddress("127.0.0.1", 8080)) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { //在創(chuàng)建Channel時(shí),向 ChannelPipeline中添加一個(gè) EchoClientHandler 實(shí)例 ch.pipeline().addLast(new EchoClientHandler());} }); //連接到遠(yuǎn)程節(jié)點(diǎn),阻塞等待直到連接完成 ChannelFuture f = b.connect().sync(); //阻塞,直到Channel 關(guān)閉 f.channel().closeFuture().sync(); } finally { //關(guān)閉線程池并且釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient().start(); }}
總結(jié)一下要點(diǎn):
為初始化客戶端,創(chuàng)建了一個(gè) Bootstrap 實(shí)例;為進(jìn)行事件處理分配了一個(gè) NioEventLoopGroup 實(shí)例,其中事件處理包括創(chuàng)建新的連接以及處理入站和出站數(shù)據(jù);為服務(wù)器連接創(chuàng)建了一個(gè) InetSocketAddress 實(shí)例;當(dāng)連接被建立時(shí),一個(gè) EchoClientHandler 實(shí)例會(huì)被安裝到(該 Channel 的)ChannelPipeline 中;在一切都設(shè)置完成后,調(diào)用 Bootstrap.connect()方法連接到遠(yuǎn)程節(jié)點(diǎn);綜上客戶端的構(gòu)建已經(jīng)完成。
五、構(gòu)建和運(yùn)行 Echo 服務(wù)器和客戶端將我們上面的代碼復(fù)制到IDEA中運(yùn)行,先啟動(dòng)服務(wù)端在啟動(dòng)客戶端會(huì)得到以下預(yù)期效果:
服務(wù)端控制臺(tái)打?。嚎蛻舳丝刂婆_(tái)打印:我們關(guān)閉服務(wù)端后,客戶端控制臺(tái)打?。阂?yàn)榉?wù)端關(guān)閉,觸發(fā)了客戶端 EchoClientHandler 中的exceptionCaught()方法,打印出了異常堆棧并關(guān)閉了連接。
這只是一個(gè)簡(jiǎn)單的應(yīng)用程序,但是它可以伸縮到支持?jǐn)?shù)千個(gè)并發(fā)連接——每秒可以比普通的基于套接字的 Java 應(yīng)用程序處理多得多的消息。
最新資訊
關(guān)于我們| 聯(lián)系方式| 版權(quán)聲明| 供稿服務(wù)| 友情鏈接
咕嚕網(wǎng) www.ulq4xuwux.cn 版權(quán)所有,未經(jīng)書(shū)面授權(quán)禁止使用
Copyright©2008-2023 By All Rights Reserved 皖I(lǐng)CP備2022009963號(hào)-10
聯(lián)系我們: 39 60 29 14 2@qq.com