Netty 到底是何方神圣?
用一句简单的话来说就是:Netty 封装了 JDK 的 NIO,让你用得更爽,你不用再写一大堆复杂的代码了。
用官方正式的话来说就是:Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。
使用 Netty 不使用 JDK 原生 NIO 的原因
- 使用 JDK 自带的 NIO 需要了解太多的概念,编程复杂,一不小心 bug 横飞
- Netty 底层 IO 模型随意切换,而这一切只需要做微小的改动,改改参数,Netty 可以直接从 NIO 模型变身为 IO 模型
- Netty 自带的拆包解包,异常检测等机制让你从 NIO 的繁重细节中脱离出来,让你只需要关心业务逻辑
- Netty 解决了 JDK 的很多包括空轮询在内的 Bug
- Netty 底层对线程,selector 做了很多细小的优化,精心设计的 reactor 线程模型做到非常高效的并发处理
- 自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
- Netty 社区活跃,遇到问题随时邮件列表或者 issue
- Netty 已经历各大 RPC 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大
强烈不建议直接基于 JDK 原生 NIO 来进行网络开发,原因:
- JDK 的 NIO 编程需要了解很多的概念,编程复杂,对 NIO 入门非常不友好,编程模型不友好,ByteBuffer 的 Api 简直反人类
- 对 NIO 编程来说,一个比较合适的线程模型能充分发挥它的优势,而 JDK 没有给你实现,你需要自己实现,就连简单的自定义协议拆包都要你自己实现
- JDK 的 NIO 底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%
- 项目庞大之后,自行实现的 NIO 很容易出现各类 bug,维护成本较高
服务端启动流程
总结:
- Netty 服务端启动的流程,一句话来说就是:创建一个引导类,然后给他指定线程模型,IO 模型,连接读写处理逻辑,绑定端口之后,服务端就启动起来了。
- bind 方法是异步的,我们可以通过这个异步机制来实现端口递增绑定。
- Netty 服务端启动额外的参数,主要包括给服务端 Channel 或者客户端 Channel 设置属性值,设置底层 TCP 参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class NettyServer {
public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() { @Override protected void initChannel(NioServerSocketChannel ch) { System.out.println("服务端启动中"); } });
serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer");
int port = 8000; NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }); bind(serverBootstrap, port);
}
private static void bind(final ServerBootstrap serverBootstrap, final int port) { serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { System.out.println("端口[" + port + "]绑定成功!"); } else { System.err.println("端口[" + port + "]绑定失败!"); bind(serverBootstrap, port + 1); } }); }
}
|
客户端启动流程
总结:
- 一句话来说就是:创建一个引导类,然后给他指定线程模型,IO 模型,连接读写处理逻辑,连接上特定主机和端口,客户端就启动起来了。
- connect 方法是异步的,我们可以通过这个异步回调机制来实现指数退避重连逻辑。
- netty 客户端启动额外的参数,主要包括给客户端 Channel 绑定自定义属性值,设置底层 TCP 参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class NettyClient {
private static int MAX_RETRY = 5;
public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } });
connect(bootstrap, "127.0.0.1", 8000, MAX_RETRY); }
private static void connect(Bootstrap bootstrap, String host, int port, int retry) { bootstrap.connect(host, port).addListener(future -> { if (future.isSuccess()) { System.out.println("连接成功!"); } else if (retry == 0) { System.err.println("重试次数已用完,放弃连接!"); } else { int order = (MAX_RETRY - retry) + 1; int delay = 1 << order; System.err.println(new Date() + ": 连接失败,第" + order + "次重连……"); bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS); } }); } }
|
双向通信
- 通过给逻辑处理链 pipeline 添加逻辑处理器,来编写数据的读写逻辑。
- 在客户端连接成功之后会回调到逻辑处理器的 channelActive() 方法,而不管是服务端还是客户端,收到数据之后都会调用到 channelRead 方法。
- 写数据调用writeAndFlush方法,客户端与服务端交互的二进制数据载体为 ByteBuf,ByteBuf 通过连接的内存管理器创建,字节数据填充到 ByteBuf 之后才能写到对端
client端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new FirstClientHandler()); } });
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客户端写出数据"); ByteBuf buffer = getByteBuf(ctx, "你好,我是client1"); ctx.channel().writeAndFlush(buffer); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 客户端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8"))); }
private ByteBuf getByteBuf(ChannelHandlerContext ctx, String content) {
ByteBuf buffer = ctx.alloc().buffer(); byte[] bytes = content.getBytes(Charset.forName("utf-8")); buffer.writeBytes(bytes);
return buffer; } }
|
server端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new FirstServerHandler()); } });
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 接入netter server ! "); ByteBuf buffer = getByteBuf(ctx,"hi,i'm netty server!"); ctx.channel().writeAndFlush(buffer); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg; System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));
System.out.println(new Date() + ": 服务端写出数据"); ByteBuf out = getByteBuf(ctx,"netty server response success."); ctx.channel().writeAndFlush(out); }
private ByteBuf getByteBuf(ChannelHandlerContext ctx,String content) {
byte[] bytes = content.getBytes(Charset.forName("utf-8")); ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(bytes); return buffer; } }
|